Commit 2a4fe65fca101aa93d02d0b43dde7c08246d9883

Authored by 王明元
1 parent 96443f8d

2023年7月13日15:10:51 抓取数据开启线程细节优化

src/main/java/cn/fw/freya/FreyaApplication.java
1 1 package cn.fw.freya;
2 2  
  3 +import cn.fw.freya.utils.AutoExpireLocalCache;
3 4 import cn.hutool.core.thread.ThreadFactoryBuilder;
4 5 import org.mybatis.spring.annotation.MapperScan;
5 6 import org.springframework.boot.SpringApplication;
... ... @@ -46,4 +47,13 @@ public class FreyaApplication {
46 47 new ThreadFactoryBuilder().setNamePrefix("wmy-thread-pool-").build(),
47 48 new ThreadPoolExecutor.DiscardPolicy());
48 49 }
  50 +
  51 +
  52 + /**
  53 + * 初始化一个自动过期的本地缓存
  54 + */
  55 + @Bean
  56 + public AutoExpireLocalCache initAutoExpireLocalCache() {
  57 + return new AutoExpireLocalCache(1000);
  58 + }
49 59 }
... ...
src/main/java/cn/fw/freya/service/CrawlBizService.java
... ... @@ -10,11 +10,13 @@ import cn.fw.freya.service.data.AccountService;
10 10 import cn.fw.freya.service.rpc.AccountRpcService;
11 11 import cn.fw.freya.service.rpc.ReportRpcService;
12 12 import cn.fw.freya.utils.AssertUtil;
  13 +import cn.fw.freya.utils.AutoExpireLocalCache;
13 14 import lombok.extern.slf4j.Slf4j;
14 15 import org.springframework.beans.factory.annotation.Autowired;
15 16 import org.springframework.context.event.EventListener;
16 17 import org.springframework.scheduling.annotation.Async;
17 18 import org.springframework.stereotype.Service;
  19 +import org.springframework.util.StringUtils;
