PubStandTask.java 5.66 KB
package cn.fw.valhalla.controller.task;

import cn.fw.common.cache.locker.DistributedLocker;
import cn.fw.valhalla.common.utils.StringUtils;
import cn.fw.valhalla.common.utils.ThreadPoolUtil;
import cn.fw.valhalla.domain.db.pub.PubCluePool;
import cn.fw.valhalla.rpc.oop.OopService;
import cn.fw.valhalla.rpc.oop.dto.GroupDTO;
import cn.fw.valhalla.service.bus.pub.PubDistributeBizService;
import cn.fw.valhalla.service.bus.pub.PubStandBizService;
import com.alibaba.fastjson.JSONObject;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.time.MonthDay;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * 公共池站岗任务
 *
 * @author : kurisu
 * @version : 1.0
 * @className : PubStandTask
 * @description : 公共池站岗任务
 * @date : 2023-03-15 11:01
 */
@Component
@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
@Slf4j
@RequiredArgsConstructor
public class PubStandTask {
    private final PubDistributeBizService pubDistributeBizService;
    private final PubStandBizService pubStandBizService;
    private final StringRedisTemplate redisTemplate;
    private final OopService oopService;
    private final DistributedLocker distributedLocker;
    private final StringRedisTemplate stringRedisTemplate;

    @Value("${spring.cache.custom.global-prefix}:stand:pub")
    @Getter(AccessLevel.PRIVATE)
    private String keyPrefix;

    @Value("${spring.cache.custom.global-prefix}:pub:distribute:fromClue")
    @Getter(AccessLevel.PRIVATE)
    private String fromClueCacheKey;

    /**
     * 重置状态
     */
    @Scheduled(cron = "10 0 0 * * *")
    public void resetStatus() {
        pubStandBizService.reset();
    }

    /**
     * 执行分配
     */
    @Scheduled(cron = "0/8 * 8-20 * * *")
    public void distributeBatch() {
        List<GroupDTO> groups = oopService.allGroup();
        for (GroupDTO group : groups) {
            CompletableFuture.runAsync(() -> doDistribute(group), ThreadPoolUtil.getInstance().getExecutor());
        }
    }

    /**
     * 线索开始跟进后建立专属线索关系(如果能的话)
     */
    @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10)
    public void dealFromClue() {
        BoundListOperations<String, String> listOps = stringRedisTemplate.boundListOps(getFromClueCacheKey());
        List<String> failList = new ArrayList<>();
        String objectStr;
        while ((objectStr = listOps.leftPop()) != null) {
            if (!StringUtils.isEmpty(objectStr)) {
                continue;
            }
            try {
                PubCluePool cluePool = JSONObject.parseObject(objectStr, PubCluePool.class);
                if (Objects.isNull(cluePool)) {
                    continue;
                }
                pubDistributeBizService.distributeFromClue(cluePool);
            } catch (Exception e) {
                if (StringUtils.isValid(objectStr)) {
                    failList.add(objectStr);
                }
                log.error("线索开始跟进后建立专属线索关系失败", e);
            }
        }
        if (!CollectionUtils.isEmpty(failList)) {
            String[] idArr = failList.toArray(new String[0]);
            listOps.rightPushAll(idArr);
        }
    }

    public void doDistribute(GroupDTO group) {
        final String key = generateKey(group.getId());
        final String lockKey = String.format("pub:distribute:%s", group.getId());
        Pair<Boolean, RLock> lockPair = distributedLocker.tryLock(lockKey, TimeUnit.MINUTES, 0, -1);
        try {
            if (Boolean.TRUE.equals(lockPair.getLeft())) {
                List<String> failList = new ArrayList<>();
                BoundListOperations<String, String> ops = redisTemplate.boundListOps(key);
                String id;
                while ((id = ops.leftPop()) != null) {
                    if (StringUtils.isNumber(id)) {
                        try {
                            Boolean distributed = pubDistributeBizService.distribute(Long.parseLong(id));
                            if (!Boolean.TRUE.equals(distributed)) {
                                failList.add(id);
                            }
                        } catch (Exception e) {
                            if (StringUtils.isValid(id)) {
                                failList.add(id);
                            }
                            log.error("分配公共池客户", e);
                        }
                    }
                }
                if (!CollectionUtils.isEmpty(failList)) {
                    ops.rightPushAll(failList.toArray(new String[0]));
                }
            }
        } finally {
            if (lockPair.getRight().isLocked()) {
                lockPair.getRight().unlock();
            }
        }
    }

    private String generateKey(final Long groupId) {
        Assert.notNull(groupId, "groupId cannot be null");
        String day = MonthDay.now().toString().replace("-", "");
        return String.format("%s:%s:%s", getKeyPrefix(), day, groupId);
    }
}