Commit e81413d6e170bf9a8b9d6af0d8421dcca33af902

Authored by 张志伟
2 parents 4d9fe9a9 e2e47ff6

Merge remote-tracking branch 'origin/dev' into test

fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowRecordTask.java
1 1 package cn.fw.shirasawa.server.controller.task;
2 2  
  3 +import cn.fw.common.cache.locker.DistributedLocker;
3 4 import cn.fw.shirasawa.common.utils.DateUtil;
4 5 import cn.fw.shirasawa.domain.db.follow.FollowRecord;
5 6 import cn.fw.shirasawa.domain.db.follow.FollowTask;
... ... @@ -15,9 +16,13 @@ import cn.fw.shirasawa.service.data.FollowRecordService;
15 16 import cn.fw.shirasawa.service.data.FollowTaskService;
16 17 import com.alibaba.fastjson.JSONObject;
17 18 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  19 +import lombok.Getter;
18 20 import lombok.extern.slf4j.Slf4j;
  21 +import org.redisson.api.RLock;
19 22 import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.beans.factory.annotation.Value;
20 24 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  25 +import org.springframework.lang.NonNull;
21 26 import org.springframework.scheduling.annotation.Scheduled;
22 27 import org.springframework.stereotype.Component;
23 28 import org.springframework.transaction.annotation.Transactional;
... ... @@ -25,6 +30,7 @@ import org.springframework.util.CollectionUtils;
25 30  
26 31 import java.time.LocalDateTime;
27 32 import java.util.*;
  33 +import java.util.concurrent.locks.Lock;
28 34  
29 35 /**
30 36 * @author : kurisu
... ... @@ -41,18 +47,25 @@ public class FollowRecordTask {
41 47 private final TodoRpcService todoRpcService;
42 48 private final FollowTaskService followTaskService;
43 49 private final EhrRpcService ehrRpcService;
  50 + private final DistributedLocker distributedLocker;
  51 +
  52 + @Value("${spring.cache.custom.global-prefix}:task")
  53 + @Getter
  54 + private String prefix;
44 55  
45 56 @Autowired
46 57 public FollowRecordTask(final FollowRecordService followRecordService,
47 58 final FollowBizService followBizService,
48 59 final TodoRpcService todoRpcService,
49 60 final FollowTaskService followTaskService,
50   - final EhrRpcService ehrRpcService) {
  61 + final EhrRpcService ehrRpcService,
  62 + final DistributedLocker distributedLocker) {
51 63 this.followRecordService = followRecordService;
52 64 this.followBizService = followBizService;
53 65 this.todoRpcService = todoRpcService;
54 66 this.followTaskService = followTaskService;
55 67 this.ehrRpcService = ehrRpcService;
  68 + this.distributedLocker = distributedLocker;
56 69 }
57 70  
58 71 /**
... ... @@ -60,17 +73,27 @@ public class FollowRecordTask {
60 73 */
61 74 @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 5)
62 75 public void endTaskRecord() {
63   - List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery()
64   - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING)
65   - .le(FollowRecord::getDeadline, LocalDateTime.now())
66   - .eq(FollowRecord::getYn, Boolean.TRUE)
67   - .last("limit 0, 500")
68   - );
69   - if (CollectionUtils.isEmpty(list)) {
  76 + final String cacheName = getCacheName("end-task");
  77 + Lock lock = distributedLocker.lock(cacheName);
  78 + boolean locked = ((RLock) lock).isLocked();
  79 + if (!locked) {
70 80 return;
71 81 }
72   - for (FollowRecord record : list) {
73   - followBizService.overdueProcessing(record);
  82 + try {
  83 + List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery()
  84 + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING)
  85 + .le(FollowRecord::getDeadline, LocalDateTime.now())
  86 + .eq(FollowRecord::getYn, Boolean.TRUE)
  87 + .last("limit 0, 500")
  88 + );
  89 + if (CollectionUtils.isEmpty(list)) {
  90 + return;
  91 + }
  92 + for (FollowRecord record : list) {
  93 + followBizService.overdueProcessing(record);
  94 + }
  95 + } finally {
  96 + distributedLocker.unlock(lock);
74 97 }
75 98 }
76 99  
... ... @@ -80,15 +103,25 @@ public class FollowRecordTask {
80 103 */
81 104 @Scheduled(cron = "0/8 * 8-18 * * *")
82 105 public void push2NorTodo() {
83   - List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery()
84   - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING)
85   - .eq(FollowRecord::getAddTodo, Boolean.FALSE)
86   - .notIn(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF)
87   - .le(FollowRecord::getPlanTime, LocalDateTime.now())
88   - .eq(FollowRecord::getYn, Boolean.TRUE)
89   - .last("limit 0,500")
90   - );
91   - execute(list);
  106 + final String cacheName = getCacheName("push-task:normal");
  107 + Lock lock = distributedLocker.lock(cacheName);
  108 + boolean locked = ((RLock) lock).isLocked();
  109 + if (!locked) {
  110 + return;
  111 + }
  112 + try {
  113 + List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery()
  114 + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING)
  115 + .eq(FollowRecord::getAddTodo, Boolean.FALSE)
  116 + .notIn(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF)
  117 + .le(FollowRecord::getPlanTime, LocalDateTime.now())
  118 + .eq(FollowRecord::getYn, Boolean.TRUE)
  119 + .last("limit 0,500")
  120 + );
  121 + execute(list);
  122 + } finally {
  123 + distributedLocker.unlock(lock);
  124 + }
