// CrawlBizService.java 9.4 KB
package cn.fw.freya.service;

import cn.fw.freya.enums.AccountTypeEnum;
import cn.fw.freya.model.data.Account;
import cn.fw.freya.model.data.pool.LivePool;
import cn.fw.freya.model.data.pool.VideoPool;
import cn.fw.freya.service.crawl.CrawlStrategy;
import cn.fw.freya.service.data.AccountService;
import cn.fw.freya.service.rpc.AccountRpcService;
import cn.fw.freya.service.rpc.ReportRpcService;
import cn.fw.freya.utils.AssertUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

/**
 * @author kurisu
 * @date 2021-11-15 14:35
 * @description bizService
 */
@Slf4j
@Service
public class CrawlBizService {

    private final Map<AccountTypeEnum, CrawlStrategy> crawlStrategyMap;
    private final AccountService accountService;
    private final AccountRpcService accountRpcService;
    private final ReportRpcService reportRpcService;

    @Autowired
    public CrawlBizService(final List<CrawlStrategy> crawlStrategyList,
                           final AccountService accountService,
                           final AccountRpcService accountRpcService,
                           final ReportRpcService reportRpcService) {
        this.crawlStrategyMap = crawlStrategyList.stream()
                .collect(Collectors.toMap(CrawlStrategy::getType, v -> v));
        this.accountService = accountService;
        this.accountRpcService = accountRpcService;
        this.reportRpcService = reportRpcService;
    }

    /**
     * 登陆准备
     *
     * @param phoneNo 账户号
     * @param type    账户类型(1:快手, 2:抖音, 3:懂车帝, 4:Bilibili)
     * @return
     */
    public String preLogin(String phoneNo, Integer type) {
        AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
        AssertUtil.notNull(typeEnum, () -> "平台类型不正确");
        CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
        return crawlStrategy.preLogin(phoneNo);
    }

    /**
     * 登陆
     *
     * @param phoneNo 账户号
     * @param type    账户类型(1:快手, 2:抖音, 3:懂车帝, 4:Bilibili)
     * @return
     */
    public boolean doLogin(String phoneNo, Integer type) {
        AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
        AssertUtil.notNull(typeEnum, () -> "平台类型不正确");
        CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
        return crawlStrategy.doLogin(phoneNo);
    }

    /**
     * 退出浏览器
     *
     * @param phoneNo 账户号
     * @param type    账户类型(1:快手, 2:抖音, 3:懂车帝, 4:Bilibili)
     * @return
     */
    public boolean exitBrowser(String phoneNo, Integer type) {
        AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
        AssertUtil.notNull(typeEnum, () -> "平台类型不正确");
        CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
        return crawlStrategy.exitBrowser(phoneNo);
    }

    /**
     * 执行抓取数据
     *
     * @param account
     */
    public void crawlData(Account account) throws IOException {
        Integer type = account.getType();
        final String accountNo = account.getPhoneNo();
        AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
        CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
        // 抓取数据
        log.info("线程: " + Thread.currentThread().getName() + " 开始抓取数据");
        final Integer fansCnt = crawlStrategy.updateAccountFans(accountNo);// 更新粉丝数
        if (Objects.isNull(fansCnt)) {
            doPushExpireAccount(account);
            return;
        }
        final List<VideoPool> allVideoMsg = crawlStrategy.getAllVideoMsg(accountNo);// 获取所有视频信息
        if (Objects.isNull(allVideoMsg)) {
            doPushExpireAccount(account);
            return;
        }
        final List<LivePool> yesterdayLiveMsg = crawlStrategy.getYesterdayLiveMsg(accountNo);// 获取昨日直播信息
        if (Objects.isNull(yesterdayLiveMsg)) {
            doPushExpireAccount(account);
            return;
        }
        // 上报数据
        final boolean reportFansCnt = this.doReportFansCnt(account, fansCnt);
        if (!reportFansCnt) {
            log.error(LocalDate.now() + " 上报账户为" + accountNo + "的" + (account.getType() == 1 ? "快手" : "抖音") + "的粉丝数据失败");
            return;
        }
        final boolean reportVideo = this.doReportVideo(account, allVideoMsg);
        if (!reportVideo) {
            log.error(LocalDate.now() + " 上报账户为" + accountNo + "的" + (account.getType() == 1 ? "快手" : "抖音") + "的视频数据失败");
            return;
        }
        final boolean reportLive = this.doReportLive(account, yesterdayLiveMsg);
        if (!reportLive) {
            log.error(LocalDate.now() + " 上报账户为" + accountNo + "的" + (account.getType() == 1 ? "快手" : "抖音") + "的直播数据失败");
            return;
        }
        this.afterCrawl(accountService.findById(account.getId()));
    }

