Commit 2e5038e7be1f1312019e3c10b0683a0c3366498a

Authored by 王明元
1 parent 5106cf1f

2023年11月27日10:16:03 多线程抓取数据加锁逻辑优化

db/freya.mv.db
No preview for this file type
db/freya.trace.db
... ... @@ -45626,3 +45626,24 @@ SELECT * FROM NS_SIG3 where SIG_MSG like "%noData%" [42122-200]
45626 45626 at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
45627 45627 at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
45628 45628 at java.base/java.lang.Thread.run(Thread.java:829)
  45629 +2023-11-27 01:30:10 jdbc[3]: exception
  45630 +java.sql.SQLClientInfoException: Client info name 'ApplicationName' not supported.
  45631 + at org.h2.jdbc.JdbcConnection.setClientInfo(JdbcConnection.java:1749)
  45632 + at com.intellij.database.remote.jdbc.impl.RemoteConnectionImpl.setClientInfo(RemoteConnectionImpl.java:466)
  45633 + at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  45634 + at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  45635 + at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  45636 + at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  45637 + at java.rmi/sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:359)
  45638 + at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:200)
  45639 + at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:197)
  45640 + at java.base/java.security.AccessController.doPrivileged(Native Method)
  45641 + at java.rmi/sun.rmi.transport.Transport.serviceCall(Transport.java:196)
  45642 + at java.rmi/sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:562)
  45643 + at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:796)
  45644 + at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:677)
  45645 + at java.base/java.security.AccessController.doPrivileged(Native Method)
  45646 + at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:676)
  45647 + at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  45648 + at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  45649 + at java.base/java.lang.Thread.run(Thread.java:834)