92 125 }
93 126  
94 127 /**
... ... @@ -97,15 +130,25 @@ public class FollowRecordTask {
97 130 */
98 131 @Scheduled(initialDelay = 1500, fixedRate = 1000 * 3)
99 132 public void push2AccTodo() {
100   - List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery()
101   - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING)
102   - .eq(FollowRecord::getAddTodo, Boolean.FALSE)
103   - .eq(FollowRecord::getYn, Boolean.TRUE)
104   - .in(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF)
105   - .le(FollowRecord::getPlanTime, LocalDateTime.now())
106   - .last("limit 0,50")
107   - );
108   - execute(list);
  133 + final String cacheName = getCacheName("push-task:acc");
  134 + Lock lock = distributedLocker.lock(cacheName);
  135 + boolean locked = ((RLock) lock).isLocked();
  136 + if (!locked) {
  137 + return;
  138 + }
  139 + try {
  140 + List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery()
  141 + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING)
  142 + .eq(FollowRecord::getAddTodo, Boolean.FALSE)
  143 + .eq(FollowRecord::getYn, Boolean.TRUE)
  144 + .in(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF)
  145 + .le(FollowRecord::getPlanTime, LocalDateTime.now())
  146 + .last("limit 0,50")
  147 + );
  148 + execute(list);
  149 + } finally {
  150 + distributedLocker.unlock(lock);
  151 + }
109 152 }
110 153  
111 154  
... ... @@ -114,9 +157,19 @@ public class FollowRecordTask {
114 157 */
115 158 @Scheduled(initialDelay = 1500, fixedRate = 15 * 1000)
116 159 public void retryCompleteTodoItem() {
117   - String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.COMPLETE);
118   - Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key);
119   - all.forEach(todoRpcService::complete);
  160 + final String cacheName = getCacheName("retry:complete");
  161 + Lock lock = distributedLocker.lock(cacheName);
  162 + boolean locked = ((RLock) lock).isLocked();
  163 + if (!locked) {
  164 + return;
  165 + }
  166 + try {
  167 + String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.COMPLETE);
  168 + Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key);
  169 + all.forEach(todoRpcService::complete);
  170 + } finally {
  171 + distributedLocker.unlock(lock);
  172 + }
120 173 }
121 174  
122 175 /**
... ... @@ -124,9 +177,19 @@ public class FollowRecordTask {
124 177 */
125 178 @Scheduled(initialDelay = 1000, fixedRate = 10 * 1000)
126 179 public void retryCancelTodoItem() {
127   - String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.CANCEL);
128   - Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key);
129   - all.forEach(todoRpcService::cancel);
  180 + final String cacheName = getCacheName("retry:cancel");
  181 + Lock lock = distributedLocker.lock(cacheName);
  182 + boolean locked = ((RLock) lock).isLocked();
  183 + if (!locked) {
  184 + return;
  185 + }
  186 + try {
  187 + String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.CANCEL);
  188 + Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key);
  189 + all.forEach(todoRpcService::cancel);
  190 + } finally {
  191 + distributedLocker.unlock(lock);
  192 + }
130 193 }
131 194  
132 195  
... ... @@ -174,4 +237,8 @@ public class FollowRecordTask {
174 237 return false;
175 238 }
176 239 }
  240 +
  241 + private String getCacheName(@NonNull String name) {
  242 + return getPrefix() + ":" + name;
  243 + }
177 244 }
... ...
fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowTaskDealTask.java
1 1 package cn.fw.shirasawa.server.controller.task;
2 2  
  3 +import cn.fw.common.cache.locker.DistributedLocker;