    /**
     * 后续处理, 设置账户今日完成数据抓取
     *
     * @param account
     */
    private void afterCrawl(Account account) {
        account.setDone(Boolean.TRUE);
        accountService.saveOrUpdate(account);
    }

    /**
     * 上报失效账户
     *
     * @param account 账户
     */
    private void doPushExpireAccount(Account account) {
        boolean result = false;
        int hasTryTimes = 0;
        int maxTryTimes = 2;
        while (!result && hasTryTimes < maxTryTimes) {
            result = accountRpcService.pushExpireAccount(account.getPhoneNo(), account.getType());
            hasTryTimes++;
        }
    }

    /**
     * 上报粉丝数
     *
     * @param account 账户
     * @param fansCnt 粉丝数
     * @return
     */
    private boolean doReportFansCnt(Account account, Integer fansCnt) {
        boolean reportFansCnt;
        int hasTryTimes = 0;
        int maxTryTimes = 2;
        while (hasTryTimes < maxTryTimes) {
            reportFansCnt = reportRpcService.reportFansCnt(account.getPhoneNo(), account.getType(), fansCnt);
            if (reportFansCnt) {
                return true;
            } else {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
            }
            hasTryTimes++;
        }
        return false;
    }

    /**
     * 上报所有视频
     *
     * @param account     账户
     * @param allVideoMsg 所有视频
     * @return
     */
    private boolean doReportVideo(Account account, List<VideoPool> allVideoMsg) {
        boolean reportVideo;
        int hasTryTimes = 0;
        int maxTryTimes = 2;
        while (hasTryTimes < maxTryTimes) {
            reportVideo = reportRpcService.reportVideo(account.getPhoneNo(), account.getType(), allVideoMsg);
            if (reportVideo) {
                return true;
            } else {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
            }
            hasTryTimes++;
        }
        return false;
    }

    /**
     * 上报昨日所有直播
     *
     * @param account          账户
     * @param yesterdayLiveMsg 昨日所有直播数据
     * @return
     */
    private boolean doReportLive(Account account, List<LivePool> yesterdayLiveMsg) {
        boolean reportLive;
        int hasTryTimes = 0;
        int maxTryTimes = 2;
        while (hasTryTimes < maxTryTimes) {
            reportLive = reportRpcService.reportLive(account.getPhoneNo(), account.getType(), yesterdayLiveMsg);
            if (reportLive) {
                return true;
            } else {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
            }
            hasTryTimes++;
        }
        return false;
    }

    /**
     * 监听器, 收到消息后执行上报账户直播信息
     *
     * @param account 账户
     * @throws IOException
     */
    @Async(value = "wmyThreadPool")
    @EventListener(Account.class)
    public void reportLive(Account account) throws IOException {
        log.info(Thread.currentThread().getName() + " spring监听器在CrawlBizService.reportLive()方法上成功收到消息: " + account);
        Integer type = account.getType();
        final String accountNo = account.getPhoneNo();
        AccountTypeEnum typeEnum = AccountTypeEnum.getEnumByValue(type);
        CrawlStrategy crawlStrategy = crawlStrategyMap.get(typeEnum);
        final List<LivePool> yesterdayLiveMsg = crawlStrategy.getYesterdayLiveMsg(accountNo);// 获取昨日直播信息
        final boolean reportLive = this.doReportLive(account, yesterdayLiveMsg);
        if (!reportLive) {
            accountService.setAccountUndone(accountNo);
            log.error(LocalDate.now() + " 上报账户为" + accountNo + "的" + (account.getType() == 1 ? "快手" : "抖音") + "的直播数据失败");
        }
    }
}