Commit 2bf416a41a68fe4e43ff0195a4c87946f64611cb
Merge remote-tracking branch 'origin/test'
Showing
4 changed files
with
186 additions
and
60 deletions
fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowRecordTask.java
1 | 1 | package cn.fw.shirasawa.server.controller.task; |
2 | 2 | |
3 | +import cn.fw.common.cache.locker.DistributedLocker; | |
3 | 4 | import cn.fw.shirasawa.common.utils.DateUtil; |
4 | 5 | import cn.fw.shirasawa.domain.db.follow.FollowRecord; |
5 | 6 | import cn.fw.shirasawa.domain.db.follow.FollowTask; |
... | ... | @@ -15,9 +16,13 @@ import cn.fw.shirasawa.service.data.FollowRecordService; |
15 | 16 | import cn.fw.shirasawa.service.data.FollowTaskService; |
16 | 17 | import com.alibaba.fastjson.JSONObject; |
17 | 18 | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
19 | +import lombok.Getter; | |
18 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | +import org.redisson.api.RLock; | |
19 | 22 | import org.springframework.beans.factory.annotation.Autowired; |
23 | +import org.springframework.beans.factory.annotation.Value; | |
20 | 24 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
25 | +import org.springframework.lang.NonNull; | |
21 | 26 | import org.springframework.scheduling.annotation.Scheduled; |
22 | 27 | import org.springframework.stereotype.Component; |
23 | 28 | import org.springframework.transaction.annotation.Transactional; |
... | ... | @@ -25,6 +30,7 @@ import org.springframework.util.CollectionUtils; |
25 | 30 | |
26 | 31 | import java.time.LocalDateTime; |
27 | 32 | import java.util.*; |
33 | +import java.util.concurrent.locks.Lock; | |
28 | 34 | |
29 | 35 | /** |
30 | 36 | * @author : kurisu |
... | ... | @@ -41,18 +47,25 @@ public class FollowRecordTask { |
41 | 47 | private final TodoRpcService todoRpcService; |
42 | 48 | private final FollowTaskService followTaskService; |
43 | 49 | private final EhrRpcService ehrRpcService; |
50 | + private final DistributedLocker distributedLocker; | |
51 | + | |
52 | + @Value("${spring.cache.custom.global-prefix}:task") | |
53 | + @Getter | |
54 | + private String prefix; | |
44 | 55 | |
45 | 56 | @Autowired |
46 | 57 | public FollowRecordTask(final FollowRecordService followRecordService, |
47 | 58 | final FollowBizService followBizService, |
48 | 59 | final TodoRpcService todoRpcService, |
49 | 60 | final FollowTaskService followTaskService, |
50 | - final EhrRpcService ehrRpcService) { | |
61 | + final EhrRpcService ehrRpcService, | |
62 | + final DistributedLocker distributedLocker) { | |
51 | 63 | this.followRecordService = followRecordService; |
52 | 64 | this.followBizService = followBizService; |
53 | 65 | this.todoRpcService = todoRpcService; |
54 | 66 | this.followTaskService = followTaskService; |
55 | 67 | this.ehrRpcService = ehrRpcService; |
68 | + this.distributedLocker = distributedLocker; | |
56 | 69 | } |
57 | 70 | |
58 | 71 | /** |
... | ... | @@ -60,17 +73,27 @@ public class FollowRecordTask { |
60 | 73 | */ |
61 | 74 | @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 5) |
62 | 75 | public void endTaskRecord() { |
63 | - List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery() | |
64 | - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) | |
65 | - .le(FollowRecord::getDeadline, LocalDateTime.now()) | |
66 | - .eq(FollowRecord::getYn, Boolean.TRUE) | |
67 | - .last("limit 0, 500") | |
68 | - ); | |
69 | - if (CollectionUtils.isEmpty(list)) { | |
76 | + final String cacheName = getCacheName("end-task"); | |
77 | + Lock lock = distributedLocker.lock(cacheName); | |
78 | + boolean locked = ((RLock) lock).isLocked(); | |
79 | + if (!locked) { | |
70 | 80 | return; |
71 | 81 | } |
72 | - for (FollowRecord record : list) { | |
73 | - followBizService.overdueProcessing(record); | |
82 | + try { | |
83 | + List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery() | |
84 | + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) | |
85 | + .le(FollowRecord::getDeadline, LocalDateTime.now()) | |
86 | + .eq(FollowRecord::getYn, Boolean.TRUE) | |
87 | + .last("limit 0, 500") | |
88 | + ); | |
89 | + if (CollectionUtils.isEmpty(list)) { | |
90 | + return; | |
91 | + } | |
92 | + for (FollowRecord record : list) { | |
93 | + followBizService.overdueProcessing(record); | |
94 | + } | |
95 | + } finally { | |
96 | + distributedLocker.unlock(lock); | |
74 | 97 | } |
75 | 98 | } |
76 | 99 | |
... | ... | @@ -80,15 +103,25 @@ public class FollowRecordTask { |
80 | 103 | */ |
81 | 104 | @Scheduled(cron = "0/8 * 8-18 * * *") |
82 | 105 | public void push2NorTodo() { |
83 | - List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery() | |
84 | - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) | |
85 | - .eq(FollowRecord::getAddTodo, Boolean.FALSE) | |
86 | - .notIn(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) | |
87 | - .le(FollowRecord::getPlanTime, LocalDateTime.now()) | |
88 | - .eq(FollowRecord::getYn, Boolean.TRUE) | |
89 | - .last("limit 0,500") | |
90 | - ); | |
91 | - execute(list); | |
106 | + final String cacheName = getCacheName("push-task:normal"); | |
107 | + Lock lock = distributedLocker.lock(cacheName); | |
108 | + boolean locked = ((RLock) lock).isLocked(); | |
109 | + if (!locked) { | |
110 | + return; | |
111 | + } | |
112 | + try { | |
113 | + List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery() | |
114 | + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) | |
115 | + .eq(FollowRecord::getAddTodo, Boolean.FALSE) | |
116 | + .notIn(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) | |
117 | + .le(FollowRecord::getPlanTime, LocalDateTime.now()) | |
118 | + .eq(FollowRecord::getYn, Boolean.TRUE) | |
119 | + .last("limit 0,500") | |
120 | + ); | |
121 | + execute(list); | |
122 | + } finally { | |
123 | + distributedLocker.unlock(lock); | |
124 | + } | |
92 | 125 | } |
93 | 126 | |
94 | 127 | /** |
... | ... | @@ -97,15 +130,25 @@ public class FollowRecordTask { |
97 | 130 | */ |
98 | 131 | @Scheduled(initialDelay = 1500, fixedRate = 1000 * 3) |
99 | 132 | public void push2AccTodo() { |
100 | - List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery() | |
101 | - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) | |
102 | - .eq(FollowRecord::getAddTodo, Boolean.FALSE) | |
103 | - .eq(FollowRecord::getYn, Boolean.TRUE) | |
104 | - .in(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) | |
105 | - .le(FollowRecord::getPlanTime, LocalDateTime.now()) | |
106 | - .last("limit 0,50") | |
107 | - ); | |
108 | - execute(list); | |
133 | + final String cacheName = getCacheName("push-task:acc"); | |
134 | + Lock lock = distributedLocker.lock(cacheName); | |
135 | + boolean locked = ((RLock) lock).isLocked(); | |
136 | + if (!locked) { | |
137 | + return; | |
138 | + } | |
139 | + try { | |
140 | + List<FollowRecord> list = followRecordService.list(Wrappers.<FollowRecord>lambdaQuery() | |
141 | + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) | |
142 | + .eq(FollowRecord::getAddTodo, Boolean.FALSE) | |
143 | + .eq(FollowRecord::getYn, Boolean.TRUE) | |
144 | + .in(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) | |
145 | + .le(FollowRecord::getPlanTime, LocalDateTime.now()) | |
146 | + .last("limit 0,50") | |
147 | + ); | |
148 | + execute(list); | |
149 | + } finally { | |
150 | + distributedLocker.unlock(lock); | |
151 | + } | |
109 | 152 | } |
110 | 153 | |
111 | 154 | |
... | ... | @@ -114,9 +157,19 @@ public class FollowRecordTask { |
114 | 157 | */ |
115 | 158 | @Scheduled(initialDelay = 1500, fixedRate = 15 * 1000) |
116 | 159 | public void retryCompleteTodoItem() { |
117 | - String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.COMPLETE); | |
118 | - Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key); | |
119 | - all.forEach(todoRpcService::complete); | |
160 | + final String cacheName = getCacheName("retry:complete"); | |
161 | + Lock lock = distributedLocker.lock(cacheName); | |
162 | + boolean locked = ((RLock) lock).isLocked(); | |
163 | + if (!locked) { | |
164 | + return; | |
165 | + } | |
166 | + try { | |
167 | + String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.COMPLETE); | |
168 | + Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key); | |
169 | + all.forEach(todoRpcService::complete); | |
170 | + } finally { | |
171 | + distributedLocker.unlock(lock); | |
172 | + } | |
120 | 173 | } |
121 | 174 | |
122 | 175 | /** |
... | ... | @@ -124,9 +177,19 @@ public class FollowRecordTask { |
124 | 177 | */ |
125 | 178 | @Scheduled(initialDelay = 1000, fixedRate = 10 * 1000) |
126 | 179 | public void retryCancelTodoItem() { |
127 | - String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.CANCEL); | |
128 | - Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key); | |
129 | - all.forEach(todoRpcService::cancel); | |
180 | + final String cacheName = getCacheName("retry:cancel"); | |
181 | + Lock lock = distributedLocker.lock(cacheName); | |
182 | + boolean locked = ((RLock) lock).isLocked(); | |
183 | + if (!locked) { | |
184 | + return; | |
185 | + } | |
186 | + try { | |
187 | + String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.CANCEL); | |
188 | + Collection<BackLogItemDTO> all = todoRpcService.getAllFromCache(key); | |
189 | + all.forEach(todoRpcService::cancel); | |
190 | + } finally { | |
191 | + distributedLocker.unlock(lock); | |
192 | + } | |
130 | 193 | } |
131 | 194 | |
132 | 195 | |
... | ... | @@ -174,4 +237,8 @@ public class FollowRecordTask { |
174 | 237 | return false; |
175 | 238 | } |
176 | 239 | } |
240 | + | |
241 | + private String getCacheName(@NonNull String name) { | |
242 | + return getPrefix() + ":" + name; | |
243 | + } | |
177 | 244 | } | ... | ... |
fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowTaskDealTask.java
1 | 1 | package cn.fw.shirasawa.server.controller.task; |
2 | 2 | |
3 | +import cn.fw.common.cache.locker.DistributedLocker; | |
3 | 4 | import cn.fw.shirasawa.domain.db.follow.FollowTask; |
4 | 5 | import cn.fw.shirasawa.domain.db.pool.CluePool; |
5 | 6 | import cn.fw.shirasawa.domain.enums.ClueStatusEnum; |
... | ... | @@ -11,13 +12,14 @@ import cn.hutool.core.thread.ThreadFactoryBuilder; |
11 | 12 | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
12 | 13 | import lombok.Getter; |
13 | 14 | import lombok.extern.slf4j.Slf4j; |
15 | +import org.redisson.api.RLock; | |
14 | 16 | import org.springframework.beans.factory.annotation.Autowired; |
15 | 17 | import org.springframework.beans.factory.annotation.Value; |
16 | 18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
17 | 19 | import org.springframework.data.redis.core.StringRedisTemplate; |
20 | +import org.springframework.lang.NonNull; | |
18 | 21 | import org.springframework.scheduling.annotation.Scheduled; |
19 | 22 | import org.springframework.stereotype.Component; |
20 | -import org.springframework.transaction.annotation.Transactional; | |
21 | 23 | import org.springframework.util.CollectionUtils; |
22 | 24 | |
23 | 25 | import java.time.LocalDate; |
... | ... | @@ -29,6 +31,7 @@ import java.util.concurrent.BlockingQueue; |
29 | 31 | import java.util.concurrent.LinkedBlockingQueue; |
30 | 32 | import java.util.concurrent.ThreadPoolExecutor; |
31 | 33 | import java.util.concurrent.TimeUnit; |
34 | +import java.util.concurrent.locks.Lock; | |
32 | 35 | |
33 | 36 | /** |
34 | 37 | * @author : kurisu |
... | ... | @@ -47,19 +50,25 @@ public class FollowTaskDealTask { |
47 | 50 | TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), |
48 | 51 | new ThreadFactoryBuilder().setNamePrefix("close-clue-pool-").build(), |
49 | 52 | new ThreadPoolExecutor.AbortPolicy()); |
53 | + private final DistributedLocker distributedLocker; | |
50 | 54 | private final StringRedisTemplate redisTemplate; |
51 | 55 | @Value("${spring.cache.locker.key-prefix}:termination") |
52 | 56 | @Getter |
53 | 57 | private String planKey; |
58 | + @Value("${spring.cache.custom.global-prefix}:clue") | |
59 | + @Getter | |
60 | + private String prefix; | |
54 | 61 | |
55 | 62 | @Autowired |
56 | 63 | public FollowTaskDealTask(final FollowTaskService followTaskService, |
57 | 64 | final FollowBizService followBizService, |
58 | 65 | final CluePoolService cluePoolService, |
66 | + final DistributedLocker distributedLocker, | |
59 | 67 | final StringRedisTemplate redisTemplate) { |
60 | 68 | this.followTaskService = followTaskService; |
61 | 69 | this.followBizService = followBizService; |
62 | 70 | this.cluePoolService = cluePoolService; |
71 | + this.distributedLocker = distributedLocker; | |
63 | 72 | this.redisTemplate = redisTemplate; |
64 | 73 | } |
65 | 74 | |
... | ... | @@ -68,17 +77,27 @@ public class FollowTaskDealTask { |
68 | 77 | */ |
69 | 78 | @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10) |
70 | 79 | public void startClue() { |
71 | - List<CluePool> list = cluePoolService.list(Wrappers.<CluePool>lambdaQuery() | |
72 | - .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING) | |
73 | - .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay()) | |
74 | - .le(CluePool::getStartTime, LocalDateTime.now()) | |
75 | - .last("limit 0, 500") | |
76 | - ); | |
77 | - if (CollectionUtils.isEmpty(list)) { | |
80 | + String key = getCacheName("start"); | |
81 | + Lock lock = distributedLocker.lock(key); | |
82 | + boolean locked = ((RLock) lock).isLocked(); | |
83 | + if (!locked) { | |
78 | 84 | return; |
79 | 85 | } |
80 | - for (CluePool cluePool : list) { | |
81 | - followBizService.startClue(cluePool); | |
86 | + try { | |
87 | + List<CluePool> list = cluePoolService.list(Wrappers.<CluePool>lambdaQuery() | |
88 | + .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING) | |
89 | + .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay()) | |
90 | + .le(CluePool::getStartTime, LocalDateTime.now()) | |
91 | + .last("limit 0, 500") | |
92 | + ); | |
93 | + if (CollectionUtils.isEmpty(list)) { | |
94 | + return; | |
95 | + } | |
96 | + for (CluePool cluePool : list) { | |
97 | + followBizService.startClue(cluePool); | |
98 | + } | |
99 | + } finally { | |
100 | + distributedLocker.unlock(lock); | |
82 | 101 | } |
83 | 102 | } |
84 | 103 | |
... | ... | @@ -87,15 +106,25 @@ public class FollowTaskDealTask { |
87 | 106 | */ |
88 | 107 | @Scheduled(initialDelay = 1000 * 30, fixedRate = 1000 * 60) |
89 | 108 | public void endTask() { |
90 | - List<FollowTask> list = followTaskService.list(Wrappers.<FollowTask>lambdaQuery() | |
91 | - .eq(FollowTask::getState, TaskStateEnum.ONGOING) | |
92 | - .le(FollowTask::getDeadline, LocalDateTime.now()) | |
93 | - ); | |
94 | - if (CollectionUtils.isEmpty(list)) { | |
109 | + String key = getCacheName("end"); | |
110 | + Lock lock = distributedLocker.lock(key); | |
111 | + boolean locked = ((RLock) lock).isLocked(); | |
112 | + if (!locked) { | |
95 | 113 | return; |
96 | 114 | } |
97 | - for (FollowTask r : list) { | |
98 | - followBizService.endTask(r); | |
115 | + try { | |
116 | + List<FollowTask> list = followTaskService.list(Wrappers.<FollowTask>lambdaQuery() | |
117 | + .eq(FollowTask::getState, TaskStateEnum.ONGOING) | |
118 | + .le(FollowTask::getDeadline, LocalDateTime.now()) | |
119 | + ); | |
120 | + if (CollectionUtils.isEmpty(list)) { | |
121 | + return; | |
122 | + } | |
123 | + for (FollowTask r : list) { | |
124 | + followBizService.endTask(r); | |
125 | + } | |
126 | + } finally { | |
127 | + distributedLocker.unlock(lock); | |
99 | 128 | } |
100 | 129 | } |
101 | 130 | |
... | ... | @@ -133,4 +162,8 @@ public class FollowTaskDealTask { |
133 | 162 | THREAD_POOL.execute(() -> followBizService.endTask(cluePool)); |
134 | 163 | } |
135 | 164 | } |
165 | + | |
166 | + private String getCacheName(@NonNull String name) { | |
167 | + return getPrefix() + ":" + name; | |
168 | + } | |
136 | 169 | } | ... | ... |
fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/OriginDataDealTask.java
1 | 1 | package cn.fw.shirasawa.server.controller.task; |
2 | 2 | |
3 | +import cn.fw.common.cache.locker.DistributedLocker; | |
3 | 4 | import cn.fw.shirasawa.domain.db.OriginalData; |
4 | 5 | import cn.fw.shirasawa.service.bus.follow.FollowBizService; |
5 | 6 | import cn.fw.shirasawa.service.data.OriginalDataService; |
6 | 7 | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
8 | +import lombok.Getter; | |
9 | +import org.redisson.api.RLock; | |
7 | 10 | import org.springframework.beans.factory.annotation.Autowired; |
11 | +import org.springframework.beans.factory.annotation.Value; | |
8 | 12 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
9 | 13 | import org.springframework.scheduling.annotation.Scheduled; |
10 | 14 | import org.springframework.stereotype.Component; |
11 | 15 | import org.springframework.util.CollectionUtils; |
12 | 16 | |
13 | 17 | import java.util.List; |
18 | +import java.util.concurrent.locks.Lock; | |
14 | 19 | |
15 | 20 | /** |
16 | 21 | * @author : kurisu |
... | ... | @@ -23,26 +28,43 @@ import java.util.List; |
23 | 28 | public class OriginDataDealTask { |
24 | 29 | private final OriginalDataService originalDataService; |
25 | 30 | private final FollowBizService followBizService; |
31 | + private final DistributedLocker distributedLocker; | |
32 | + | |
33 | + @Value("${spring.cache.custom.global-prefix}:origin") | |
34 | + @Getter | |
35 | + private String prefix; | |
26 | 36 | |
27 | 37 | @Autowired |
28 | 38 | public OriginDataDealTask(final OriginalDataService originalDataService, |
29 | - final FollowBizService followBizService) { | |
39 | + final FollowBizService followBizService, | |
40 | + final DistributedLocker distributedLocker) { | |
30 | 41 | this.originalDataService = originalDataService; |
31 | 42 | this.followBizService = followBizService; |
43 | + this.distributedLocker = distributedLocker; | |
32 | 44 | } |
33 | 45 | |
34 | 46 | @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 15) |
35 | 47 | public void syn2Task() { |
36 | - List<OriginalData> list = originalDataService.list(Wrappers.<OriginalData>lambdaQuery() | |
37 | - .eq(OriginalData::getSolved, Boolean.FALSE) | |
38 | - .last("limit 0,500") | |
39 | - ); | |
40 | - if (CollectionUtils.isEmpty(list)) { | |
48 | + String key = getPrefix() + ":syncdata"; | |
49 | + Lock lock = distributedLocker.lock(key); | |
50 | + boolean locked = ((RLock) lock).isLocked(); | |
51 | + if (!locked) { | |
41 | 52 | return; |
42 | 53 | } |
43 | - for (OriginalData data : list) { | |
44 | - data.setSolved(followBizService.origin2task(data)); | |
54 | + try { | |
55 | + List<OriginalData> list = originalDataService.list(Wrappers.<OriginalData>lambdaQuery() | |
56 | + .eq(OriginalData::getSolved, Boolean.FALSE) | |
57 | + .last("limit 0,500") | |
58 | + ); | |
59 | + if (CollectionUtils.isEmpty(list)) { | |
60 | + return; | |
61 | + } | |
62 | + for (OriginalData data : list) { | |
63 | + data.setSolved(followBizService.origin2task(data)); | |
64 | + } | |
65 | + originalDataService.updateBatchById(list); | |
66 | + } finally { | |
67 | + distributedLocker.unlock(lock); | |
45 | 68 | } |
46 | - originalDataService.updateBatchById(list); | |
47 | 69 | } |
48 | 70 | } | ... | ... |
fw-shirasawa-server/src/main/resources/application-local.yml