... ...
src/main/java/cn/fw/freya/FreyaApplication.java
... ... @@ -48,7 +48,6 @@ public class FreyaApplication {
48 48 new ThreadPoolExecutor.DiscardPolicy());
49 49 }
50 50  
51   -
52 51 /**
53 52 * 初始化一个自动过期的本地缓存
54 53 */
... ...
src/main/java/cn/fw/freya/service/CrawlBizService.java
... ... @@ -137,11 +137,9 @@ public class CrawlBizService {
137 137 public void crawlData(Account account) throws IOException {
138 138 Integer type = account.getType();
139 139 String accountNo = account.getAccountNo();
140   - String key = account.getCrawlIngLockKey();
141 140 try {
142 141 AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
143 142 CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
144   - autoExpireLocalCache.put(key, key, TimeUnit.HOURS.toMillis(1));// 1小时自动过期
145 143 /* 抓取数据 */
146 144 log.info("{} 线程开始抓取{}: {} 的数据", Thread.currentThread().getName(), typeEnum.getName(), accountNo);
147 145 ReportAccountDto accountMsg = crawlStrategy.getAccountMsg(accountNo);// 更新粉丝数
... ... @@ -161,7 +159,7 @@ public class CrawlBizService {
161 159 }
162 160 /* 上报数据 */
163 161 boolean reportAccountMsg = this.doReportAccountMsg(account, accountMsg);
164   - String format = String.format("上报[%s]平台, 账户号为: %s", AccountTypeEnum.getNameByValue(type), accountNo);
  162 + String format = String.format("上报[%s]平台, 账户号为: %s", typeEnum.getName(), accountNo);
165 163 if (!reportAccountMsg) {
166 164 log.error(format + "的账户信息失败");
167 165 return;
... ... @@ -178,7 +176,7 @@ public class CrawlBizService {
178 176 }
179 177 this.afterCrawl(accountService.getById(account.getId()));
180 178 } finally {
181   - autoExpireLocalCache.removeKey(key);
  179 + autoExpireLocalCache.removeKey(account.getCrawlIngLockKey());
182 180 }
183 181 }
184 182  
... ... @@ -319,7 +317,7 @@ public class CrawlBizService {
319 317 }
320 318 boolean reportLive = this.doReportLive(account, yesterdayLiveMsg);
321 319 if (!reportLive) {
322   - accountService.setAccountUndone(accountNo);
  320 + accountService.setAccountUndone(account);
323 321 log.error(LocalDate.now() + " 上报账户为" + accountNo + "的" +
324 322 AccountTypeEnum.getNameByValue(account.getType()) + "的直播数据失败");
325 323 }
... ...
src/main/java/cn/fw/freya/service/data/AccountService.java
... ... @@ -106,9 +106,9 @@ public interface AccountService extends IService<Account> {
106 106 /**
107 107 * 将账号标记为未完成
108 108 *
109   - * @param accountNo 账号
  109 + * @param account 账号实体
110 110 */
111   - void setAccountUndone(String accountNo);
  111 + void setAccountUndone(Account account);
112 112  
113 113 /**
114 114 * 获取账号是否已推送过失效待办标识
... ...
src/main/java/cn/fw/freya/service/data/impl/AccountServiceImpl.java
... ... @@ -234,12 +234,13 @@ public class AccountServiceImpl extends ServiceImpl<AccountDao, Account> impleme
234 234 /**
235 235 * 将账号标记为未完成
236 236 *
237   - * @param accountNo 账号
  237 + * @param account 账号实体
238 238 */
239 239 @Override
240   - public void setAccountUndone(String accountNo) {
  240 + public void setAccountUndone(Account account) {
241 241 this.lambdaUpdate()
242   - .eq(Account::getAccountNo, accountNo)
  242 + .eq(Account::getAccountNo, account.getAccountNo())
  243 + .eq(Account::getType, account.getType())
243 244 .set(Account::getDone, false)
244 245 .update()
245 246 ;
... ...
src/main/java/cn/fw/freya/task/DataCaptureTask.java
... ... @@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils;
25 25 import java.io.IOException;
26 26 import java.util.*;
27 27 import java.util.concurrent.BlockingQueue;
  28 +import java.util.concurrent.RejectedExecutionException;
28 29 import java.util.concurrent.ThreadPoolExecutor;
29 30 import java.util.concurrent.TimeUnit;
30 31 import java.util.concurrent.locks.LockSupport;
... ... @@ -61,15 +62,23 @@ public class DataCaptureTask {
61 62 List<Account> waitCrawlAccount = accountService.findUnCrawlAccount();// 获取今日还未完成抓取数据的账号
62 63 if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
63 64 for (Account account : waitCrawlAccount) {
64   - if (autoExpireLocalCache.containsKey(account.getCrawlIngLockKey()))
  65 + String key = account.getCrawlIngLockKey();
  66 + if (autoExpireLocalCache.containsKey(key))
65 67 continue;
66   - threadPoolExecutor.execute(() -> {
67   - try {
68   - crawlBizService.crawlData(account);
69   - } catch (IOException e) {
70   - log.error("多线程抓取数据发生异常", e);
71   - }
72   - });
  68 + try {
  69 + autoExpireLocalCache.put(key, key, TimeUnit.MINUTES.toMillis(30));// 30分钟自动过期
  70 + threadPoolExecutor.execute(() -> {
  71 + try {
  72 + crawlBizService.crawlData(account);
  73 + } catch (IOException e) {
  74 + log.error("多线程抓取数据发生异常", e);
  75 + }
  76 + });
  77 + } catch (RejectedExecutionException e) {
  78 + log.info("多线程抓取: {}({}) 数据被拒绝", account.getAccountNo(),
  79 + AccountTypeEnum.getNameByValue(account.getType()));
  80 + autoExpireLocalCache.removeKey(key);
  81 + }
73 82 }
74 83 }
75 84  
... ...
src/main/java/cn/fw/freya/utils/ThreadPoolUtil.java
... ... @@ -67,4 +67,11 @@ public class ThreadPoolUtil {
67 67 getThreadPool().shutdown();
68 68 }
69 69  
  70 + public static class UserDiscardPolicy implements RejectedExecutionHandler {
  71 + @Override
  72 + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  73 +
  74 + }
  75 + }
  76 +
70 77 }
... ...