3 4 import cn.fw.shirasawa.domain.db.follow.FollowTask;
4 5 import cn.fw.shirasawa.domain.db.pool.CluePool;
5 6 import cn.fw.shirasawa.domain.enums.ClueStatusEnum;
... ... @@ -11,13 +12,14 @@ import cn.hutool.core.thread.ThreadFactoryBuilder;
11 12 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
12 13 import lombok.Getter;
13 14 import lombok.extern.slf4j.Slf4j;
  15 +import org.redisson.api.RLock;
14 16 import org.springframework.beans.factory.annotation.Autowired;
15 17 import org.springframework.beans.factory.annotation.Value;
16 18 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17 19 import org.springframework.data.redis.core.StringRedisTemplate;
  20 +import org.springframework.lang.NonNull;
18 21 import org.springframework.scheduling.annotation.Scheduled;
19 22 import org.springframework.stereotype.Component;
20   -import org.springframework.transaction.annotation.Transactional;
21 23 import org.springframework.util.CollectionUtils;
22 24  
23 25 import java.time.LocalDate;
... ... @@ -29,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
29 31 import java.util.concurrent.LinkedBlockingQueue;
30 32 import java.util.concurrent.ThreadPoolExecutor;
31 33 import java.util.concurrent.TimeUnit;
  34 +import java.util.concurrent.locks.Lock;
