// DataCaptureTask.java 8.77 KB
package cn.fw.freya.task;

import cn.fw.freya.dao.AccountDao;
import cn.fw.freya.dao.LivePoolDao;
import cn.fw.freya.model.data.Account;
import cn.fw.freya.model.data.pool.LivePool;
import cn.fw.freya.service.CrawlBizService;
import cn.fw.freya.service.crawl.impl.Common;
import cn.fw.freya.service.crawl.impl.KuaiShouCrawl;
import cn.fw.freya.service.data.AccountService;
import cn.fw.freya.utils.DateUtil;
import cn.fw.freya.utils.ThreadPoolUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;

/**
 * @author kurisu
 * @date 2021-12-03 16:26
 * @description 数据抓取任务
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class DataCaptureTask {

    private final AccountService accountService;
    private final CrawlBizService crawlBizService;
    private final Common common;
    private final KuaiShouCrawl kuaiShouCrawl;
    private LivePoolDao livePoolDao;
    private final AccountDao accountDao;
    private final ApplicationEventPublisher publisher;

    /**
     * 每分钟执行多线程同时抓取数据
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void capture() {
        final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getThreadPool();
        BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();// 获取工作队列
        int activeCount = threadPoolExecutor.getActiveCount();// 获取正在执行的线程数
        System.out.println("当前队列中待执行任务数:" + queue.size() + "\t 正在执行任务的线程数:" + activeCount);
        if (activeCount >= threadPoolExecutor.getCorePoolSize() && queue.size() > 60) return;
        List<Account> waitCrawlAccount = accountService.findAnyAccount();// 获取今日还未完成抓取数据的账号
        if (CollectionUtils.isEmpty(waitCrawlAccount)) return;
        // 线程池运行多个线程同时抓取多个账号数据
        for (int i = 0; i < waitCrawlAccount.size(); i++) {
            Account currentAccount = waitCrawlAccount.get(i);
            Account nextAccount;
            if (i < waitCrawlAccount.size() - 1) {
                nextAccount = waitCrawlAccount.get(i + 1);
            } else {
                nextAccount = new Account();
            }
            threadPoolExecutor.execute(() -> {
                try {
                    crawlBizService.crawlData(currentAccount);
                } catch (IOException e) {
                    log.error(String.format("多线程抓取数据发生IOException, 异常信息为: %s", e.getMessage()));
                }
            });
            final Integer accountType = currentAccount.getType();// 当前账户类型
            final Integer nextAccountType = Optional.ofNullable(nextAccount.getType()).orElse(-1);// 下一个账户类型
            final String nextAccountNo = Optional.ofNullable(nextAccount.getPhoneNo()).orElse("");// 下一个账户号
            final Date todayDayMinTime = DateUtil.getThisDayMinTime(new Date());
            if (accountType.equals(2) && nextAccountType.equals(2)) {
                if (!Objects.equals(nextAccountNo, "") && !Objects.equals(nextAccountType, -1) &&
                        (Objects.nonNull(common.getHasFoundFansCnt(nextAccountNo, 2, todayDayMinTime)) &&
                                Objects.nonNull(common.getHasFoundVideo(nextAccountNo, 2, todayDayMinTime)) &&
                                Objects.nonNull(common.getHasFoundLive(nextAccountNo, 2, todayDayMinTime))
                        )) {
                    // 下一个账号为有效抖音号, 且之前已经拿到过该账号全部的数据
                } else {
                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20));
                }
            }
        }
    }

    /**
     * 每2分钟执行抓取数据
     */
    @Scheduled(cron = "0 0/2 * * * ?")
    public void captureLivePlayback() {
        final Random random = new Random();
        List<LivePool> withoutPlaybackLive = common.getWithoutPlaybackLive(1, 60d)
                .stream()
                .filter(item -> 0 <= item.getGetPlaybackFailTimes() && item.getGetPlaybackFailTimes() < 30)
                .collect(Collectors.toList());// 找到直播时长>60分钟, 失败次数<30的直播数据
        Collection<List<LivePool>> values = withoutPlaybackLive
                .stream()
                .collect(Collectors.groupingBy(LivePool::getPhoneNo))
                .values()
                .stream()
                .peek(item -> item.sort(Comparator.comparing(LivePool::getGetPlaybackFailTimes)))// 把某个人每条直播按失败次数排序
                .sorted(Comparator.comparing(item -> item.get(0).getGetPlaybackFailTimes()))// 失败次数最小的人排到前面
                .limit(3)// 一次找3个人的直播
                .collect(Collectors.toList());
        for (List<LivePool> list : values) {// 遍历每个人的直播集合
            final String accountNo = list.get(0).getPhoneNo();
            Account account = accountDao.findByPhoneNoAndType(accountNo, 1);// 获取账号实体
            final List<JSONObject> playbackMsg = this.getPlaybackMsg(account);// 获取该人的直播回放信息
            boolean flag = false;
            for (LivePool item : list) {
                JSONObject obj = kuaiShouCrawl.setPlaybackUrl(item, playbackMsg);
                String playbackUrl = obj.getString("playbackUrl");
                if (Objects.isNull(playbackUrl)) {
                    livePoolDao.save(LivePool.builder()
                            .id(item.getId())
                            .getPlaybackFailTimes(item.getGetPlaybackFailTimes() + 1)
                            .build()
                    );
                } else {
                    flag = true;
                    livePoolDao.save(LivePool.builder()
                            .id(item.getId())
                            .roomCoverImage(obj.getString("coverUrl"))
                            .playbackUrl(playbackUrl)
                            .build()
                    );
                }
            }
            if (flag) {
                final ArrayList<LivePool> livePools = new ArrayList<>();
                for (LivePool item : list) {// 该人有一次成功就证明已经处理过该人的所有直播了, 将他所有直播进行标记
                    livePools.add(LivePool.builder()
                            .id(item.getId())
                            .getPlaybackFailTimes(-1)
                            .build()
                    );
                }
                livePoolDao.saveAll(livePools);
                publisher.publishEvent(account);// 发布事件
            }
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20) + random.nextInt(15));
        }
    }

    /**
     * 获取直播回放信息
     *
     * @param account 账号
     * @return
     */
    private List<JSONObject> getPlaybackMsg(Account account) {
        Date previousDay = DateUtil.getPreviousDay(new Date());
        Date endTime = DateUtil.getThisDayMaxTime(previousDay);
        Date startTime = DateUtil.getThisDayMinTime(previousDay);
        JSONArray userLivePlayback;
        String playbackSearchKey = account.getPlaybackSearchKey();
        if (Objects.nonNull(playbackSearchKey))
            userLivePlayback = kuaiShouCrawl.getUserLivePlayback(playbackSearchKey);
        else
            userLivePlayback = kuaiShouCrawl.getUserLivePlayback(account.getPhoneNo());
        return userLivePlayback
                .stream()
                .filter(item -> {
                    JSONObject obj = (JSONObject) item;
                    final Date createTime = obj.getDate("createTime");
                    return createTime.compareTo(startTime) >= 0 && createTime.compareTo(endTime) <= 0;
                })
                .map(item -> {
                    JSONObject obj = (JSONObject) item;
                    final Integer durationSeconds = obj.getInteger("duration");
                    obj.put("duration", BigDecimal.valueOf(durationSeconds).divide(BigDecimal.valueOf(60), 1, RoundingMode.HALF_UP).doubleValue());
                    obj.put("startTime", obj.getLong("createTime") - durationSeconds * 1000);
                    return obj;
                })
                .collect(Collectors.toList());
    }

}