Commit 04d4da7743a79993ce9d47e32e1a48cf0a196225
1 parent
4764c95d
feature(*): 新增历史数据清理定时任务
- 新增历史数据清理定时任务
Showing
5 changed files
with
116 additions
and
2 deletions
fw-hestia-common/pom.xml
fw-hestia-common/src/main/java/cn/fw/hestia/common/utils/ThreadPoolUtil.java
0 → 100644
1 | +package cn.fw.hestia.common.utils; | |
2 | + | |
3 | +import cn.hutool.core.thread.ExecutorBuilder; | |
4 | +import cn.hutool.core.thread.NamedThreadFactory; | |
5 | + | |
6 | +import java.util.concurrent.ExecutorService; | |
7 | +import java.util.concurrent.LinkedBlockingQueue; | |
8 | + | |
9 | +/** | |
10 | + * ThreadPoolUtil | |
11 | + * | |
12 | + * @author : kurisu | |
13 | + * @version : 2.0 | |
14 | + * @className : ThreadPoolUtil | |
15 | + * @description : ThreadPoolUtil | |
16 | + * @date : 2023-04-19 17:31 | |
17 | + */ | |
18 | +public class ThreadPoolUtil { | |
19 | + private static volatile ThreadPoolUtil INSTANCE; | |
20 | + private final ExecutorService executor; | |
21 | + | |
22 | + private ThreadPoolUtil() { | |
23 | + this.executor = ExecutorBuilder.create() | |
24 | + .setCorePoolSize(20) | |
25 | + .setMaxPoolSize(200) | |
26 | + .setThreadFactory(new NamedThreadFactory("hestia-custom-", false)) | |
27 | + .setAllowCoreThreadTimeOut(true) | |
28 | + .setWorkQueue(new LinkedBlockingQueue<>()) | |
29 | + .build(); | |
30 | + } | |
31 | + | |
32 | + public static ThreadPoolUtil getInstance() { | |
33 | + if (INSTANCE == null) { | |
34 | + synchronized (ThreadPoolUtil.class) { | |
35 | + if (INSTANCE == null) { | |
36 | + INSTANCE = new ThreadPoolUtil(); | |
37 | + } | |
38 | + } | |
39 | + } | |
40 | + return INSTANCE; | |
41 | + } | |
42 | + | |
43 | + public ExecutorService getExecutor() { | |
44 | + return executor; | |
45 | + } | |
46 | +} | ... | ... |
fw-hestia-server/src/main/java/cn/fw/hestia/server/task/SendMessageTask.java
1 | 1 | package cn.fw.hestia.server.task; |
2 | 2 | |
3 | +import cn.fw.hestia.common.utils.ThreadPoolUtil; | |
3 | 4 | import cn.fw.hestia.domain.db.MessageHistory; |
4 | 5 | import cn.fw.hestia.domain.enums.MessageStateEnum; |
5 | 6 | import cn.fw.hestia.service.buz.MessageCenterBizService; |
6 | 7 | import cn.fw.hestia.service.data.MessageHistoryService; |
7 | 8 | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
9 | +import lombok.Getter; | |
8 | 10 | import org.springframework.beans.factory.annotation.Autowired; |
11 | +import org.springframework.beans.factory.annotation.Value; | |
9 | 12 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
13 | +import org.springframework.data.redis.core.BoundSetOperations; | |
14 | +import org.springframework.data.redis.core.StringRedisTemplate; | |
10 | 15 | import org.springframework.scheduling.annotation.Scheduled; |
11 | 16 | import org.springframework.stereotype.Component; |
12 | 17 | import org.springframework.util.CollectionUtils; |
13 | 18 | |
19 | +import java.time.LocalDateTime; | |
14 | 20 | import java.util.Date; |
15 | 21 | import java.util.List; |
16 | 22 | import java.util.concurrent.CompletableFuture; |
... | ... | @@ -28,12 +34,19 @@ import static cn.fw.hestia.common.constant.MessageConstant.MAX_FREQUENCY; |
28 | 34 | public class SendMessageTask { |
29 | 35 | private final MessageCenterBizService messageCenterBizService; |
30 | 36 | private final MessageHistoryService messageHistoryService; |
37 | + private final StringRedisTemplate stringRedisTemplate; | |
38 | + | |
39 | + @Getter | |
40 | + @Value("${spring.cache.custom.global-prefix}:data:clear") | |
41 | + private String dataClearKey; | |
31 | 42 | |
32 | 43 | @Autowired |
33 | 44 | public SendMessageTask(final MessageCenterBizService messageCenterBizService, |
34 | - final MessageHistoryService messageHistoryService) { | |
45 | + final MessageHistoryService messageHistoryService, | |
46 | + final StringRedisTemplate stringRedisTemplate) { | |
35 | 47 | this.messageCenterBizService = messageCenterBizService; |
36 | 48 | this.messageHistoryService = messageHistoryService; |
49 | + this.stringRedisTemplate = stringRedisTemplate; | |
37 | 50 | } |
38 | 51 | |
39 | 52 | /** |
... | ... | @@ -55,4 +68,35 @@ public class SendMessageTask { |
55 | 68 | CompletableFuture.runAsync(() -> messageCenterBizService.sendMessage(history)); |
56 | 69 | } |
57 | 70 | } |
71 | + | |
72 | + /** | |
73 | + * 缓存数据 | |
74 | + */ | |
75 | + @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10) | |
76 | + public void cacheOldData() { | |
77 | + String key = getDataClearKey(); | |
78 | + final BoundSetOperations<String, String> setOps = stringRedisTemplate.boundSetOps(key); | |
79 | + List<MessageHistory> list = messageHistoryService.list(Wrappers.<MessageHistory>lambdaQuery() | |
80 | + .lt(MessageHistory::getCreateTime, LocalDateTime.now().minusMonths(6L)) | |
81 | + .last("limit 500") | |
82 | + ); | |
83 | + if (CollectionUtils.isEmpty(list)) { | |
84 | + return; | |
85 | + } | |
86 | + list.stream().map(MessageHistory::getId).forEach(id -> setOps.add(id.toString())); | |
87 | + } | |
88 | + | |
89 | + /** | |
90 | + * 清理数据 | |
91 | + */ | |
92 | + @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 15) | |
93 | + public void clearOldData() { | |
94 | + String key = getDataClearKey(); | |
95 | + final BoundSetOperations<String, String> setOps = stringRedisTemplate.boundSetOps(key); | |
96 | + String idStr; | |
97 | + while ((idStr = setOps.pop()) != null) { | |
98 | + final Long id = Long.valueOf(idStr); | |
99 | + CompletableFuture.runAsync(() -> messageCenterBizService.clearHistory(id), ThreadPoolUtil.getInstance().getExecutor()); | |
100 | + } | |
101 | + } | |
58 | 102 | } | ... | ... |
fw-hestia-service/src/main/java/cn/fw/hestia/service/buz/MessageCenterBizService.java
... | ... | @@ -131,6 +131,20 @@ public class MessageCenterBizService { |
131 | 131 | messageHistoryService.updateById(history); |
132 | 132 | } |
133 | 133 | |
134 | + | |
135 | + /** | |
136 | + * 清理不使用的数据 | |
137 | + * | |
138 | + * @param id | |
139 | + */ | |
140 | + @Transactional(rollbackFor = Exception.class) | |
141 | + public void clearHistory(Long id) { | |
142 | + sendLogService.remove(Wrappers.<SendLog>lambdaQuery() | |
143 | + .eq(SendLog::getMessageId, id) | |
144 | + ); | |
145 | + messageHistoryService.removeById(id); | |
146 | + } | |
147 | + | |
134 | 148 | /** |
135 | 149 | * 撤回消息 |
136 | 150 | * | ... | ... |
pom.xml
... | ... | @@ -12,7 +12,7 @@ |
12 | 12 | <parent> |
13 | 13 | <groupId>cn.fw</groupId> |
14 | 14 | <artifactId>fw-common-dependencies</artifactId> |
15 | - <version>3.3.2</version> | |
15 | + <version>3.3.3</version> | |
16 | 16 | </parent> |
17 | 17 | |
18 | 18 | <modules> |
... | ... | @@ -38,6 +38,7 @@ |
38 | 38 | <fastjson>1.2.51</fastjson> |
39 | 39 | <fw-passport-sdk.version>2.2.0</fw-passport-sdk.version> |
40 | 40 | <yitter.idgenerator>1.0.6</yitter.idgenerator> |
41 | + <hutool.version>5.2.5</hutool.version> | |
41 | 42 | </properties> |
42 | 43 | |
43 | 44 | <dependencyManagement> |
... | ... | @@ -121,6 +122,11 @@ |
121 | 122 | <artifactId>yitter-idgenerator</artifactId> |
122 | 123 | <version>${yitter.idgenerator}</version> |
123 | 124 | </dependency> |
125 | + <dependency> | |
126 | + <groupId>cn.hutool</groupId> | |
127 | + <artifactId>hutool-all</artifactId> | |
128 | + <version>${hutool.version}</version> | |
129 | + </dependency> | |
124 | 130 | </dependencies> |
125 | 131 | </dependencyManagement> |
126 | 132 | ... | ... |