32 35  
33 36 /**
34 37 * @author : kurisu
... ... @@ -47,19 +50,25 @@ public class FollowTaskDealTask {
47 50 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100),
48 51 new ThreadFactoryBuilder().setNamePrefix("close-clue-pool-").build(),
49 52 new ThreadPoolExecutor.AbortPolicy());
  53 + private final DistributedLocker distributedLocker;
50 54 private final StringRedisTemplate redisTemplate;
51 55 @Value("${spring.cache.locker.key-prefix}:termination")
52 56 @Getter
53 57 private String planKey;
  58 + @Value("${spring.cache.custom.global-prefix}:clue")
  59 + @Getter
  60 + private String prefix;
54 61  
55 62 @Autowired
56 63 public FollowTaskDealTask(final FollowTaskService followTaskService,
57 64 final FollowBizService followBizService,
58 65 final CluePoolService cluePoolService,
  66 + final DistributedLocker distributedLocker,
59 67 final StringRedisTemplate redisTemplate) {
60 68 this.followTaskService = followTaskService;
61 69 this.followBizService = followBizService;
62 70 this.cluePoolService = cluePoolService;
  71 + this.distributedLocker = distributedLocker;
63 72 this.redisTemplate = redisTemplate;
64 73 }
65 74  
... ... @@ -68,17 +77,27 @@ public class FollowTaskDealTask {
68 77 */
69 78 @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10)
70 79 public void startClue() {
71   - List<CluePool> list = cluePoolService.list(Wrappers.<CluePool>lambdaQuery()
72   - .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING)
73   - .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay())
74   - .le(CluePool::getStartTime, LocalDateTime.now())
75   - .last("limit 0, 500")
76   - );
77   - if (CollectionUtils.isEmpty(list)) {
  80 + String key = getCacheName("start");
  81 + Lock lock = distributedLocker.lock(key);
  82 + boolean locked = ((RLock) lock).isLocked();
  83 + if (!locked) {
78 84 return;
79 85 }
80   - for (CluePool cluePool : list) {
81   - followBizService.startClue(cluePool);
  86 + try {
  87 + List<CluePool> list = cluePoolService.list(Wrappers.<CluePool>lambdaQuery()
  88 + .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING)
  89 + .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay())
  90 + .le(CluePool::getStartTime, LocalDateTime.now())
  91 + .last("limit 0, 500")
  92 + );
  93 + if (CollectionUtils.isEmpty(list)) {
  94 + return;
  95 + }
  96 + for (CluePool cluePool : list) {
  97 + followBizService.startClue(cluePool);
  98 + }
  99 + } finally {
  100 + distributedLocker.unlock(lock);
82 101 }
83 102 }
84 103  
... ... @@ -87,15 +106,25 @@ public class FollowTaskDealTask {
87 106 */
88 107 @Scheduled(initialDelay = 1000 * 30, fixedRate = 1000 * 60)
89 108 public void endTask() {
90   - List<FollowTask> list = followTaskService.list(Wrappers.<FollowTask>lambdaQuery()
91   - .eq(FollowTask::getState, TaskStateEnum.ONGOING)
92   - .le(FollowTask::getDeadline, LocalDateTime.now())
93   - );
94   - if (CollectionUtils.isEmpty(list)) {
  109 + String key = getCacheName("end");
  110 + Lock lock = distributedLocker.lock(key);
  111 + boolean locked = ((RLock) lock).isLocked();
  112 + if (!locked) {
95 113 return;
96 114 }
97   - for (FollowTask r : list) {
98   - followBizService.endTask(r);
  115 + try {
  116 + List<FollowTask> list = followTaskService.list(Wrappers.<FollowTask>lambdaQuery()
  117 + .eq(FollowTask::getState, TaskStateEnum.ONGOING)
  118 + .le(FollowTask::getDeadline, LocalDateTime.now())
  119 + );
  120 + if (CollectionUtils.isEmpty(list)) {
  121 + return;
  122 + }
  123 + for (FollowTask r : list) {
  124 + followBizService.endTask(r);
  125 + }
  126 + } finally {
  127 + distributedLocker.unlock(lock);
99 128 }
100 129 }
101 130  
... ... @@ -133,4 +162,8 @@ public class FollowTaskDealTask {
133 162 THREAD_POOL.execute(() -> followBizService.endTask(cluePool));
134 163 }
135 164 }
  165 +
  166 + private String getCacheName(@NonNull String name) {
  167 + return getPrefix() + ":" + name;
  168 + }
136 169 }
... ...
fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/OriginDataDealTask.java
1 1 package cn.fw.shirasawa.server.controller.task;
2 2  
  3 +import cn.fw.common.cache.locker.DistributedLocker;
3 4 import cn.fw.shirasawa.domain.db.OriginalData;
4 5 import cn.fw.shirasawa.service.bus.follow.FollowBizService;
5 6 import cn.fw.shirasawa.service.data.OriginalDataService;
6 7 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  8 +import lombok.Getter;
  9 +import org.redisson.api.RLock;
7 10 import org.springframework.beans.factory.annotation.Autowired;
  11 +import org.springframework.beans.factory.annotation.Value;
8 12 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
9 13 import org.springframework.scheduling.annotation.Scheduled;
10 14 import org.springframework.stereotype.Component;
11 15 import org.springframework.util.CollectionUtils;
12 16  
13 17 import java.util.List;
  18 +import java.util.concurrent.locks.Lock;
14 19  
15 20 /**
16 21 * @author : kurisu
... ... @@ -23,26 +28,43 @@ import java.util.List;
23 28 public class OriginDataDealTask {
24 29 private final OriginalDataService originalDataService;
25 30 private final FollowBizService followBizService;
  31 + private final DistributedLocker distributedLocker;
  32 +
  33 + @Value("${spring.cache.custom.global-prefix}:origin")
  34 + @Getter
  35 + private String prefix;
26 36  
27 37 @Autowired
28 38 public OriginDataDealTask(final OriginalDataService originalDataService,
29   - final FollowBizService followBizService) {
  39 + final FollowBizService followBizService,
  40 + final DistributedLocker distributedLocker) {
30 41 this.originalDataService = originalDataService;
31 42 this.followBizService = followBizService;
  43 + this.distributedLocker = distributedLocker;
32 44 }
33 45  
34 46 @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 15)
35 47 public void syn2Task() {
36   - List<OriginalData> list = originalDataService.list(Wrappers.<OriginalData>lambdaQuery()
37   - .eq(OriginalData::getSolved, Boolean.FALSE)
38   - .last("limit 0,500")
39   - );
40   - if (CollectionUtils.isEmpty(list)) {
  48 + String key = getPrefix() + ":syncdata";
  49 + Lock lock = distributedLocker.lock(key);
  50 + boolean locked = ((RLock) lock).isLocked();
  51 + if (!locked) {
41 52 return;
42 53 }
43   - for (OriginalData data : list) {
44   - data.setSolved(followBizService.origin2task(data));
  54 + try {
  55 + List<OriginalData> list = originalDataService.list(Wrappers.<OriginalData>lambdaQuery()
  56 + .eq(OriginalData::getSolved, Boolean.FALSE)
  57 + .last("limit 0,500")
  58 + );
  59 + if (CollectionUtils.isEmpty(list)) {
  60 + return;
  61 + }
  62 + for (OriginalData data : list) {
  63 + data.setSolved(followBizService.origin2task(data));
  64 + }
  65 + originalDataService.updateBatchById(list);
  66 + } finally {
  67 + distributedLocker.unlock(lock);
45 68 }
46   - originalDataService.updateBatchById(list);
47 69 }
48 70 }
... ...
fw-shirasawa-server/src/main/resources/application-local.yml
... ... @@ -10,6 +10,10 @@ nacos:
10 10 plugin:
11 11 namespace: df959b8c-de58-4b02-b9fb-d65ca3be05f3
12 12 spring:
  13 + cloud:
  14 + inetutils:
  15 + preferred-networks:
  16 + - 10.8
13 17 application:
14 18 name: fw-shirasawa-local
15 19 datasource:
... ...