f3b188be
张志伟
|
1
2
3
4
|
package cn.fw.valhalla.controller.task;
import cn.fw.common.cache.locker.DistributedLocker;
import cn.fw.valhalla.common.utils.StringUtils;
|
5b538595
张志伟
feature(*): 新增查询线...
|
5
|
import cn.fw.valhalla.common.utils.ThreadPoolUtil;
|
03d7ac84
张志伟
feature(*): 添加续保、...
|
6
|
import cn.fw.valhalla.domain.db.pub.PubCluePool;
|
f3b188be
张志伟
|
7
8
9
|
import cn.fw.valhalla.rpc.oop.OopService;
import cn.fw.valhalla.rpc.oop.dto.GroupDTO;
import cn.fw.valhalla.service.bus.pub.PubDistributeBizService;
|
c2c96eeb
张志伟
feature(*): 公共池站岗联调
|
10
|
import cn.fw.valhalla.service.bus.pub.PubStandBizService;
|
03d7ac84
张志伟
feature(*): 添加续保、...
|
11
|
import com.alibaba.fastjson.JSONObject;
|
f3b188be
张志伟
|
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.time.MonthDay;
import java.util.ArrayList;
import java.util.List;
|
03d7ac84
张志伟
feature(*): 添加续保、...
|
30
|
import java.util.Objects;
|
f3b188be
张志伟
|
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* 公共池站岗任务
*
* @author : kurisu
* @version : 1.0
* @className : PubStandTask
* @description : 公共池站岗任务
* @date : 2023-03-15 11:01
*/
@Component
@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
@Slf4j
@RequiredArgsConstructor
public class PubStandTask {
private final PubDistributeBizService pubDistributeBizService;
|
c2c96eeb
张志伟
feature(*): 公共池站岗联调
|
49
|
private final PubStandBizService pubStandBizService;
|
f3b188be
张志伟
|
50
51
52
|
private final StringRedisTemplate redisTemplate;
private final OopService oopService;
private final DistributedLocker distributedLocker;
|
03d7ac84
张志伟
feature(*): 添加续保、...
|
53
54
|
private final StringRedisTemplate stringRedisTemplate;
|
f3b188be
张志伟
|
55
56
57
58
|
@Value("${spring.cache.custom.global-prefix}:stand:pub")
@Getter(AccessLevel.PRIVATE)
private String keyPrefix;
|
03d7ac84
张志伟
feature(*): 添加续保、...
|
59
60
61
62
|
@Value("${spring.cache.custom.global-prefix}:pub:distribute:fromClue")
@Getter(AccessLevel.PRIVATE)
private String fromClueCacheKey;
|
f3b188be
张志伟
|
63
|
/**
|
c2c96eeb
张志伟
feature(*): 公共池站岗联调
|
64
65
|
* 重置状态
*/
|
7afb7e40
Kurisu
|
66
|
@Scheduled(cron = "10 0 0 * * *")
|
c2c96eeb
张志伟
feature(*): 公共池站岗联调
|
67
68
69
70
71
|
public void resetStatus() {
pubStandBizService.reset();
}
/**
|
f3b188be
张志伟
|
72
73
|
* 执行分配
*/
|
7afb7e40
Kurisu
|
74
|
@Scheduled(cron = "0/8 * 8-20 * * *")
|
f3b188be
张志伟
|
75
76
77
|
public void distributeBatch() {
List<GroupDTO> groups = oopService.allGroup();
for (GroupDTO group : groups) {
|
5b538595
张志伟
feature(*): 新增查询线...
|
78
|
CompletableFuture.runAsync(() -> doDistribute(group), ThreadPoolUtil.getInstance().getExecutor());
|
f3b188be
张志伟
|
79
80
81
|
}
}
|
03d7ac84
张志伟
feature(*): 添加续保、...
|
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
/**
* 线索开始跟进后建立专属线索关系(如果能的话)
*/
@Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10)
public void dealFromClue() {
BoundListOperations<String, String> listOps = stringRedisTemplate.boundListOps(getFromClueCacheKey());
List<String> failList = new ArrayList<>();
String objectStr;
while ((objectStr = listOps.leftPop()) != null) {
if (!StringUtils.isEmpty(objectStr)) {
continue;
}
try {
PubCluePool cluePool = JSONObject.parseObject(objectStr, PubCluePool.class);
if (Objects.isNull(cluePool)) {
continue;
}
pubDistributeBizService.distributeFromClue(cluePool);
} catch (Exception e) {
if (StringUtils.isValid(objectStr)) {
failList.add(objectStr);
}
log.error("线索开始跟进后建立专属线索关系失败", e);
}
}
if (!CollectionUtils.isEmpty(failList)) {
String[] idArr = failList.toArray(new String[0]);
listOps.rightPushAll(idArr);
}
}
|
5b538595
张志伟
feature(*): 新增查询线...
|
113
|
public void doDistribute(GroupDTO group) {
|
f3b188be
张志伟
|
114
115
116
117
118
119
120
121
122
123
124
|
final String key = generateKey(group.getId());
final String lockKey = String.format("pub:distribute:%s", group.getId());
Pair<Boolean, RLock> lockPair = distributedLocker.tryLock(lockKey, TimeUnit.MINUTES, 0, -1);
try {
if (Boolean.TRUE.equals(lockPair.getLeft())) {
List<String> failList = new ArrayList<>();
BoundListOperations<String, String> ops = redisTemplate.boundListOps(key);
String id;
while ((id = ops.leftPop()) != null) {
if (StringUtils.isNumber(id)) {
try {
|
364db3cc
张志伟
|
125
|
Boolean distributed = pubDistributeBizService.distribute(Long.parseLong(id));
|
c09a6a49
张志伟
|
126
|
if (!Boolean.TRUE.equals(distributed)) {
|
f3b188be
张志伟
|
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
failList.add(id);
}
} catch (Exception e) {
if (StringUtils.isValid(id)) {
failList.add(id);
}
log.error("分配公共池客户", e);
}
}
}
if (!CollectionUtils.isEmpty(failList)) {
ops.rightPushAll(failList.toArray(new String[0]));
}
}
} finally {
if (lockPair.getRight().isLocked()) {
lockPair.getRight().unlock();
}
}
}
private String generateKey(final Long groupId) {
Assert.notNull(groupId, "groupId cannot be null");
String day = MonthDay.now().toString().replace("-", "");
return String.format("%s:%s:%s", getKeyPrefix(), day, groupId);
}
}
|