Commit 0bb9b9726b053b73a6ed519264068a6e4df6f7b7

Authored by chenyonghong
1 parent 4a6b7520

improve -> 定时任务 -> 发送异步消息

fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/ImAccountService.java
@@ -21,7 +21,7 @@ public interface ImAccountService { @@ -21,7 +21,7 @@ public interface ImAccountService {
21 * @param accountCondition 注册用户对象 21 * @param accountCondition 注册用户对象
22 * @return 注册后的账户对象 22 * @return 注册后的账户对象
23 */ 23 */
24 - @PostMapping("/api/account/register") 24 + @PostMapping("/api/account/register")
25 Message<AccountResult> userRegister(@RequestBody AccountCondition accountCondition); 25 Message<AccountResult> userRegister(@RequestBody AccountCondition accountCondition);
26 26
27 27
fw-hermes-server/src/main/java/cn/fw/hermes/task/SendMessageTask.java
@@ -10,6 +10,8 @@ import cn.fw.hermes.sdk.api.para.MessageResult; @@ -10,6 +10,8 @@ import cn.fw.hermes.sdk.api.para.MessageResult;
10 import cn.fw.hermes.service.biz.MessageBizService; 10 import cn.fw.hermes.service.biz.MessageBizService;
11 import cn.fw.hermes.service.mq.MessageProducer; 11 import cn.fw.hermes.service.mq.MessageProducer;
12 import com.alibaba.fastjson.JSON; 12 import com.alibaba.fastjson.JSON;
  13 +
  14 +import java.util.ArrayList;
13 import java.util.concurrent.ArrayBlockingQueue; 15 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.ThreadPoolExecutor; 16 import java.util.concurrent.ThreadPoolExecutor;
15 import java.util.concurrent.TimeUnit; 17 import java.util.concurrent.TimeUnit;
@@ -49,19 +51,21 @@ public class SendMessageTask { @@ -49,19 +51,21 @@ public class SendMessageTask {
49 @Async 51 @Async
50 public void sendAsynMsg() { 52 public void sendAsynMsg() {
51 String messageBody; 53 String messageBody;
52 - while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList()  
53 - .rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) { 54 + ArrayList<MsgSubject> failedMsgList = new ArrayList<>();
  55 + while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList().rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) {
54 String finalMessageBody = messageBody; 56 String finalMessageBody = messageBody;
55 - poolExecutor.execute(() -> this.sendMsg(finalMessageBody)); 57 + poolExecutor.execute(() -> this.sendMsg(finalMessageBody,failedMsgList));
56 } 58 }
  59 + failedMsgList.forEach(i -> redisTemplate.opsForList().leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(), JSON.toJSONString(i)));
57 } 60 }
58 61
59 /** 62 /**
60 * 发送异步消息任务 消息发送失败默认重试3次 63 * 发送异步消息任务 消息发送失败默认重试3次
61 * 64 *
62 * @param messageBody JSON格式的消息对象 65 * @param messageBody JSON格式的消息对象
  66 + * @param failedMsgList 发送失败的消息
63 */ 67 */
64 - public void sendMsg(String messageBody) { 68 + public void sendMsg(String messageBody, ArrayList<MsgSubject> failedMsgList) {
65 final Integer retriesNum = 3; 69 final Integer retriesNum = 3;
66 String msg = "发送异步消息"; 70 String msg = "发送异步消息";
67 MessageResult messageResult; 71 MessageResult messageResult;
@@ -69,18 +73,14 @@ public class SendMessageTask { @@ -69,18 +73,14 @@ public class SendMessageTask {
69 String messageId = msgSubject.getCondition().getMessageId(); 73 String messageId = msgSubject.getCondition().getMessageId();
70 try { 74 try {
71 Boolean result = messageBizService.sendMsg(msgSubject); 75 Boolean result = messageBizService.sendMsg(msgSubject);
72 - messageResult = new MessageResult(messageId,  
73 - result); 76 + messageResult = new MessageResult(messageId, result);
74 messageProducer.send(messageResult); 77 messageProducer.send(messageResult);
75 log.info("{} messageId[{}] 执行结果: {}", msg, messageId, result); 78 log.info("{} messageId[{}] 执行结果: {}", msg, messageId, result);
76 } catch (BusinessException bs) { 79 } catch (BusinessException bs) {
77 if (msgSubject.getRetriesNum() < retriesNum) { 80 if (msgSubject.getRetriesNum() < retriesNum) {
78 msgSubject.setRetriesNum(msgSubject.getRetriesNum() + 1); 81 msgSubject.setRetriesNum(msgSubject.getRetriesNum() + 1);
79 - redisTemplate.opsForList()  
80 - .leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(),  
81 - JSON.toJSONString(msgSubject));  
82 - log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(),  
83 - messageId); 82 + failedMsgList.add(msgSubject);
  83 + log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(), messageId);
84 } else { 84 } else {
85 SysMsg sysMsg = msgSubject.getMsgBodyDto().getSysMsg(); 85 SysMsg sysMsg = msgSubject.getMsgBodyDto().getSysMsg();
86 sysMsg.setMessageId(messageId); 86 sysMsg.setMessageId(messageId);
@@ -90,10 +90,7 @@ public class SendMessageTask { @@ -90,10 +90,7 @@ public class SendMessageTask {
90 log.error("{} 最终失败 messageId[{}]", msg, messageId); 90 log.error("{} 最终失败 messageId[{}]", msg, messageId);
91 messageResult = new MessageResult(messageId, false); 91 messageResult = new MessageResult(messageId, false);
92 messageProducer.send(messageResult); 92 messageProducer.send(messageResult);
93 -  
94 - }  
95 -  
96 - } 93 + } }
97 } 94 }
98 95
99 96