DataCaptureTask.java
9.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package cn.fw.freya.task;
import cn.fw.freya.dao.AccountDao;
import cn.fw.freya.dao.LivePoolDao;
import cn.fw.freya.model.data.Account;
import cn.fw.freya.model.data.pool.LivePool;
import cn.fw.freya.service.CrawlBizService;
import cn.fw.freya.service.crawl.impl.Common;
import cn.fw.freya.service.crawl.impl.KuaiShouCrawl;
import cn.fw.freya.service.data.AccountService;
import cn.fw.freya.utils.DateUtil;
import cn.fw.freya.utils.PublicUtil;
import cn.fw.freya.utils.ThreadPoolUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
/**
* @author kurisu
* @date 2021-12-03 16:26
* @description 数据抓取任务
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataCaptureTask {
private final AccountService accountService;
private final CrawlBizService crawlBizService;
private final Common common;
private final KuaiShouCrawl kuaiShouCrawl;
private final LivePoolDao livePoolDao;
private final AccountDao accountDao;
private final ApplicationEventPublisher publisher;
/**
* 每分钟执行多线程同时抓取数据
*/
//@Scheduled(cron = "0 0/1 * * * ?")
public void capture() {
final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getThreadPool();
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();// 获取工作队列
int activeCount = threadPoolExecutor.getActiveCount();// 获取正在执行的线程数
System.out.println("当前队列中待执行任务数:" + queue.size() + "\t 正在执行任务的线程数:" + activeCount);
if (activeCount >= threadPoolExecutor.getCorePoolSize() && queue.size() > 60) return;
List<Account> waitCrawlAccount = accountService.findAnyAccount();// 获取今日还未完成抓取数据的账号
if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
// 线程池运行多个线程同时抓取多个账号数据
for (int i = 0; i < waitCrawlAccount.size(); i++) {
Account currentAccount = waitCrawlAccount.get(i);
Account nextAccount;
if (i < waitCrawlAccount.size() - 1) {
nextAccount = waitCrawlAccount.get(i + 1);
} else {
nextAccount = new Account();
}
threadPoolExecutor.execute(() -> {
try {
crawlBizService.crawlData(currentAccount);
} catch (IOException e) {
log.error(String.format("多线程抓取数据发生IOException, 异常信息为: %s", e.getMessage()));
}
});
final Integer accountType = currentAccount.getType();// 当前账户类型
final Integer nextAccountType = Optional.ofNullable(nextAccount.getType()).orElse(-1);// 下一个账户类型
final String nextAccountNo = Optional.ofNullable(nextAccount.getAccountNo()).orElse("");// 下一个账户号
final Date todayDayMinTime = DateUtil.getThisDayMinTime(new Date());
if (accountType.equals(2) && nextAccountType.equals(2)) {
if (!Objects.equals(nextAccountNo, "") && !Objects.equals(nextAccountType, -1) &&
(Objects.nonNull(common.getHasFoundAccountMsg(nextAccountNo, 2, todayDayMinTime)) &&
Objects.nonNull(common.getHasFoundVideo(nextAccountNo, 2, todayDayMinTime)) &&
Objects.nonNull(common.getHasFoundLive(nextAccountNo, 2, todayDayMinTime))
)) {
// 下一个账号为有效抖音号, 且之前已经拿到过该账号全部的数据
} else {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20));
}
}
}
}
/**
* 每2分钟执行抓取数据
*/
//@Scheduled(fixedRate = 2 * 60 * 1000, initialDelay = 5000)
public void captureLivePlayback() {
Double durationThreshold = 60d;// 设置直播时长阈值
final Random random = new Random();
List<LivePool> withoutPlaybackLive = common.getWithoutPlaybackLive(1, durationThreshold)
.stream()
.filter(item -> 0 <= item.getGetPlaybackFailTimes() && item.getGetPlaybackFailTimes() < 30)
.collect(Collectors.toList());// 找到直播时长>60分钟, 失败次数<30的直播数据
Collection<List<LivePool>> values = withoutPlaybackLive
.stream()
.collect(Collectors.groupingBy(LivePool::getAccountNo))
.values()
.stream()
.peek(item -> item.sort(Comparator.comparing(LivePool::getGetPlaybackFailTimes)))// 把某个人每条直播按失败次数排序
.sorted(Comparator.comparing(item -> item.get(0).getGetPlaybackFailTimes()))// 失败次数最小的人排到前面
.limit(5)// 一次找5个人的直播
.collect(Collectors.toList());
for (List<LivePool> list : values) {// 遍历每个人的直播集合
final String accountNo = list.get(0).getAccountNo();
Account account = accountDao.findByAccountNoAndType(accountNo, 1);// 获取账号实体
final List<JSONObject> playbackMsg = this.getPlaybackMsg(account);// 获取该人的直播回放信息
boolean flag = false;
for (LivePool item : list) {
JSONObject obj;
if (Objects.equals(playbackMsg.size(), 1) && (obj = playbackMsg.get(0)).size() <= 1) {// 表示拿到了回放, 但未找出合适的回放
} else {
obj = kuaiShouCrawl.setPlaybackUrl(item, playbackMsg, durationThreshold);
}
String playbackUrl = obj.getString("playbackUrl");
Boolean hasInvalidPlayback = obj.getBoolean("hasInvalidPlayback");
final LivePool livePool = PublicUtil.copy(item, LivePool.class);// 对象拷贝
if (Objects.isNull(playbackUrl)) {
if (Objects.nonNull(hasInvalidPlayback) && hasInvalidPlayback)
livePool.setGetPlaybackFailTimes(-2);// 表示拿到回放数据, 但是没有和昨天直播匹配的回放
else
livePool.setGetPlaybackFailTimes(livePool.getGetPlaybackFailTimes() + 1);
} else {
flag = true;
livePool.setRoomCoverImage(obj.getString("coverUrl"));
livePool.setPlaybackUrl(playbackUrl);
}
livePoolDao.save(livePool);
}
if (flag) {
final ArrayList<LivePool> livePools = new ArrayList<>();
for (LivePool item : livePoolDao.getLiveByAccountNo(accountNo)) {// 该人有一次成功就证明已经处理过该人的所有直播了, 将他所有直播进行标记
final LivePool livePool = PublicUtil.copy(item, LivePool.class);// 对象拷贝
livePool.setGetPlaybackFailTimes(-1);
livePools.add(livePool);
}
livePoolDao.saveAll(livePools);
publisher.publishEvent(account);// 发布事件
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(12 + random.nextInt(8)));
}
}
/**
* 获取直播回放信息
*
* @param account 账号
* @return
*/
private List<JSONObject> getPlaybackMsg(Account account) {
Date previousDay = DateUtil.getPreviousDay(new Date());
Date endTime = DateUtil.getThisDayMaxTime(previousDay);
Date startTime = DateUtil.getThisDayMinTime(previousDay);
JSONArray userLivePlayback;
String playbackSearchKey = account.getPlaybackSearchKey();
if (Objects.nonNull(playbackSearchKey))
userLivePlayback = kuaiShouCrawl.getUserLivePlayback(playbackSearchKey);
else
userLivePlayback = kuaiShouCrawl.getUserLivePlayback(account.getAccountNo());
final List<JSONObject> collect = userLivePlayback
.stream()
.filter(item -> {
JSONObject obj = (JSONObject) item;
final Date createTime = obj.getDate("createTime");
return createTime.compareTo(startTime) >= 0 && createTime.compareTo(endTime) <= 0;
})
.map(item -> {
JSONObject obj = (JSONObject) item;
final Integer durationSeconds = obj.getInteger("duration");
obj.put("duration", BigDecimal.valueOf(durationSeconds).divide(BigDecimal.valueOf(60), 1, RoundingMode.HALF_UP).doubleValue());
obj.put("startTime", obj.getLong("createTime") - durationSeconds * 1000);
return obj;
})
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(collect)) {// 找到回放, 筛选结果不为空
return collect;
}
final JSONObject obj = new JSONObject();// 直接无回放数据
if (!CollectionUtils.isEmpty(userLivePlayback)) {// 找到回放, 但未筛选到适合的回放
obj.put("hasInvalidPlayback", true);
}
return Collections.singletonList(obj);
}
}