Commit 36784d5aa88961f5a1a0b9722d67325954549718

Authored by 王明元
1 parent 79be4556

2022年5月26日16:01:23 更新直播回放抓取策略

src/main/java/cn/fw/freya/controller/KSController.java
... ... @@ -90,12 +90,4 @@ public class KSController {
90 90 }).collect(Collectors.toList());
91 91 }
92 92  
93   - /**
94   - * 获取随机账户
95   - *
96   - */
97   - @GetMapping("/getRandomUser")
98   - public String getRandomUser() {
99   - return kuaishouCrawl.getRandomUserCookies();
100   - }
101 93 }
... ...
src/main/java/cn/fw/freya/controller/OtherController.java
1 1 package cn.fw.freya.controller;
2 2  
3 3 import cn.fw.freya.common.ResponseMessage;
  4 +import cn.fw.freya.model.data.Account;
  5 +import cn.fw.freya.model.data.pool.LivePool;
4 6 import cn.fw.freya.model.dto.other.ReportLiveDto;
5 7 import cn.fw.freya.model.dto.other.ReportVideoDto;
6 8 import cn.fw.freya.service.CommonBizService;
  9 +import cn.fw.freya.service.crawl.impl.Common;
7 10 import cn.fw.freya.service.rpc.AccountRpcService;
8 11 import cn.fw.freya.service.rpc.ReportRpcService;
9 12 import com.alibaba.fastjson.JSON;
... ... @@ -23,7 +26,6 @@ import java.math.BigDecimal;
23 26 import java.math.RoundingMode;
24 27 import java.text.SimpleDateFormat;
25 28 import java.time.LocalDate;
26   -import java.util.Arrays;
27 29 import java.util.Date;
28 30 import java.util.List;
29 31 import java.util.Set;
... ... @@ -39,6 +41,7 @@ import java.util.stream.Collectors;
39 41 @RequiredArgsConstructor
40 42 public class OtherController {
41 43  
  44 + private final Common common;
42 45 private final CommonBizService commonBizService;
43 46 private final AccountRpcService accountRpcService;
44 47 private final ReportRpcService reportRpcService;
... ... @@ -153,16 +156,27 @@ public class OtherController {
153 156 Set<String> fields;
154 157 }
155 158  
156   - public static void main(String[] args) {
157   - String str = "";
158   - final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
159   - List<String> fields = Arrays.asList("createTime");
160   - System.out.println(JSON.toJSONString(JSON.parseArray(str)
161   - .stream()
162   - .map(item -> {
163   - JSONObject obj = (JSONObject) item;
164   - fields.forEach(item1 -> obj.put(item1, sdf.format(new Date(obj.getLong(item1)))));
165   - return obj;
166   - }).collect(Collectors.toList())));
  159 + /**
  160 + * 获取随机账户
  161 + *
  162 + * @param type 账户类型
  163 + * @return
  164 + */
  165 + @GetMapping("/getRandomUser")
  166 + public Account getRandomUser(@NotNull(message = "账户类型不能为空") Integer type) {
  167 + return common.getRandomUserByType(type);
  168 + }
  169 +
  170 + /**
  171 + * 获取直播无回放信息的直播
  172 + *
  173 + * @param type 账户类型
  174 + * @param durationThreshold 时长阈值
  175 + * @return
  176 + */
  177 + @GetMapping("/getWithoutPlaybackLive")
  178 + public List<LivePool> getWithoutPlaybackLive(@NotNull(message = "账户类型不能为空") Integer type, @NotNull(message = "时长阈值不能为空")Double durationThreshold) {
  179 + return common.getWithoutPlaybackLive(type, durationThreshold);
167 180 }
  181 +
168 182 }
... ...
src/main/java/cn/fw/freya/dao/LivePoolDao.java
... ... @@ -24,4 +24,14 @@ public interface LivePoolDao extends JpaRepository&lt;LivePool, Long&gt; {
24 24 */
25 25 @Query("select live from LivePool live where live.phoneNo = ?1 and live.type = ?2 and live.reportDate >= ?3")
26 26 List<LivePool> getHasReportDate(String phoneNo, Integer type, Date reportDate);
  27 +
  28 + /**
  29 + * 获取没有拿到回播的直播
  30 + *
  31 + * @param type 账户类型
  32 + * @param durationThreshold 时长阈值
  33 + * @return
  34 + */
  35 + @Query("select live from LivePool live where live.playbackUrl is null and live.roomId is not null and live.type = ?1 and live.duration >= ?2")
  36 + List<LivePool> getWithoutPlaybackLive(Integer type, Double durationThreshold);
27 37 }
... ...
src/main/java/cn/fw/freya/model/data/pool/LivePool.java
... ... @@ -83,6 +83,10 @@ public class LivePool {
83 83 @Column(length = 1024)
84 84 private String playbackUrl;
85 85 /**
  86 + * 获取直播回放失败次数
  87 + */
  88 + private Integer getPlaybackFailTimes = 0;
  89 + /**
86 90 * 直播间id
87 91 */
88 92 private String roomId;
... ...
src/main/java/cn/fw/freya/service/CrawlBizService.java
... ... @@ -11,6 +11,8 @@ import cn.fw.freya.service.rpc.ReportRpcService;
11 11 import cn.fw.freya.utils.AssertUtil;
12 12 import lombok.extern.slf4j.Slf4j;
13 13 import org.springframework.beans.factory.annotation.Autowired;
  14 +import org.springframework.context.event.EventListener;
  15 +import org.springframework.scheduling.annotation.Async;
14 16 import org.springframework.stereotype.Service;
15 17  
16 18 import java.io.IOException;
... ... @@ -18,6 +20,8 @@ import java.time.LocalDate;
18 20 import java.util.List;
19 21 import java.util.Map;
20 22 import java.util.Objects;
  23 +import java.util.concurrent.TimeUnit;
  24 +import java.util.concurrent.locks.LockSupport;
21 25 import java.util.stream.Collectors;
22 26  
23 27 /**
... ... @@ -175,11 +179,7 @@ public class CrawlBizService {
175 179 if (reportFansCnt) {
176 180 return true;
177 181 } else {
178   - try {
179   - Thread.sleep(3000);
180   - } catch (InterruptedException e) {
181   - e.printStackTrace();
182   - }
  182 + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
183 183 }
184 184 hasTryTimes++;
185 185 }
... ... @@ -202,11 +202,7 @@ public class CrawlBizService {
202 202 if (reportVideo) {
203 203 return true;
204 204 } else {
205   - try {
206   - Thread.sleep(3000);
207   - } catch (InterruptedException e) {
208   - e.printStackTrace();
209   - }
  205 + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
210 206 }
211 207 hasTryTimes++;
212 208 }
... ... @@ -229,14 +225,31 @@ public class CrawlBizService {
229 225 if (reportLive) {
230 226 return true;
231 227 } else {
232   - try {
233   - Thread.sleep(3000);
234   - } catch (InterruptedException e) {
235   - e.printStackTrace();
236   - }
  228 + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
237 229 }
238 230 hasTryTimes++;
239 231 }
240 232 return false;
241 233 }
  234 +
  235 + /**
  236 + * 监听器, 收到消息后执行上报账户直播信息
  237 + *
  238 + * @param account 账户
  239 + * @throws IOException
  240 + */
  241 + @Async(value = "wmyThreadPool")
  242 + @EventListener(Account.class)
  243 + public void reportLive(Account account) throws IOException {
  244 + log.info(Thread.currentThread().getName() + " spring监听器在CrawlBizService.reportLive()方法上成功收到消息: " + account);
  245 + Integer type = account.getType();
  246 + final String accountNo = account.getPhoneNo();
  247 + AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
  248 + CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
  249 + final List<LivePool> yesterdayLiveMsg = crawlStrategy.getYesterdayLiveMsg(accountNo);// 获取昨日直播信息
  250 + final boolean reportLive = this.doReportLive(account, yesterdayLiveMsg);
  251 + if (!reportLive) {
  252 + log.error(LocalDate.now() + " 上报账户为" + accountNo + "的" + (account.getType() == 1 ? "快手" : "抖音") + "的直播数据失败");
  253 + }
  254 + }
242 255 }
... ...
src/main/java/cn/fw/freya/service/crawl/impl/Common.java
... ... @@ -5,6 +5,7 @@ import cn.fw.freya.dao.AccountDao;
5 5 import cn.fw.freya.dao.CookieDao;
6 6 import cn.fw.freya.dao.LivePoolDao;
7 7 import cn.fw.freya.dao.VideoPoolDao;
  8 +import cn.fw.freya.model.data.Account;
8 9 import cn.fw.freya.model.data.FwCookie;
9 10 import cn.fw.freya.model.data.ResponseReceived;
10 11 import cn.fw.freya.model.data.pool.LivePool;
... ... @@ -57,6 +58,7 @@ public class Common {
57 58 private final AccountDao accountDao;
58 59 private final VideoPoolDao videoPoolDao;
59 60 private final LivePoolDao livePoolDao;
  61 + private final String playbackBaseUrl = "https://live.kuaishou.com/playback/";
60 62  
61 63 /**
62 64 * 创建浏览器驱动
... ... @@ -290,4 +292,25 @@ public class Common {
290 292 return null;
291 293 }
292 294  
  295 + /**
  296 + * 根据用户类型随机获取一个用户
  297 + *
  298 + * @param type 账户类型
  299 + * @return
  300 + */
  301 + public Account getRandomUserByType(Integer type) {
  302 + return accountDao.findRandomByAndType(type).get(0);
  303 + }
  304 +
  305 + /**
  306 + * 获取直播无回放信息的直播
  307 + *
  308 + * @param type 账户类型
  309 + * @param durationThreshold 时长阈值
  310 + * @return
  311 + */
  312 + public List<LivePool> getWithoutPlaybackLive(Integer type, Double durationThreshold) {
  313 + return livePoolDao.getWithoutPlaybackLive(type, durationThreshold);
  314 + }
  315 +
293 316 }
... ...
src/main/java/cn/fw/freya/service/crawl/impl/KuaiShouCrawl.java
... ... @@ -310,16 +310,15 @@ public class KuaiShouCrawl implements CrawlStrategy {
310 310 }
311 311 JSONObject dataJSONObject = response.getJSONObject("data");
312 312 JSONArray dataJSONArray = dataJSONObject.getJSONArray("details");
313   - List<JSONObject> collect = new ArrayList<>();
  313 + /*List<JSONObject> collect = new ArrayList<>();
314 314 if (!CollectionUtils.isEmpty(dataJSONArray)) {
315   - JSONArray userLivePlayback = this.getUserLivePlayback(accountNo);
316   - if (CollectionUtils.isEmpty(userLivePlayback)) {
317   - Account account = accountDao.findByPhoneNoAndType(accountNo, this.getType().getValue());
318   - if (Objects.nonNull(account)) {
319   - userLivePlayback = this.getUserLivePlayback(account.getPlaybackSearchKey());
320   - }
321   - }
322   - assert userLivePlayback != null;
  315 + JSONArray userLivePlayback;
  316 + Account account = accountDao.findByPhoneNoAndType(accountNo, this.getType().getValue());// 获取账号实体
  317 + String playbackSearchKey = account.getPlaybackSearchKey();
  318 + if (Objects.nonNull(playbackSearchKey))
  319 + userLivePlayback = this.getUserLivePlayback(playbackSearchKey);
  320 + else
  321 + userLivePlayback = this.getUserLivePlayback(accountNo);
323 322 collect = userLivePlayback
324 323 .stream()
325 324 .filter(item -> {
... ... @@ -335,10 +334,9 @@ public class KuaiShouCrawl implements CrawlStrategy {
335 334 return obj;
336 335 })
337 336 .collect(Collectors.toList());
338   - }
  337 + }*/
339 338 List<LivePool> livePoolList = new ArrayList<>();
340 339 assert dataJSONArray != null;
341   - List<JSONObject> finalCollect = collect;
342 340 dataJSONArray.forEach(item -> {
343 341 final JSONObject obj = (JSONObject) item;
344 342 /**
... ... @@ -363,7 +361,7 @@ public class KuaiShouCrawl implements CrawlStrategy {
363 361 * userHead: "https://tx2.a.kwimgs.com/uhead/AB/2021/11/11/12/BMjAyMTExMTExMjUwMzZfMjU2MTc2NDMyMV8xX2hkMjg5XzMyOA==_s.jpg"
364 362 * -userName: "长安汽车。小明聊聊车"
365 363 */
366   - String coverUrl = obj.getString("liveCover");// 直播封面地址
  364 + /*String coverUrl = obj.getString("liveCover");// 直播封面地址
367 365 String playbackUrl = null;
368 366 final Double liveDuration = obj.getDouble("liveDuration");// 数据接口返回直播时长
369 367 final Long liveStartTimeStamp = obj.getLong("liveTime");// 数据接口返回开播时间戳
... ... @@ -402,19 +400,19 @@ public class KuaiShouCrawl implements CrawlStrategy {
402 400 playbackUrl = this.playbackBaseUrl + playbackMsg.getString("productId");
403 401 }
404 402 }
405   - }
  403 + }*/
406 404 livePoolList.add(LivePool.builder()
407 405 .type(this.getType().getValue())
408 406 .phoneNo(accountNo)
409 407 .reportDate(new Date())
410 408 .commentUserCnt(Optional.ofNullable(obj.getInteger("commentUv")).orElse(0))
411 409 .consumeUserCnt(Optional.ofNullable(obj.getInteger("sendGiftUv")).orElse(0))
412   - .duration(Optional.of(liveDuration).orElse(0d))
  410 + .duration(Optional.of(obj.getDouble("liveDuration")).orElse(0d))
413 411 .income(Optional.ofNullable(obj.getDouble("receiveAmount")).orElse(0d))
414 412 .newFansUserCnt(Optional.ofNullable(obj.getInteger("newFansCnt")).orElse(0))
415   - .openTime(new Date(Optional.ofNullable(liveStartTimeStamp).orElse(0L)))
416   - .roomCoverImage(Optional.ofNullable(coverUrl).orElse(""))
417   - .playbackUrl(playbackUrl)
  413 + .openTime(new Date(Optional.ofNullable(obj.getLong("liveTime")).orElse(0L)))
  414 + .roomCoverImage(Optional.ofNullable(obj.getString("liveCover")).orElse(""))
  415 + .playbackUrl(null)
418 416 .roomId(Optional.ofNullable(obj.getString("liveStreamId")).orElse(""))
419 417 .roomName(Optional.ofNullable(obj.getString("liveTitle")).orElse(""))
420 418 //.score(obj.getInteger("score"))
... ... @@ -449,13 +447,59 @@ public class KuaiShouCrawl implements CrawlStrategy {
449 447 return livePoolList;
450 448 }
451 449  
452   - public JSONArray getUserLivePlayback(String accountNo) {
  450 + public JSONObject setPlaybackUrl(LivePool dbLive, List<JSONObject> collect) {
  451 + JSONObject object = new JSONObject();
  452 + object.put("coverUrl", null);
  453 + object.put("playbackUrl", null);
  454 + final String accountNo = dbLive.getPhoneNo();
  455 + final Double liveDuration = dbLive.getDuration();// 数据接口返回直播时长
  456 + final Long liveStartTimeStamp = dbLive.getOpenTime().getTime();// 数据接口返回开播时间戳
  457 + if (liveDuration >= 60) {// 直播时长大于等于60分钟才记录回放信息
  458 + if (!CollectionUtils.isEmpty(collect)) {
  459 + JSONObject playbackMsg;
  460 + if (Objects.equals(collect.size(), 1)) {
  461 + playbackMsg = collect.get(0);
  462 + Double duration = playbackMsg.getDouble("duration");// 回放信息返回直播时长
  463 + final Long startTimeStamp = playbackMsg.getLong("startTime");// 回放信息返回直播开始时间戳
  464 + final double timeSubAbs = Math.abs(liveDuration - duration);
  465 + final double liveStartSub = BigDecimal.valueOf(Math.abs(liveStartTimeStamp - startTimeStamp)).divide(BigDecimal.valueOf(60 * 1000), 1, RoundingMode.HALF_UP).doubleValue();
  466 + // (Objects.equals(liveDuration, duration) || (timeSubAbs < 5))->说明时长几乎相等
  467 + // (liveStartSub < 2)->说明开播时间几乎一样
  468 + if (!((Objects.equals(liveDuration, duration) || (timeSubAbs < 5)) && liveStartSub < 10)) {
  469 + playbackMsg = null;
  470 + log.info(String.format("%s [%s]平台账户号为: %s的直播回放数据不匹配!!!", LocalDateTime.now(), this.getType().getName(), accountNo));
  471 + }
  472 + } else {
  473 + List<JSONObject> collect1 = collect.stream().filter(item1 -> {
  474 + Double duration = item1.getDouble("duration");
  475 + final Long startTimeStamp = item1.getLong("startTime");// 回放信息返回直播开始时间戳
  476 + final double timeSubAbs = Math.abs(liveDuration - duration);
  477 + final double liveStartSub = BigDecimal.valueOf(Math.abs(liveStartTimeStamp - startTimeStamp)).divide(BigDecimal.valueOf(60 * 1000), 1, RoundingMode.HALF_UP).doubleValue();
  478 + return (Objects.equals(liveDuration, duration) || timeSubAbs < 5) && liveStartSub < 10;
  479 + }).collect(Collectors.toList());
  480 + if (Objects.equals(collect1.size(), 1)) {
  481 + playbackMsg = collect1.get(0);
  482 + } else {
  483 + playbackMsg = null;
  484 + log.info(String.format("%s [%s]平台账户号为: %s的直播找到多条回放数据!!!", LocalDateTime.now(), this.getType().getName(), accountNo));
  485 + }
  486 + }
  487 + if (Objects.nonNull(playbackMsg)) {
  488 + object.put("coverUrl", playbackMsg.getString("coverUrl"));
  489 + object.put("playbackUrl", this.playbackBaseUrl + playbackMsg.getString("productId"));
  490 + }
  491 + }
  492 + }
  493 + return object;
  494 + }
  495 +
  496 + public JSONArray getUserLivePlayback(String searchKey) {
453 497 JSONArray objects = new JSONArray();
454   - if (Objects.isNull(accountNo))
  498 + if (Objects.isNull(searchKey))
455 499 return objects;
456 500 Map<String, Object> params = new HashMap<>();
457 501 Map<String, Object> params1 = new HashMap<>();
458   - params1.put("principalId", accountNo);
  502 + params1.put("principalId", searchKey);
459 503 params1.put("pcursor", "");
460 504 params1.put("count", 150);
461 505 params.put("operationName", "playbackFeedsQuery");
... ... @@ -471,13 +515,13 @@ public class KuaiShouCrawl implements CrawlStrategy {
471 515 .headers(HttpHeader
472 516 .defaultHeader()
473 517 .other("Origin", "https://live.kuaishou.com")// 非必须
474   - .referer("https://live.kuaishou.com/profile/" + accountNo)// 非必须
475   - .userAgent(this.getUserAgent(accountNo))// 非必须
  518 + .referer("https://live.kuaishou.com/profile/" + searchKey)// 非必须
  519 + .userAgent(this.getUserAgent(searchKey))// 非必须
476 520 .cookie(this.getRandomUserCookies())
477 521 .build()
478 522 );
479 523 String res = RequestUtil.post(config);
480   - log.info(String.format("%s [%s]平台账户号为: %s的回播数据的原始数据为: %s", LocalDateTime.now(), this.getType().getName(), accountNo, res));
  524 + log.info(String.format("%s [%s]平台账户号为: %s的回播数据的原始数据为: %s", LocalDateTime.now(), this.getType().getName(), searchKey, res));
481 525 if (!StringUtils.hasText(res)) {
482 526 return objects;
483 527 }
... ... @@ -485,7 +529,7 @@ public class KuaiShouCrawl implements CrawlStrategy {
485 529 try {
486 530 return resObj.getJSONObject("data").getJSONObject("playbackFeeds").getJSONArray("list");
487 531 } catch (Exception e) {
488   - log.error(String.format("%s 抓取[%s]平台账户号为: %s的直播回放数据出现异常!!! 异常信息为: %s", LocalDateTime.now(), this.getType().getName(), accountNo, e.getMessage()));
  532 + log.error(String.format("%s 抓取[%s]平台账户号为: %s的直播回放数据出现异常!!! 异常信息为: %s", LocalDateTime.now(), this.getType().getName(), searchKey, e.getMessage()));
489 533 return objects;
490 534 }
491 535 }
... ... @@ -620,8 +664,7 @@ public class KuaiShouCrawl implements CrawlStrategy {
620 664 * @return
621 665 */
622 666 public String getRandomUserCookies() {
623   - System.out.println(accountDao.findRandomByAndType(this.getType().getValue()).get(0).getPhoneNo());
624   - final List<FwCookie> cookies = common.loadCookie(accountDao.findRandomByAndType(this.getType().getValue()).get(0).getPhoneNo(), this.getType().getValue());
  667 + final List<FwCookie> cookies = common.loadCookie(common.getRandomUserByType(this.getType().getValue()).getPhoneNo(), this.getType().getValue());
625 668 StringBuffer sb = new StringBuffer();
626 669 sb.append("clientid=3;");
627 670 cookies.forEach(item -> {
... ...
src/main/java/cn/fw/freya/task/DataCaptureTask.java
1 1 package cn.fw.freya.task;
2 2  
  3 +import cn.fw.freya.dao.AccountDao;
  4 +import cn.fw.freya.dao.LivePoolDao;
3 5 import cn.fw.freya.model.data.Account;
  6 +import cn.fw.freya.model.data.pool.LivePool;
4 7 import cn.fw.freya.service.CrawlBizService;
5 8 import cn.fw.freya.service.crawl.impl.Common;
  9 +import cn.fw.freya.service.crawl.impl.KuaiShouCrawl;
6 10 import cn.fw.freya.service.data.AccountService;
7 11 import cn.fw.freya.utils.DateUtil;
8 12 import cn.fw.freya.utils.ThreadPoolUtil;
  13 +import com.alibaba.fastjson.JSONArray;
  14 +import com.alibaba.fastjson.JSONObject;
9 15 import lombok.RequiredArgsConstructor;
10 16 import lombok.extern.slf4j.Slf4j;
  17 +import org.springframework.context.ApplicationEventPublisher;
11 18 import org.springframework.scheduling.annotation.Scheduled;
12 19 import org.springframework.stereotype.Component;
13 20 import org.springframework.util.CollectionUtils;
14 21  
15 22 import java.io.IOException;
16   -import java.util.Date;
17   -import java.util.List;
18   -import java.util.Objects;
19   -import java.util.Optional;
  23 +import java.math.BigDecimal;
  24 +import java.math.RoundingMode;
  25 +import java.util.*;
20 26 import java.util.concurrent.BlockingQueue;
21 27 import java.util.concurrent.ThreadPoolExecutor;
22 28 import java.util.concurrent.TimeUnit;
23 29 import java.util.concurrent.locks.LockSupport;
  30 +import java.util.stream.Collectors;
24 31  
25 32 /**
26 33 * @author kurisu
... ... @@ -35,6 +42,10 @@ public class DataCaptureTask {
35 42 private final AccountService accountService;
36 43 private final CrawlBizService crawlBizService;
37 44 private final Common common;
  45 + private final KuaiShouCrawl kuaiShouCrawl;
  46 + private LivePoolDao livePoolDao;
  47 + private final AccountDao accountDao;
  48 + private final ApplicationEventPublisher publisher;
38 49  
39 50 /**
40 51 * 每分钟执行多线程同时抓取数据
... ... @@ -81,4 +92,97 @@ public class DataCaptureTask {
81 92 }
82 93 }
83 94 }
  95 +
  96 + /**
  97 + * 每2分钟执行抓取数据
  98 + */
  99 + @Scheduled(cron = "0 0/2 * * * ?")
  100 + public void captureLivePlayback() {
  101 + final Random random = new Random();
  102 + List<LivePool> withoutPlaybackLive = common.getWithoutPlaybackLive(1, 60d)
  103 + .stream()
  104 + .filter(item -> 0 < item.getGetPlaybackFailTimes() && item.getGetPlaybackFailTimes() < 15)
  105 + .collect(Collectors.toList());
  106 + Collection<List<LivePool>> values = withoutPlaybackLive
  107 + .stream()
  108 + .collect(Collectors.groupingBy(LivePool::getPhoneNo))
  109 + .values()
  110 + .stream()
  111 + .peek(item -> item.sort(Comparator.comparing(LivePool::getGetPlaybackFailTimes)))// 把某个人每条直播按失败次数排序
  112 + .sorted(Comparator.comparing(item -> item.get(0).getGetPlaybackFailTimes()))// 失败次数最小的人排到前面
  113 + .limit(3)// 一次找3个人的直播
  114 + .collect(Collectors.toList());
  115 + for (List<LivePool> list : values) {// 遍历每个人的直播集合
  116 + final String accountNo = list.get(0).getPhoneNo();
  117 + Account account = accountDao.findByPhoneNoAndType(accountNo, 1);// 获取账号实体
  118 + final List<JSONObject> playbackMsg = this.getPlaybackMsg(account);// 获取该人的直播回放信息
  119 + boolean flag = false;
  120 + for (LivePool item : list) {
  121 + JSONObject obj = kuaiShouCrawl.setPlaybackUrl(item, playbackMsg);
  122 + String playbackUrl = obj.getString("playbackUrl");
  123 + if (Objects.isNull(playbackUrl)) {
  124 + livePoolDao.save(LivePool.builder()
  125 + .id(item.getId())
  126 + .getPlaybackFailTimes(item.getGetPlaybackFailTimes() + 1)
  127 + .build()
  128 + );
  129 + } else {
  130 + flag = true;
  131 + livePoolDao.save(LivePool.builder()
  132 + .id(item.getId())
  133 + .roomCoverImage(obj.getString("coverUrl"))
  134 + .playbackUrl(playbackUrl)
  135 + .build()
  136 + );
  137 + }
  138 + }
  139 + if (flag) {
  140 + final ArrayList<LivePool> livePools = new ArrayList<>();
  141 + for (LivePool item : list) {// 该人有一次成功就证明已经处理过该人的所有直播了, 将他所有直播进行标记
  142 + livePools.add(LivePool.builder()
  143 + .id(item.getId())
  144 + .getPlaybackFailTimes(-1)
  145 + .build()
  146 + );
  147 + }
  148 + livePoolDao.saveAll(livePools);
  149 + publisher.publishEvent(account);// 发布事件
  150 + }
  151 + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20) + random.nextInt(15));
  152 + }
  153 + }
  154 +
  155 + /**
  156 + * 获取直播回放信息
  157 + *
  158 + * @param account 账号
  159 + * @return
  160 + */
  161 + private List<JSONObject> getPlaybackMsg(Account account) {
  162 + Date previousDay = DateUtil.getPreviousDay(new Date());
  163 + Date endTime = DateUtil.getThisDayMaxTime(previousDay);
  164 + Date startTime = DateUtil.getThisDayMinTime(previousDay);
  165 + JSONArray userLivePlayback;
  166 + String playbackSearchKey = account.getPlaybackSearchKey();
  167 + if (Objects.nonNull(playbackSearchKey))
  168 + userLivePlayback = kuaiShouCrawl.getUserLivePlayback(playbackSearchKey);
  169 + else
  170 + userLivePlayback = kuaiShouCrawl.getUserLivePlayback(account.getPhoneNo());
  171 + return userLivePlayback
  172 + .stream()
  173 + .filter(item -> {
  174 + JSONObject obj = (JSONObject) item;
  175 + final Date createTime = obj.getDate("createTime");
  176 + return createTime.compareTo(startTime) >= 0 && createTime.compareTo(endTime) <= 0;
  177 + })
  178 + .map(item -> {
  179 + JSONObject obj = (JSONObject) item;
  180 + final Integer durationSeconds = obj.getInteger("duration");
  181 + obj.put("duration", BigDecimal.valueOf(durationSeconds).divide(BigDecimal.valueOf(60), 1, RoundingMode.HALF_UP).doubleValue());
  182 + obj.put("startTime", obj.getLong("createTime") - durationSeconds * 1000);
  183 + return obj;
  184 + })
  185 + .collect(Collectors.toList());
  186 + }
  187 +
84 188 }
... ...