Blame view

fw-valhalla-server/src/main/java/cn/fw/valhalla/controller/task/PubStandTask.java 5.66 KB
f3b188be   张志伟   :sparkles:
1
2
3
4
  package cn.fw.valhalla.controller.task;
  
  import cn.fw.common.cache.locker.DistributedLocker;
  import cn.fw.valhalla.common.utils.StringUtils;
5b538595   张志伟   feature(*): 新增查询线...
5
  import cn.fw.valhalla.common.utils.ThreadPoolUtil;
03d7ac84   张志伟   feature(*): 添加续保、...
6
  import cn.fw.valhalla.domain.db.pub.PubCluePool;
f3b188be   张志伟   :sparkles:
7
8
9
  import cn.fw.valhalla.rpc.oop.OopService;
  import cn.fw.valhalla.rpc.oop.dto.GroupDTO;
  import cn.fw.valhalla.service.bus.pub.PubDistributeBizService;
c2c96eeb   张志伟   feature(*): 公共池站岗联调
10
  import cn.fw.valhalla.service.bus.pub.PubStandBizService;
03d7ac84   张志伟   feature(*): 添加续保、...
11
  import com.alibaba.fastjson.JSONObject;
f3b188be   张志伟   :sparkles:
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  import lombok.AccessLevel;
  import lombok.Getter;
  import lombok.RequiredArgsConstructor;
  import lombok.extern.slf4j.Slf4j;
  import org.apache.commons.lang3.tuple.Pair;
  import org.redisson.api.RLock;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  import org.springframework.data.redis.core.BoundListOperations;
  import org.springframework.data.redis.core.StringRedisTemplate;
  import org.springframework.scheduling.annotation.Scheduled;
  import org.springframework.stereotype.Component;
  import org.springframework.util.Assert;
  import org.springframework.util.CollectionUtils;
  
  import java.time.MonthDay;
  import java.util.ArrayList;
  import java.util.List;
03d7ac84   张志伟   feature(*): 添加续保、...
30
  import java.util.Objects;
f3b188be   张志伟   :sparkles:
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.TimeUnit;
  
  /**
   * 公共池站岗任务
   *
   * @author : kurisu
   * @version : 1.0
   * @className : PubStandTask
   * @description : 公共池站岗任务
   * @date : 2023-03-15 11:01
   */
  @Component
  @ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
  @Slf4j
  @RequiredArgsConstructor
  public class PubStandTask {
      private final PubDistributeBizService pubDistributeBizService;
c2c96eeb   张志伟   feature(*): 公共池站岗联调
49
      private final PubStandBizService pubStandBizService;
f3b188be   张志伟   :sparkles:
50
51
52
      private final StringRedisTemplate redisTemplate;
      private final OopService oopService;
      private final DistributedLocker distributedLocker;
03d7ac84   张志伟   feature(*): 添加续保、...
53
54
      private final StringRedisTemplate stringRedisTemplate;
  
f3b188be   张志伟   :sparkles:
55
56
57
58
      @Value("${spring.cache.custom.global-prefix}:stand:pub")
      @Getter(AccessLevel.PRIVATE)
      private String keyPrefix;
  
03d7ac84   张志伟   feature(*): 添加续保、...
59
60
61
62
      @Value("${spring.cache.custom.global-prefix}:pub:distribute:fromClue")
      @Getter(AccessLevel.PRIVATE)
      private String fromClueCacheKey;
  
f3b188be   张志伟   :sparkles:
63
      /**
c2c96eeb   张志伟   feature(*): 公共池站岗联调
64
65
       * 重置状态
       */
7afb7e40   Kurisu   :ambulance:
66
      @Scheduled(cron = "10 0 0 * * *")
c2c96eeb   张志伟   feature(*): 公共池站岗联调
67
68
69
70
71
      public void resetStatus() {
          pubStandBizService.reset();
      }
  
      /**
f3b188be   张志伟   :sparkles:
72
73
       * 执行分配
       */
4fd39c38   张志伟   feature(*): 优化查询活动资格
74
      @Scheduled(cron = "0/20 * 8-20 * * *")
f3b188be   张志伟   :sparkles:
75
76
77
      public void distributeBatch() {
          List<GroupDTO> groups = oopService.allGroup();
          for (GroupDTO group : groups) {
5b538595   张志伟   feature(*): 新增查询线...
78
              CompletableFuture.runAsync(() -> doDistribute(group), ThreadPoolUtil.getInstance().getExecutor());
f3b188be   张志伟   :sparkles:
79
80
81
          }
      }
  
03d7ac84   张志伟   feature(*): 添加续保、...
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
      /**
       * 线索开始跟进后建立专属线索关系(如果能的话)
       */
      @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10)
      public void dealFromClue() {
          BoundListOperations<String, String> listOps = stringRedisTemplate.boundListOps(getFromClueCacheKey());
          List<String> failList = new ArrayList<>();
          String objectStr;
          while ((objectStr = listOps.leftPop()) != null) {
              if (!StringUtils.isEmpty(objectStr)) {
                  continue;
              }
              try {
                  PubCluePool cluePool = JSONObject.parseObject(objectStr, PubCluePool.class);
                  if (Objects.isNull(cluePool)) {
                      continue;
                  }
                  pubDistributeBizService.distributeFromClue(cluePool);
              } catch (Exception e) {
                  if (StringUtils.isValid(objectStr)) {
                      failList.add(objectStr);
                  }
                  log.error("线索开始跟进后建立专属线索关系失败", e);
              }
          }
          if (!CollectionUtils.isEmpty(failList)) {
              String[] idArr = failList.toArray(new String[0]);
              listOps.rightPushAll(idArr);
          }
      }
  
5b538595   张志伟   feature(*): 新增查询线...
113
      public void doDistribute(GroupDTO group) {
f3b188be   张志伟   :sparkles:
114
115
116
117
118
119
120
121
122
123
124
          final String key = generateKey(group.getId());
          final String lockKey = String.format("pub:distribute:%s", group.getId());
          Pair<Boolean, RLock> lockPair = distributedLocker.tryLock(lockKey, TimeUnit.MINUTES, 0, -1);
          try {
              if (Boolean.TRUE.equals(lockPair.getLeft())) {
                  List<String> failList = new ArrayList<>();
                  BoundListOperations<String, String> ops = redisTemplate.boundListOps(key);
                  String id;
                  while ((id = ops.leftPop()) != null) {
                      if (StringUtils.isNumber(id)) {
                          try {
364db3cc   张志伟   :sparkles:
125
                              Boolean distributed = pubDistributeBizService.distribute(Long.parseLong(id));
c09a6a49   张志伟   :sparkles:
126
                              if (!Boolean.TRUE.equals(distributed)) {
f3b188be   张志伟   :sparkles:
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
                                  failList.add(id);
                              }
                          } catch (Exception e) {
                              if (StringUtils.isValid(id)) {
                                  failList.add(id);
                              }
                              log.error("分配公共池客户", e);
                          }
                      }
                  }
                  if (!CollectionUtils.isEmpty(failList)) {
                      ops.rightPushAll(failList.toArray(new String[0]));
                  }
              }
          } finally {
              if (lockPair.getRight().isLocked()) {
                  lockPair.getRight().unlock();
              }
          }
      }
  
      private String generateKey(final Long groupId) {
          Assert.notNull(groupId, "groupId cannot be null");
          String day = MonthDay.now().toString().replace("-", "");
          return String.format("%s:%s:%s", getKeyPrefix(), day, groupId);
      }
  }