Commit f3b188be72dedfdb77c968753ce218ab3c17c30e

Authored by 张志伟
1 parent d8d78651

:sparkles: feature(*): 项目启动时同步队列

- 项目启动时同步队列
fw-valhalla-rpc/src/main/java/cn/fw/valhalla/rpc/oop/OopService.java
... ... @@ -15,6 +15,7 @@ import lombok.extern.slf4j.Slf4j;
15 15 import org.apache.commons.collections4.CollectionUtils;
16 16 import org.springframework.beans.BeanUtils;
17 17 import org.springframework.beans.factory.annotation.Value;
  18 +import org.springframework.cache.annotation.Cacheable;
18 19 import org.springframework.stereotype.Service;
19 20 import org.springframework.util.Assert;
20 21  
... ... @@ -225,6 +226,32 @@ public class OopService extends AbsBaseRpcService {
225 226 }
226 227 }
227 228  
  229 + /**
  230 + * 查询所有集团
  231 + *
  232 + * @return
  233 + */
  234 + @Cacheable(cacheNames = "group:all:info", unless = "#result.size() <= 0")
  235 + public List<GroupDTO> allGroup() {
  236 + try {
  237 + Message<List<GroupInfoVo>> msg = groupApiService.getGroupInfoList();
  238 + List<GroupInfoVo> data = msg.getData();
  239 + if (msg.isSuccess() && data != null) {
  240 + List<GroupDTO> list = data.stream().map(item -> {
  241 + final GroupDTO groupDTO = new GroupDTO();
  242 + BeanUtils.copyProperties(item, groupDTO);
  243 + return groupDTO;
  244 + }).collect(Collectors.toList());
  245 + if (CollectionUtils.isNotEmpty(list)) {
  246 + return list;
  247 + }
  248 + }
  249 + } catch (Exception e) {
  250 + e.printStackTrace();
  251 + }
  252 + return new ArrayList<>();
  253 + }
  254 +