18 20  
19 21 import java.io.IOException;
20 22 import java.time.LocalDate;
... ... @@ -39,17 +41,20 @@ public class CrawlBizService {
39 41 private final AccountService accountService;
40 42 private final AccountRpcService accountRpcService;
41 43 private final ReportRpcService reportRpcService;
  44 + private final AutoExpireLocalCache autoExpireLocalCache;
42 45  
43 46 @Autowired
44 47 public CrawlBizService(final List<CrawlStrategy> crawlStrategyList,
45 48 final AccountService accountService,
46 49 final AccountRpcService accountRpcService,
47   - final ReportRpcService reportRpcService) {
  50 + final ReportRpcService reportRpcService,
  51 + AutoExpireLocalCache autoExpireLocalCache) {
48 52 this.crawlStrategyMap = crawlStrategyList.stream()
49 53 .collect(Collectors.toMap(CrawlStrategy::getType, Function.identity()));
50 54 this.accountService = accountService;
51 55 this.accountRpcService = accountRpcService;
52 56 this.reportRpcService = reportRpcService;
  57 + this.autoExpireLocalCache = autoExpireLocalCache;
53 58 }
54 59  
55 60 /**
... ... @@ -102,23 +107,30 @@ public class CrawlBizService {
102 107 public void crawlData(Account account) throws IOException {
103 108 Integer type = account.getType();
104 109 String accountNo = account.getAccountNo();
  110 + String key = accountNo + "->" + type;
105 111 AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
106 112 CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
  113 + if (StringUtils.hasText(autoExpireLocalCache.get(key)))
  114 + return;
  115 + autoExpireLocalCache.put(key, key, 3600 * 1000L);// 1小时自动过期
107 116 // 抓取数据
108 117 log.info("线程: " + Thread.currentThread().getName() + " 开始抓取数据");
109 118 ReportAccountDto accountMsg = crawlStrategy.updateAccountMsg(accountNo);// 更新粉丝数
110 119 if (Objects.isNull(accountMsg)) {
111 120 this.doPushExpireAccount(account);
  121 + autoExpireLocalCache.removeKey(key);
112 122 return;
113 123 }
114 124 List<VideoPool> allVideoMsg = crawlStrategy.getAllVideoMsg(accountNo);// 获取所有视频信息
115 125 if (Objects.isNull(allVideoMsg)) {
116 126 this.doPushExpireAccount(account);
  127 + autoExpireLocalCache.removeKey(key);
117 128 return;
118 129 }
119 130 List<LivePool> yesterdayLiveMsg = crawlStrategy.getYesterdayLiveMsg(accountNo);// 获取昨日直播信息
120 131 if (Objects.isNull(yesterdayLiveMsg)) {
121 132 this.doPushExpireAccount(account);
  133 + autoExpireLocalCache.removeKey(key);
122 134 return;
123 135 }
124 136 // 上报数据
... ... @@ -126,18 +138,22 @@ public class CrawlBizService {
126 138 String format = String.format("上报[%s]平台, 账户号为: %s", AccountTypeEnum.getNameByValue(account.getType()), accountNo);
127 139 if (!reportAccountMsg) {
128 140 log.error(format + "的账户信息失败");
  141 + autoExpireLocalCache.removeKey(key);
129 142 return;
130 143 }
131 144 boolean reportVideo = this.doReportVideo(account, allVideoMsg);
132 145 if (!reportVideo) {
133 146 log.error(format + "的视频数据失败");
  147 + autoExpireLocalCache.removeKey(key);
134 148 return;
135 149 }
136 150 boolean reportLive = this.doReportLive(account, yesterdayLiveMsg);
137 151 if (!reportLive) {
138 152 log.error(format + "的直播数据失败");
  153 + autoExpireLocalCache.removeKey(key);
139 154 return;
140 155 }
  156 + autoExpireLocalCache.removeKey(key);
141 157 this.afterCrawl(accountService.getById(account.getId()));
142 158 }
143 159  
... ...
src/main/java/cn/fw/freya/task/DataCaptureTask.java
1 1 package cn.fw.freya.task;
2 2  
3   -import cn.fw.freya.dao.AccountDao;
4 3 import cn.fw.freya.dao.LivePoolDao;
5   -import cn.fw.freya.enums.AccountTypeEnum;
6 4 import cn.fw.freya.domain.data.Account;
7 5 import cn.fw.freya.domain.data.pool.LivePool;
  6 +import cn.fw.freya.enums.AccountTypeEnum;
8 7 import cn.fw.freya.service.CrawlBizService;
9 8 import cn.fw.freya.service.crawl.impl.Common;
10 9 import cn.fw.freya.service.crawl.impl.KuaiShouCrawl;
11 10 import cn.fw.freya.service.data.AccountService;
  11 +import cn.fw.freya.utils.AutoExpireLocalCache;
12 12 import cn.fw.freya.utils.DateUtil;
13 13 import cn.fw.freya.utils.PublicUtil;
14 14 import cn.fw.freya.utils.ThreadPoolUtil;
... ... @@ -20,6 +20,7 @@ import org.springframework.context.ApplicationEventPublisher;
20 20 import org.springframework.scheduling.annotation.Scheduled;
21 21 import org.springframework.stereotype.Component;
22 22 import org.springframework.util.CollectionUtils;
  23 +import org.springframework.util.StringUtils;
23 24  
24 25 import java.io.IOException;
25 26 import java.util.*;
... ... @@ -44,8 +45,8 @@ public class DataCaptureTask {
44 45 private final Common common;
45 46 private final KuaiShouCrawl kuaiShouCrawl;
46 47 private final LivePoolDao livePoolDao;
47   - private final AccountDao accountDao;
48 48 private final ApplicationEventPublisher publisher;
  49 + private final AutoExpireLocalCache autoExpireLocalCache;
49 50  
50 51 /**
51 52 * 每分钟执行多线程同时抓取数据
... ... @@ -56,12 +57,14 @@ public class DataCaptureTask {
56 57 BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();// 获取工作队列
57 58 int activeCount = threadPoolExecutor.getActiveCount();// 获取正在执行的线程数
58 59 log.info("当前队列中待执行任务数:" + queue.size() + "\t 正在执行任务的线程数:" + activeCount);
59   - if (activeCount >= threadPoolExecutor.getCorePoolSize() && queue.size() > 60) return;
60 60 List<Account> waitCrawlAccount = accountService.findUnCrawlAccount();// 获取今日还未完成抓取数据的账号
61 61 if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
62 62 // 线程池运行多个线程同时抓取多个账号数据
63 63 for (int i = 0; i < waitCrawlAccount.size(); i++) {
64 64 Account currentAccount = waitCrawlAccount.get(i);
  65 + String key = currentAccount.getAccountNo() + "->" + currentAccount.getType();
  66 + if (StringUtils.hasText(autoExpireLocalCache.get(key)))
  67 + continue;
65 68 Account nextAccount;
66 69 if (i < waitCrawlAccount.size() - 1) {
67 70 nextAccount = waitCrawlAccount.get(i + 1);
... ...
src/main/java/cn/fw/freya/utils/AutoExpireLocalCache.java 0 → 100644
  1 +package cn.fw.freya.utils;
  2 +
  3 +import java.util.concurrent.*;
  4 +import java.util.concurrent.locks.LockSupport;
  5 +import java.util.logging.Logger;
  6 +
  7 +/**
  8 + * @author wmy3969
  9 + * @version 1.0
  10 + * @date 2022/8/24 16:06
  11 + * @Description 带自动过期的本地缓存
  12 + */
  13 +public class AutoExpireLocalCache {
  14 + private final DelayQueue<Entity> delayQueue = new DelayQueue<>();// 延迟队列
  15 + private final ConcurrentHashMap<String, String> cacheMap = new ConcurrentHashMap<>();// 缓存map
  16 + public static final Logger logger = Logger.getLogger(AutoExpireLocalCache.class.getName());// 日志
  17 +
  18 + /**
  19 + * AutoExpireLocalCache对象构造方法, 开启一个线程, 默认100ms一个周期, 监听过期key
  20 + */
  21 + public AutoExpireLocalCache() {
  22 + this(100);
  23 + }
  24 +
  25 + /**
  26 + * AutoExpireLocalCache对象构造方法, 开启一个线程, 指定一个周期(不能小于10ms), 监听过期key
  27 + *
  28 + * @param scanCycle 监听过期key, 扫描周期
  29 + */
  30 + public AutoExpireLocalCache(long scanCycle) {
  31 + long cycle = scanCycle <= 10 ? 10 : scanCycle;
  32 + new Thread(() -> {
  33 + while (true) {
  34 + while (!delayQueue.isEmpty()) {
  35 + try {
  36 + Entity entity = delayQueue.take();
  37 + final String waitRemoveKey = entity.getKey();
  38 + logger.info("已移除key为: " + waitRemoveKey + ", 队列长度为: " + delayQueue.size() + ", 缓存map元素个数为: " + (cacheMap.size() - 1));
  39 + cacheMap.remove(waitRemoveKey);
  40 + } catch (InterruptedException e) {
  41 + logger.warning("error: " + e.getMessage());
  42 + Thread.currentThread().interrupt();
  43 + }
  44 + }
  45 + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(cycle));
  46 + }
  47 + }).start();
  48 + }
  49 +
  50 + /**
  51 + * 缓存map放数据, 默认30s过期
  52 + *
  53 + * @param key 待缓存key
  54 + * @param value 待缓存value
  55 + */
  56 + public void put(String key, String value) {
  57 + this.removeKeyIfAbsent(key);
  58 + delayQueue.put(new Entity(key));
  59 + cacheMap.put(key, value);
  60 + }
  61 +
  62 + /**
  63 + * 缓存map放数据
  64 + *
  65 + * @param key 待缓存key
  66 + * @param value 待缓存value
  67 + * @param saveMillis 保存时间(ms)
  68 + */
  69 + public void put(String key, String value, long saveMillis) {
  70 + this.removeKeyIfAbsent(key);
  71 + delayQueue.put(new Entity(key, saveMillis));
  72 + cacheMap.put(key, value);
  73 + }
  74 +
  75 + /**
  76 + * 根据key获取value
  77 + *
  78 + * @param key 缓存key
  79 + * @return 缓存value
  80 + */
  81 + public String get(String key) {
  82 + return cacheMap.get(key);
  83 + }
  84 +
  85 + /**
  86 + * 获取整个缓存map
  87 + *
  88 + * @return 整个缓存map
  89 + */
  90 + public ConcurrentMap<String, String> getCacheMap() {
  91 + return cacheMap;
  92 + }
  93 +
  94 + /**
  95 + * 移除缓存key相关内容
  96 + *
  97 + * @param key 缓存key
  98 + */
  99 + public void removeKey(String key) {
  100 + if (this.removeKeyIfAbsent(key))
  101 + cacheMap.remove(key);
  102 + }
  103 +
  104 + /**
  105 + * 获取AutoExpireLocalCache对象中队列长度和map元素个数
  106 + *
  107 + * @return 队列长度和map元素个数
  108 + */
  109 + public String getSizeMsg() {
  110 + return "队列长度为: " + delayQueue.size() + ", 缓存map元素个数为: " + cacheMap.size();
  111 + }
  112 +
  113 + /**
  114 + * 销毁AutoExpireLocalCache对象中队列和缓存map
  115 + */
  116 + public void clear() {
  117 + delayQueue.clear();
  118 + cacheMap.clear();
  119 + }
  120 +
  121 + /**
  122 + * 如果key存在于队列中, 则移除该实体(用作锁续命, 重新指定过期时间)
  123 + *
  124 + * @param key 待缓存的key
  125 + */
  126 + private boolean removeKeyIfAbsent(String key) {
  127 + Entity entity = null;
  128 + if (cacheMap.containsKey(key)) {
  129 + for (Entity e : delayQueue) {
  130 + if (e.getKey().equals(key)) {
  131 + entity = e;
  132 + break;
  133 + }
  134 + }
  135 + if (entity != null)
  136 + delayQueue.remove(entity);
  137 + return true;
  138 + }
  139 + return false;
  140 + }
  141 +
  142 + static class Entity implements Delayed {
  143 + private final String key;// key
  144 + private final long createMillis;// 创建时间戳
  145 + private final long delayMillis;// 延迟时间(ms)
  146 + private final long expireMillis;// 到期时间戳
  147 +
  148 + public String getKey() {
  149 + return key;
  150 + }
  151 +
  152 + /**
  153 + * 不指定key延期时间(默认30s过期)
  154 + *
  155 + * @param key 缓存key
  156 + */
  157 + public Entity(String key) {
  158 + this(key, 30000);
  159 + }
  160 +
  161 + /**
  162 + * 指定key延期时间
  163 + *
  164 + * @param key 缓存key
  165 + * @param delayMillis 延期时间(ms)
  166 + */
  167 + public Entity(String key, long delayMillis) {
  168 + this.key = key;
  169 + this.createMillis = System.currentTimeMillis();
  170 + this.delayMillis = delayMillis;
  171 + this.expireMillis = this.createMillis + delayMillis;
  172 + }
  173 +
  174 + public long getDelay(TimeUnit unit) {
  175 + return unit.convert(this.expireMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  176 + }
  177 +
  178 + public int compareTo(Delayed delayed) {
  179 + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS));
  180 + }
  181 +
  182 + @Override
  183 + public String toString() {
  184 + return "Entity{" +
  185 + "key='" + key + '\'' +
  186 + ", createMillis=" + createMillis +
  187 + ", delayMillis=" + delayMillis +
  188 + ", expireMillis=" + expireMillis +
  189 + '}';
  190 + }
  191 + }
  192 +}
... ...