Commit 7eb9a50b57ae10b3c018b4d7ecc9cab53f4b7c7e
1 parent
5e8c709c
feature(*): 添加消息推送限流
- 添加消息推送限流
Showing
1 changed file
with
8 additions
and
2 deletions
fw-hermes-server/src/main/java/cn/fw/hermes/task/MessageSendTask.java
... | ... | @@ -52,8 +52,8 @@ public class MessageSendTask { |
52 | 52 | @Scheduled(cron = "0/30 * * * * *") |
53 | 53 | public void ready2Send() { |
54 | 54 | List<MessageHistory> list = messageHistoryService.list(Wrappers.<MessageHistory>lambdaQuery() |
55 | - .eq(MessageHistory::getMsgStatus, MsgStatusEnum.WAITING) | |
56 | - .ge(MessageHistory::getCreateTime, LocalDateTime.now().minusDays(7L)) | |
55 | + .eq(MessageHistory::getMsgStatus, MsgStatusEnum.WAITING) | |
56 | + .ge(MessageHistory::getCreateTime, LocalDateTime.now().minusDays(7L)) | |
57 | 57 | ); |
58 | 58 | if (CollectionUtils.isEmpty(list)) { |
59 | 59 | return; |
... | ... | @@ -69,8 +69,11 @@ public class MessageSendTask { |
69 | 69 | @Scheduled(cron = "0/2 * * * * *") |
70 | 70 | public void sendMsg() { |
71 | 71 | List<String> failList = new ArrayList<>(); |
72 | + // 限流标志 [最多一次性推送100条] | |
73 | + int count = 100; | |
72 | 74 | String jsonStr; |
73 | 75 | while ((jsonStr = stringRedisTemplate.opsForSet().pop(Constant.READY_TO_SEND_MSG_CACHE_KEY)) != null) { |
76 | + count--; | |
74 | 77 | try { |
75 | 78 | final Long messageId = Long.valueOf(jsonStr); |
76 | 79 | messageSender.sendMsg(messageId); |
... | ... | @@ -80,6 +83,9 @@ public class MessageSendTask { |
80 | 83 | } |
81 | 84 | log.error("推送消息失败", e); |
82 | 85 | } |
86 | + if (count < 0) { | |
87 | + break; | |
88 | + } | |
83 | 89 | } |
84 | 90 | if (!CollectionUtils.isEmpty(failList)) { |
85 | 91 | stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, failList.toArray(new String[0])); | ... | ... |