FollowTaskDealTask.java 6.32 KB
package cn.fw.shirasawa.server.controller.task;

import cn.fw.common.cache.locker.DistributedLocker;
import cn.fw.shirasawa.domain.db.follow.FollowTask;
import cn.fw.shirasawa.domain.db.pool.CluePool;
import cn.fw.shirasawa.domain.enums.ClueStatusEnum;
import cn.fw.shirasawa.domain.enums.TaskStateEnum;
import cn.fw.shirasawa.service.bus.follow.FollowBizService;
import cn.fw.shirasawa.service.data.CluePoolService;
import cn.fw.shirasawa.service.data.FollowTaskService;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * @author : kurisu
 * @className : FollowTask
 * @description : 跟进任务处理
 * @date: 2020-08-24 16:32
 */
@Component
@Slf4j
@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
public class FollowTaskDealTask {
    private final FollowTaskService followTaskService;
    private final FollowBizService followBizService;
    private final CluePoolService cluePoolService;
    private final static ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(10, 25, 10 * 100L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100),
            new ThreadFactoryBuilder().setNamePrefix("close-clue-pool-").build(),
            new ThreadPoolExecutor.AbortPolicy());
    private final DistributedLocker distributedLocker;
    private final StringRedisTemplate redisTemplate;
    @Value("${spring.cache.locker.key-prefix}:termination")
    @Getter
    private String planKey;
    @Value("${spring.cache.custom.global-prefix}:clue")
    @Getter
    private String prefix;

    @Autowired
    public FollowTaskDealTask(final FollowTaskService followTaskService,
                              final FollowBizService followBizService,
                              final CluePoolService cluePoolService,
                              final DistributedLocker distributedLocker,
                              final StringRedisTemplate redisTemplate) {
        this.followTaskService = followTaskService;
        this.followBizService = followBizService;
        this.cluePoolService = cluePoolService;
        this.distributedLocker = distributedLocker;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 开始任务
     */
    @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10)
    public void startClue() {
        String key = getCacheName("start");
        Lock lock = distributedLocker.lock(key);
        boolean locked = ((RLock) lock).isLocked();
        if (!locked) {
            return;
        }
        try {
            List<CluePool> list = cluePoolService.list(Wrappers.<CluePool>lambdaQuery()
                    .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING)
                    .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay())
                    .le(CluePool::getStartTime, LocalDateTime.now())
                    .last("limit 0, 500")
            );
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            for (CluePool cluePool : list) {
                followBizService.startClue(cluePool);
            }
        } finally {
            distributedLocker.unlock(lock);
        }
    }

    /**
     * 逾期结束任务
     */
    @Scheduled(initialDelay = 1000 * 30, fixedRate = 1000 * 60)
    public void endTask() {
        String key = getCacheName("end");
        Lock lock = distributedLocker.lock(key);
        boolean locked = ((RLock) lock).isLocked();
        if (!locked) {
            return;
        }
        try {
            List<FollowTask> list = followTaskService.list(Wrappers.<FollowTask>lambdaQuery()
                    .eq(FollowTask::getState, TaskStateEnum.ONGOING)
                    .le(FollowTask::getDeadline, LocalDateTime.now())
            );
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            for (FollowTask r : list) {
                followBizService.endTask(r);
            }
        } finally {
            distributedLocker.unlock(lock);
        }
    }

    /**
     * 结束任务
     */
    @Scheduled(initialDelay = 1000 * 30, fixedRate = 1000 * 15)
    public void endClueTask() {
        BlockingQueue<Runnable> queue = THREAD_POOL.getQueue();
        int activeCount = THREAD_POOL.getActiveCount();
        log.debug("任务队列数:{}     线程激活数:{}", queue.size(), activeCount);
        if (!queue.isEmpty() || activeCount > 0) {
            return;
        }
        final int maxSize = 100;
        List<CluePool> addedList = new ArrayList<>(maxSize);
        List<String> failList = new ArrayList<>();
        String clueId;
        while ((clueId = redisTemplate.opsForList().leftPop(getPlanKey())) != null && addedList.size() < maxSize) {
            try {
                final CluePool cluePool = cluePoolService.getById(Long.valueOf(clueId));
                if (Objects.isNull(cluePool)) {
                    continue;
                }
                addedList.add(cluePool);
            } catch (Exception ex) {
                failList.add(clueId);
                log.error("处理线索id失败:{}", clueId, ex);
            }
        }
        if (!CollectionUtils.isEmpty(failList)) {
            redisTemplate.opsForList().rightPushAll(getPlanKey(), failList);
        }
        for (CluePool cluePool : addedList) {
            THREAD_POOL.execute(() -> followBizService.endTask(cluePool));
        }
    }

    private String getCacheName(@NonNull String name) {
        return getPrefix() + ":" + name;
    }
}