Commit 247270675861ef5f44e743bb547b4b28c7d07e55

Authored by 王明元
1 parent 2a4fe65f

2023年7月13日16:30:30 抓取数据开启线程细节优化

src/main/java/cn/fw/freya/controller/OtherController.java
... ... @@ -9,6 +9,7 @@ import cn.fw.freya.service.CommonBizService;
9 9 import cn.fw.freya.service.crawl.impl.Common;
10 10 import cn.fw.freya.service.rpc.AccountRpcService;
11 11 import cn.fw.freya.service.rpc.ReportRpcService;
  12 +import cn.fw.freya.utils.AutoExpireLocalCache;
12 13 import com.alibaba.fastjson.JSON;
13 14 import com.alibaba.fastjson.JSONObject;
14 15 import lombok.Data;
... ... @@ -26,6 +27,7 @@ import java.math.BigDecimal;
26 27 import java.math.RoundingMode;
27 28 import java.text.SimpleDateFormat;
28 29 import java.time.LocalDate;
  30 +import java.util.ArrayList;
29 31 import java.util.Date;
30 32 import java.util.List;
31 33 import java.util.Set;
... ... @@ -46,6 +48,7 @@ public class OtherController {
46 48 private final AccountRpcService accountRpcService;
47 49 private final ReportRpcService reportRpcService;
48 50 private final ApplicationEventPublisher publisher;
  51 + private final AutoExpireLocalCache autoExpireLocalCache;
49 52  
50 53 /**
51 54 * 清理 driverMap
... ... @@ -172,4 +175,19 @@ public class OtherController {
172 175 return common.getWithoutPlaybackLive(type, durationThreshold);
173 176 }
174 177  
  178 + /**
  179 + * 获取本地缓存中所有键值对
  180 + *
  181 + * @return 本地缓存中所有键值对
  182 + */
  183 + @GetMapping("/getCaches")
  184 + public ResponseMessage<List<String>> get() {
  185 + List<String> result = new ArrayList<>();
  186 + autoExpireLocalCache.getCacheMap().forEach((k, v) -> result.add(k + ":" + v));
  187 + return ResponseMessage.success(result.stream()
  188 + .sorted()
  189 + .collect(Collectors.toList())
  190 + );
  191 + }
  192 +
175 193 }
... ...
src/main/java/cn/fw/freya/service/CrawlBizService.java
... ... @@ -16,7 +16,6 @@ import org.springframework.beans.factory.annotation.Autowired;
16 16 import org.springframework.context.event.EventListener;
17 17 import org.springframework.scheduling.annotation.Async;
18 18 import org.springframework.stereotype.Service;
19   -import org.springframework.util.StringUtils;
20 19  
21 20 import java.io.IOException;
22 21 import java.time.LocalDate;
... ... @@ -110,8 +109,6 @@ public class CrawlBizService {
110 109 String key = accountNo + "->" + type;
111 110 AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
112 111 CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
113   - if (StringUtils.hasText(autoExpireLocalCache.get(key)))
114   - return;
115 112 autoExpireLocalCache.put(key, key, 3600 * 1000L);// 1小时自动过期
116 113 // 抓取数据
117 114 log.info("线程: " + Thread.currentThread().getName() + " 开始抓取数据");
... ...
src/main/java/cn/fw/freya/task/DataCaptureTask.java
... ... @@ -20,7 +20,6 @@ import org.springframework.context.ApplicationEventPublisher;
20 20 import org.springframework.scheduling.annotation.Scheduled;
21 21 import org.springframework.stereotype.Component;
22 22 import org.springframework.util.CollectionUtils;
23   -import org.springframework.util.StringUtils;
24 23  
25 24 import java.io.IOException;
26 25 import java.util.*;
... ... @@ -59,6 +58,26 @@ public class DataCaptureTask {
59 58 log.info("当前队列中待执行任务数:" + queue.size() + "\t 正在执行任务的线程数:" + activeCount);
60 59 List<Account> waitCrawlAccount = accountService.findUnCrawlAccount();// 获取今日还未完成抓取数据的账号
61 60 if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
  61 + for (Account account : waitCrawlAccount) {
  62 + String key = account.getAccountNo() + "->" + account.getType();
  63 + if (autoExpireLocalCache.containsKey(key))
  64 + continue;
  65 + threadPoolExecutor.execute(() -> {
  66 + try {
  67 + crawlBizService.crawlData(account);
  68 + } catch (IOException e) {
  69 + log.error("多线程抓取数据发生异常", e);
  70 + }
  71 + });
  72 + }
  73 + }
  74 + /*public void capture() {
  75 + ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getThreadPool();
  76 + BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();// 获取工作队列
  77 + int activeCount = threadPoolExecutor.getActiveCount();// 获取正在执行的线程数
  78 + log.info("当前队列中待执行任务数:" + queue.size() + "\t 正在执行任务的线程数:" + activeCount);
  79 + List<Account> waitCrawlAccount = accountService.findUnCrawlAccount();// 获取今日还未完成抓取数据的账号
  80 + if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
62 81 // 线程池运行多个线程同时抓取多个账号数据
63 82 for (int i = 0; i < waitCrawlAccount.size(); i++) {
64 83 Account currentAccount = waitCrawlAccount.get(i);
... ... @@ -94,7 +113,7 @@ public class DataCaptureTask {
94 113 }
95 114 }
96 115 }
97   - }
  116 + }*/
98 117  
99 118 /**
100 119 * 每2分钟执行抓取数据
... ...
src/main/java/cn/fw/freya/utils/AutoExpireLocalCache.java
... ... @@ -119,6 +119,16 @@ public class AutoExpireLocalCache {
119 119 }
120 120  
121 121 /**
  122 + * 缓存中是否包含指定key
  123 + *
  124 + * @param key 指定key
  125 + * @return 缓存中是否包含指定key
  126 + */
  127 + public boolean containsKey(String key) {
  128 + return cacheMap.containsKey(key);
  129 + }
  130 +
  131 + /**
122 132 * 如果key存在于队列中, 则移除该实体(用作锁续命, 重新指定过期时间)
123 133 *
124 134 * @param key 待缓存的key
... ...