228 255 public List<DealerDTO> dealers(final Long groupId) {
229 256 if (groupId == null) {
230 257 return null;
... ...
fw-valhalla-rpc/src/main/java/cn/fw/valhalla/rpc/shirasawa/ShirasawaRpcService.java
... ... @@ -17,6 +17,7 @@ import org.springframework.beans.BeanUtils;
17 17 import org.springframework.stereotype.Service;
18 18  
19 19 import java.util.Objects;
  20 +import java.util.Optional;
20 21  
21 22 import static cn.fw.common.businessvalidator.Validator.BV;
22 23  
... ... @@ -107,4 +108,27 @@ public class ShirasawaRpcService {
107 108 final Message<Boolean> msg = followApiService.createNextRecord(newRecordDTO);
108 109 BV.isTrue(msg.isSuccess(), msg::getResult);
109 110 }
  111 +
  112 + /**
  113 + * 查询是否有未完成的待办
  114 + *
  115 + * @param userId
  116 + * @return
  117 + */
  118 + public boolean hasOngoingFollow(Long userId) {
  119 + try {
  120 + if (Objects.isNull(userId)) {
  121 + return true;
  122 + }
  123 + Message<Long> message = followApiService.queryRecordRemaining(userId, BusinessTypeEnum.AS.getValue());
  124 + log.info("followApiService.queryRecordRemaining: msg.code={}, msg.result={}", message.getCode(), message.getResult());
  125 + if (message.isSuccess()) {
  126 + long data = Optional.ofNullable(message.getData()).orElse(0L);
  127 + return data > 0;
  128 + }
  129 + } catch (Exception e) {
  130 + log.error("查询未完成的待办失败。userId: [{}]", userId, e);
  131 + }
  132 + return true;
  133 + }
110 134 }
... ...
fw-valhalla-server/src/main/java/cn/fw/valhalla/controller/runner/InitCommandLineRunner.java 0 → 100644
  1 +package cn.fw.valhalla.controller.runner;
  2 +
  3 +import cn.fw.valhalla.service.bus.pub.PubStandBizService;
  4 +import lombok.RequiredArgsConstructor;
  5 +import lombok.extern.slf4j.Slf4j;
  6 +import org.springframework.boot.CommandLineRunner;
  7 +import org.springframework.stereotype.Component;
  8 +
  9 +/**
  10 + * 初始化同步逻辑
  11 + *
  12 + * @author : kurisu
  13 + * @version : 1.0
  14 + * @className : InitCommandLineRunner
  15 + * @description : 初始化同步逻辑
  16 + * @date : 2022-05-09 15:22
  17 + */
  18 +@Component
  19 +@Slf4j
  20 +@RequiredArgsConstructor
  21 +public class InitCommandLineRunner implements CommandLineRunner {
  22 + private final PubStandBizService pubStandBizService;
  23 +
  24 + @Override
  25 + public void run(String... args) {
  26 + log.info("项目启动成功,开始同步公共池站岗队列");
  27 + pubStandBizService.syncQueue();
  28 + }
  29 +}
... ...
fw-valhalla-server/src/main/java/cn/fw/valhalla/controller/task/PubStandTask.java 0 → 100644
  1 +package cn.fw.valhalla.controller.task;
  2 +
  3 +import cn.fw.common.cache.locker.DistributedLocker;
  4 +import cn.fw.valhalla.common.utils.StringUtils;
  5 +import cn.fw.valhalla.rpc.oop.OopService;
  6 +import cn.fw.valhalla.rpc.oop.dto.GroupDTO;
  7 +import cn.fw.valhalla.service.bus.pub.PubDistributeBizService;
  8 +import lombok.AccessLevel;
  9 +import lombok.Getter;
  10 +import lombok.RequiredArgsConstructor;
  11 +import lombok.extern.slf4j.Slf4j;
  12 +import org.apache.commons.lang3.tuple.Pair;
  13 +import org.redisson.api.RLock;
  14 +import org.springframework.beans.factory.annotation.Value;
  15 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  16 +import org.springframework.data.redis.core.BoundListOperations;
  17 +import org.springframework.data.redis.core.StringRedisTemplate;
  18 +import org.springframework.scheduling.annotation.Scheduled;
  19 +import org.springframework.stereotype.Component;
  20 +import org.springframework.util.Assert;
  21 +import org.springframework.util.CollectionUtils;
  22 +
  23 +import java.time.MonthDay;
  24 +import java.util.ArrayList;
  25 +import java.util.List;
  26 +import java.util.concurrent.CompletableFuture;
  27 +import java.util.concurrent.TimeUnit;
  28 +
  29 +/**
  30 + * 公共池站岗任务
  31 + *
  32 + * @author : kurisu
  33 + * @version : 1.0
  34 + * @className : PubStandTask
  35 + * @description : 公共池站岗任务
  36 + * @date : 2023-03-15 11:01
  37 + */
  38 +@Component
  39 +@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
  40 +@Slf4j
  41 +@RequiredArgsConstructor
  42 +public class PubStandTask {
  43 + private final PubDistributeBizService pubDistributeBizService;
  44 + private final StringRedisTemplate redisTemplate;
  45 + private final OopService oopService;
  46 + private final DistributedLocker distributedLocker;
  47 + @Value("${spring.cache.custom.global-prefix}:stand:pub")
  48 + @Getter(AccessLevel.PRIVATE)
  49 + private String keyPrefix;
  50 +
  51 + /**
  52 + * 执行分配
  53 + */
  54 + @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 5)
  55 + public void distributeBatch() {
  56 + List<GroupDTO> groups = oopService.allGroup();
  57 + for (GroupDTO group : groups) {
  58 + CompletableFuture.runAsync(() -> doDistribute(group));
  59 + }
  60 + }
  61 +
  62 + private void doDistribute(GroupDTO group) {
  63 + final String key = generateKey(group.getId());
  64 + final String lockKey = String.format("pub:distribute:%s", group.getId());
  65 + Pair<Boolean, RLock> lockPair = distributedLocker.tryLock(lockKey, TimeUnit.MINUTES, 0, -1);
  66 + try {
  67 + if (Boolean.TRUE.equals(lockPair.getLeft())) {
  68 + List<String> failList = new ArrayList<>();
  69 + BoundListOperations<String, String> ops = redisTemplate.boundListOps(key);
  70 + String id;
  71 + while ((id = ops.leftPop()) != null) {
  72 + if (StringUtils.isNumber(id)) {
  73 + try {
  74 + boolean distributed = pubDistributeBizService.distribute(Long.valueOf(id));
  75 + if (!distributed) {
  76 + failList.add(id);
  77 + }
  78 + } catch (Exception e) {
  79 + if (StringUtils.isValid(id)) {
  80 + failList.add(id);
  81 + }
  82 + log.error("分配公共池客户", e);
  83 + }
  84 + }
  85 + }
  86 + if (!CollectionUtils.isEmpty(failList)) {
  87 + ops.rightPushAll(failList.toArray(new String[0]));
  88 + }
  89 + }
  90 + } finally {
  91 + if (lockPair.getRight().isLocked()) {
  92 + lockPair.getRight().unlock();
  93 + }
  94 + }
  95 + }
  96 +
  97 + private String generateKey(final Long groupId) {
  98 + Assert.notNull(groupId, "groupId cannot be null");
  99 + String day = MonthDay.now().toString().replace("-", "");
  100 + return String.format("%s:%s:%s", getKeyPrefix(), day, groupId);
  101 + }
  102 +}
