// DataCaptureTask.java 9.76 KB
package cn.fw.freya.task;

import cn.fw.freya.dao.AccountDao;
import cn.fw.freya.dao.LivePoolDao;
import cn.fw.freya.model.data.Account;
import cn.fw.freya.model.data.pool.LivePool;
import cn.fw.freya.service.CrawlBizService;
import cn.fw.freya.service.crawl.impl.Common;
import cn.fw.freya.service.crawl.impl.KuaiShouCrawl;
import cn.fw.freya.service.data.AccountService;
import cn.fw.freya.utils.DateUtil;
import cn.fw.freya.utils.PublicUtil;
import cn.fw.freya.utils.ThreadPoolUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;

/**
 * @author kurisu
 * @date 2021-12-03 16:26
 * @description 数据抓取任务
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class DataCaptureTask {

    private final AccountService accountService;
    private final CrawlBizService crawlBizService;
    private final Common common;
    private final KuaiShouCrawl kuaiShouCrawl;
    private final LivePoolDao livePoolDao;
    private final AccountDao accountDao;
    private final ApplicationEventPublisher publisher;

    /**
     * 每分钟执行多线程同时抓取数据
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void capture() {
        final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getThreadPool();
        BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();// 获取工作队列
        int activeCount = threadPoolExecutor.getActiveCount();// 获取正在执行的线程数
        System.out.println("当前队列中待执行任务数:" + queue.size() + "\t 正在执行任务的线程数:" + activeCount);
        if (activeCount >= threadPoolExecutor.getCorePoolSize() && queue.size() > 60) return;
        List<Account> waitCrawlAccount = accountService.findAnyAccount();// 获取今日还未完成抓取数据的账号
        if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
        // 线程池运行多个线程同时抓取多个账号数据
        for (int i = 0; i < waitCrawlAccount.size(); i++) {
            Account currentAccount = waitCrawlAccount.get(i);
            Account nextAccount;
            if (i < waitCrawlAccount.size() - 1) {
                nextAccount = waitCrawlAccount.get(i + 1);
            } else {
                nextAccount = new Account();
            }
            threadPoolExecutor.execute(() -> {
                try {
                    crawlBizService.crawlData(currentAccount);
                } catch (IOException e) {
                    log.error(String.format("多线程抓取数据发生IOException, 异常信息为: %s", e.getMessage()));
                }
            });
            final Integer accountType = currentAccount.getType();// 当前账户类型
            final Integer nextAccountType = Optional.ofNullable(nextAccount.getType()).orElse(-1);// 下一个账户类型
            final String nextAccountNo = Optional.ofNullable(nextAccount.getPhoneNo()).orElse("");// 下一个账户号
            final Date todayDayMinTime = DateUtil.getThisDayMinTime(new Date());
            if (accountType.equals(2) && nextAccountType.equals(2)) {
                if (!Objects.equals(nextAccountNo, "") && !Objects.equals(nextAccountType, -1) &&
                        (Objects.nonNull(common.getHasFoundFansCnt(nextAccountNo, 2, todayDayMinTime)) &&
                                Objects.nonNull(common.getHasFoundVideo(nextAccountNo, 2, todayDayMinTime)) &&
                                Objects.nonNull(common.getHasFoundLive(nextAccountNo, 2, todayDayMinTime))
                        )) {
                    // 下一个账号为有效抖音号, 且之前已经拿到过该账号全部的数据
                } else {
                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20));
                }
            }
        }
    }

    /**
     * 每2分钟执行抓取数据
     */
    @Scheduled(fixedRate = 2 * 60 * 1000, initialDelay = 5000)
    public void captureLivePlayback() {
        Double durationThreshold = 60d;// 设置直播时长阈值
        final Random random = new Random();
        List<LivePool> withoutPlaybackLive = common.getWithoutPlaybackLive(1, durationThreshold)
                .stream()
                .filter(item -> 0 <= item.getGetPlaybackFailTimes() && item.getGetPlaybackFailTimes() < 30)
                .collect(Collectors.toList());// 找到直播时长>60分钟, 失败次数<30的直播数据
        Collection<List<LivePool>> values = withoutPlaybackLive
                .stream()
                .collect(Collectors.groupingBy(LivePool::getPhoneNo))
                .values()
                .stream()
                .peek(item -> item.sort(Comparator.comparing(LivePool::getGetPlaybackFailTimes)))// 把某个人每条直播按失败次数排序
                .sorted(Comparator.comparing(item -> item.get(0).getGetPlaybackFailTimes()))// 失败次数最小的人排到前面
                .limit(5)// 一次找5个人的直播
                .collect(Collectors.toList());
        for (List<LivePool> list : values) {// 遍历每个人的直播集合
            final String accountNo = list.get(0).getPhoneNo();
            Account account = accountDao.findByPhoneNoAndType(accountNo, 1);// 获取账号实体
            final List<JSONObject> playbackMsg = this.getPlaybackMsg(account);// 获取该人的直播回放信息
            boolean flag = false;
            for (LivePool item : list) {
                JSONObject obj;
                if (Objects.equals(playbackMsg.size(), 1) && (obj = playbackMsg.get(0)).size() <= 1) {// 表示拿到了回放, 但未找出合适的回放

                } else {
                    obj = kuaiShouCrawl.setPlaybackUrl(item, playbackMsg, durationThreshold);
                }
                String playbackUrl = obj.getString("playbackUrl");
                Boolean hasInvalidPlayback = obj.getBoolean("hasInvalidPlayback");
                final LivePool livePool = PublicUtil.copy(item, LivePool.class);// 对象拷贝
                if (Objects.isNull(playbackUrl)) {
                    if (Objects.nonNull(hasInvalidPlayback) && hasInvalidPlayback)
                        livePool.setGetPlaybackFailTimes(-2);// 表示拿到回放数据, 但是没有和昨天直播匹配的回放
                    else
                        livePool.setGetPlaybackFailTimes(livePool.getGetPlaybackFailTimes() + 1);
                } else {
                    flag = true;
                    livePool.setRoomCoverImage(obj.getString("coverUrl"));
                    livePool.setPlaybackUrl(playbackUrl);
                }
                livePoolDao.save(livePool);
            }
            if (flag) {
                final ArrayList<LivePool> livePools = new ArrayList<>();
                for (LivePool item : livePoolDao.getLiveByAccountNo(accountNo)) {// 该人有一次成功就证明已经处理过该人的所有直播了, 将他所有直播进行标记
                    final LivePool livePool = PublicUtil.copy(item, LivePool.class);// 对象拷贝
                    livePool.setGetPlaybackFailTimes(-1);
                    livePools.add(livePool);
                }
                livePoolDao.saveAll(livePools);
                publisher.publishEvent(account);// 发布事件
            }
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(12 + random.nextInt(8)));
        }
    }

    /**
     * 获取直播回放信息
     *
     * @param account 账号
     * @return
     */
    private List<JSONObject> getPlaybackMsg(Account account) {
        Date previousDay = DateUtil.getPreviousDay(new Date());
        Date endTime = DateUtil.getThisDayMaxTime(previousDay);
        Date startTime = DateUtil.getThisDayMinTime(previousDay);
        JSONArray userLivePlayback;
        String playbackSearchKey = account.getPlaybackSearchKey();
        if (Objects.nonNull(playbackSearchKey))
            userLivePlayback = kuaiShouCrawl.getUserLivePlayback(playbackSearchKey);
        else
            userLivePlayback = kuaiShouCrawl.getUserLivePlayback(account.getPhoneNo());
        final List<JSONObject> collect = userLivePlayback
                .stream()
                .filter(item -> {
                    JSONObject obj = (JSONObject) item;
                    final Date createTime = obj.getDate("createTime");
                    return createTime.compareTo(startTime) >= 0 && createTime.compareTo(endTime) <= 0;
                })
                .map(item -> {
                    JSONObject obj = (JSONObject) item;
                    final Integer durationSeconds = obj.getInteger("duration");
                    obj.put("duration", BigDecimal.valueOf(durationSeconds).divide(BigDecimal.valueOf(60), 1, RoundingMode.HALF_UP).doubleValue());
                    obj.put("startTime", obj.getLong("createTime") - durationSeconds * 1000);
                    return obj;
                })
                .collect(Collectors.toList());
        if (!CollectionUtils.isEmpty(collect)) {// 找到回放, 筛选结果不为空
            return collect;
        }
        final JSONObject obj = new JSONObject();// 直接无回放数据
        if (!CollectionUtils.isEmpty(userLivePlayback)) {// 找到回放, 但未筛选到适合的回放
            obj.put("hasInvalidPlayback", true);
        }
        return Collections.singletonList(obj);
    }

}