PubStandTask.java
5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package cn.fw.valhalla.controller.task;
import cn.fw.common.cache.locker.DistributedLocker;
import cn.fw.valhalla.common.utils.StringUtils;
import cn.fw.valhalla.domain.db.pub.PubCluePool;
import cn.fw.valhalla.rpc.oop.OopService;
import cn.fw.valhalla.rpc.oop.dto.GroupDTO;
import cn.fw.valhalla.service.bus.pub.PubDistributeBizService;
import cn.fw.valhalla.service.bus.pub.PubStandBizService;
import com.alibaba.fastjson.JSONObject;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.time.MonthDay;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* 公共池站岗任务
*
* @author : kurisu
* @version : 1.0
* @className : PubStandTask
* @description : 公共池站岗任务
* @date : 2023-03-15 11:01
*/
@Component
@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
@Slf4j
@RequiredArgsConstructor
public class PubStandTask {
private final PubDistributeBizService pubDistributeBizService;
private final PubStandBizService pubStandBizService;
private final StringRedisTemplate redisTemplate;
private final OopService oopService;
private final DistributedLocker distributedLocker;
private final StringRedisTemplate stringRedisTemplate;
@Value("${spring.cache.custom.global-prefix}:stand:pub")
@Getter(AccessLevel.PRIVATE)
private String keyPrefix;
@Value("${spring.cache.custom.global-prefix}:pub:distribute:fromClue")
@Getter(AccessLevel.PRIVATE)
private String fromClueCacheKey;
/**
* 重置状态
*/
@Scheduled(cron = "10 0 0 * * *")
public void resetStatus() {
pubStandBizService.reset();
}
/**
* 执行分配
*/
@Scheduled(cron = "0/8 * 8-20 * * *")
public void distributeBatch() {
List<GroupDTO> groups = oopService.allGroup();
for (GroupDTO group : groups) {
CompletableFuture.runAsync(() -> doDistribute(group));
}
}
/**
* 线索开始跟进后建立专属线索关系(如果能的话)
*/
@Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10)
public void dealFromClue() {
BoundListOperations<String, String> listOps = stringRedisTemplate.boundListOps(getFromClueCacheKey());
List<String> failList = new ArrayList<>();
String objectStr;
while ((objectStr = listOps.leftPop()) != null) {
if (!StringUtils.isEmpty(objectStr)) {
continue;
}
try {
PubCluePool cluePool = JSONObject.parseObject(objectStr, PubCluePool.class);
if (Objects.isNull(cluePool)) {
continue;
}
pubDistributeBizService.distributeFromClue(cluePool);
} catch (Exception e) {
if (StringUtils.isValid(objectStr)) {
failList.add(objectStr);
}
log.error("线索开始跟进后建立专属线索关系失败", e);
}
}
if (!CollectionUtils.isEmpty(failList)) {
String[] idArr = failList.toArray(new String[0]);
listOps.rightPushAll(idArr);
}
}
private void doDistribute(GroupDTO group) {
final String key = generateKey(group.getId());
final String lockKey = String.format("pub:distribute:%s", group.getId());
Pair<Boolean, RLock> lockPair = distributedLocker.tryLock(lockKey, TimeUnit.MINUTES, 0, -1);
try {
if (Boolean.TRUE.equals(lockPair.getLeft())) {
List<String> failList = new ArrayList<>();
BoundListOperations<String, String> ops = redisTemplate.boundListOps(key);
String id;
while ((id = ops.leftPop()) != null) {
if (StringUtils.isNumber(id)) {
try {
Boolean distributed = pubDistributeBizService.distribute(Long.parseLong(id));
if (!Boolean.TRUE.equals(distributed)) {
failList.add(id);
}
} catch (Exception e) {
if (StringUtils.isValid(id)) {
failList.add(id);
}
log.error("分配公共池客户", e);
}
}
}
if (!CollectionUtils.isEmpty(failList)) {
ops.rightPushAll(failList.toArray(new String[0]));
}
}
} finally {
if (lockPair.getRight().isLocked()) {
lockPair.getRight().unlock();
}
}
}
private String generateKey(final Long groupId) {
Assert.notNull(groupId, "groupId cannot be null");
String day = MonthDay.now().toString().replace("-", "");
return String.format("%s:%s:%s", getKeyPrefix(), day, groupId);
}
}