... ...
fw-valhalla-server/src/main/resources/application.yml
... ... @@ -9,12 +9,14 @@ spring:
9 9 custom:
10 10 global-prefix: 'valhalla'
11 11 global:
12   - ttl: 12h
  12 + ttl: 10m
13 13 cache:
14 14 - name: mobile:attribution
15 15 ttl: 8h
16 16 - name: pub:stand
17 17 ttl: 12h
  18 + - name: group:all:info
  19 + ttl: 2h
18 20  
19 21 cloud:
20 22 nacos:
... ...
fw-valhalla-service/src/main/java/cn/fw/valhalla/service/bus/pub/PubDistributeBizService.java
... ... @@ -2,22 +2,19 @@ package cn.fw.valhalla.service.bus.pub;
2 2  
3 3 import cn.fw.common.cache.CacheExtender;
4 4 import cn.fw.common.cache.locker.DistributedLocker;
  5 +import cn.fw.valhalla.domain.db.pub.PubStandStaffInfo;
5 6 import cn.fw.valhalla.rpc.ehr.EhrRpcService;
6 7 import cn.fw.valhalla.service.data.PubStandStaffInfoService;
7 8 import lombok.AccessLevel;
8 9 import lombok.Getter;
9 10 import lombok.RequiredArgsConstructor;
10 11 import lombok.extern.slf4j.Slf4j;
11   -import org.apache.commons.lang3.tuple.Pair;
12   -import org.redisson.api.RLock;
13 12 import org.springframework.beans.factory.annotation.Value;
14   -import org.springframework.data.redis.core.BoundListOperations;
15 13 import org.springframework.data.redis.core.StringRedisTemplate;
16 14 import org.springframework.stereotype.Service;
17 15 import org.springframework.util.Assert;
18 16  
19 17 import java.time.MonthDay;
20   -import java.util.concurrent.TimeUnit;
21 18  
22 19 /**
23 20 * 公共池站岗分配
... ... @@ -40,26 +37,16 @@ public class PubDistributeBizService {
40 37 @Value("${spring.cache.custom.global-prefix}:stand:pub")
41 38 @Getter(AccessLevel.PRIVATE)
42 39 private String keyPrefix;
  40 +
43 41 /**
44 42 * 执行分配
45 43 *
46   - * @param groupId
  44 + * @param id
47 45 */
48   - public void distribute(Long groupId) {
49   - final String key = generateKey(groupId);
50   - final String lockKey = String.format("pub:distribute:%s", groupId);
51   - Pair<Boolean, RLock> lockPair = distributedLocker.tryLock(lockKey, TimeUnit.MINUTES, 0, -1);
52   - try {
53   - if (Boolean.TRUE.equals(lockPair.getLeft())) {
54   - BoundListOperations<String, String> ops = redisTemplate.boundListOps(key);
55   - String id = ops.leftPop();
56   - }
57   - } finally {
58   - if (lockPair.getRight().isLocked()) {
59   - lockPair.getRight().unlock();
60   - }
61   - }
62   -
  46 + public boolean distribute(Long id) {
  47 + PubStandStaffInfo staffInfo = pubStandStaffInfoService.getById(id);
  48 + // TODO: 2023/3/15 分配
  49 + return true;
63 50 }
64 51  
65 52 private String generateKey(final Long groupId) {
... ...
fw-valhalla-service/src/main/java/cn/fw/valhalla/service/bus/pub/PubStandBizService.java
1 1 package cn.fw.valhalla.service.bus.pub;
2 2  
3   -import cn.fw.common.cache.CacheExtender;
4   -import cn.fw.common.cache.locker.DistributedLocker;
5 3 import cn.fw.common.web.annotation.DisLock;
6 4 import cn.fw.erp.sdk.api.result.UserRoleDataRange;
7 5 import cn.fw.valhalla.common.constant.RoleCode;
... ... @@ -13,7 +11,9 @@ import cn.fw.valhalla.rpc.ehr.EhrRpcService;
13 11 import cn.fw.valhalla.rpc.erp.UserRoleRpcService;
14 12 import cn.fw.valhalla.rpc.erp.dto.UserInfoDTO;
15 13 import cn.fw.valhalla.rpc.oop.OopService;
  14 +import cn.fw.valhalla.rpc.oop.dto.GroupDTO;
16 15 import cn.fw.valhalla.rpc.oop.dto.ShopDTO;
  16 +import cn.fw.valhalla.rpc.shirasawa.ShirasawaRpcService;
17 17 import cn.fw.valhalla.service.data.PubStandStaffInfoService;
18 18 import lombok.AccessLevel;
19 19 import lombok.Getter;
... ... @@ -54,9 +54,8 @@ public class PubStandBizService {
54 54 private final OopService oopService;
55 55 private final UserRoleRpcService userRoleRpcService;
56 56 private final PubStandStaffInfoService pubStandStaffInfoService;
57   - private final CacheExtender cache;
  57 + private final ShirasawaRpcService shirasawaRpcService;
58 58 private final StringRedisTemplate redisTemplate;
59   - private final DistributedLocker distributedLocker;
60 59 private final PlatformTransactionManager platformTransactionManager;
61 60 private final TransactionDefinition transactionDefinition;
62 61 @Value("${spring.cache.custom.global-prefix}:stand:pub")
... ... @@ -65,6 +64,16 @@ public class PubStandBizService {
65 64  
66 65 /**
67 66 * 同步站岗队列
  67 + */
  68 + public void syncQueue() {
  69 + List<GroupDTO> list = oopService.allGroup();
  70 + for (GroupDTO group : list) {
  71 + syncQueue(group.getId());
  72 + }
  73 + }
  74 +
  75 + /**
  76 + * 同步站岗队列
68 77 *
69 78 * @param groupId
70 79 */
... ... @@ -184,8 +193,7 @@ public class PubStandBizService {
184 193 }
185 194  
186 195 private boolean hasAnyTodo(final Long staffId) {
187   - //todo 跟进系统 跟进结果返回生于的跟进数量并且提供主动查询的接口
188   - return true;
  196 + return shirasawaRpcService.hasOngoingFollow(staffId);
189 197 }
190 198  
191 199 private PubStandStaffInfo generateStandInfo(UserInfoDTO infoDTO) {
... ...
... ... @@ -64,7 +64,7 @@
64 64 <yitter.idgenerator>1.0.3</yitter.idgenerator>
65 65 <fw.hestia.sdk>1.0.0</fw.hestia.sdk>
66 66 <fw.backlog.sdk>1.0.0</fw.backlog.sdk>
67   - <fw.shirasawa.sdk>1.0.2</fw.shirasawa.sdk>
  67 + <fw.shirasawa.sdk>1.0.3</fw.shirasawa.sdk>
68 68 <fw-identify-sdk.version>1.0.0</fw-identify-sdk.version>
69 69 <fw-attendance-sdk.version>1.0.0</fw-attendance-sdk.version>
70 70 </properties>
... ...