Commit 24ee74a58463cbd8e2b743f30466dbacdd85ca44
Merge remote-tracking branch 'origin/test'
Showing
3 changed files
with
15 additions
and
16 deletions
fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/ImAccountService.java
... | ... | @@ -21,7 +21,7 @@ public interface ImAccountService { |
21 | 21 | * @param accountCondition 注册用户对象 |
22 | 22 | * @return 注册后的账户对象 |
23 | 23 | */ |
24 | - @PostMapping("/api/account/register") | |
24 | + @PostMapping("/api/account/register") | |
25 | 25 | Message<AccountResult> userRegister(@RequestBody AccountCondition accountCondition); |
26 | 26 | |
27 | 27 | ... | ... |
fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/para/MessageBusinessType.java
fw-hermes-server/src/main/java/cn/fw/hermes/task/SendMessageTask.java
... | ... | @@ -10,6 +10,8 @@ import cn.fw.hermes.sdk.api.para.MessageResult; |
10 | 10 | import cn.fw.hermes.service.biz.MessageBizService; |
11 | 11 | import cn.fw.hermes.service.mq.MessageProducer; |
12 | 12 | import com.alibaba.fastjson.JSON; |
13 | + | |
14 | +import java.util.ArrayList; | |
13 | 15 | import java.util.concurrent.ArrayBlockingQueue; |
14 | 16 | import java.util.concurrent.ThreadPoolExecutor; |
15 | 17 | import java.util.concurrent.TimeUnit; |
... | ... | @@ -49,19 +51,21 @@ public class SendMessageTask { |
49 | 51 | @Async |
50 | 52 | public void sendAsynMsg() { |
51 | 53 | String messageBody; |
52 | - while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList() | |
53 | - .rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) { | |
54 | + ArrayList<MsgSubject> failedMsgList = new ArrayList<>(); | |
55 | + while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList().rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) { | |
54 | 56 | String finalMessageBody = messageBody; |
55 | - poolExecutor.execute(() -> this.sendMsg(finalMessageBody)); | |
57 | + poolExecutor.execute(() -> this.sendMsg(finalMessageBody,failedMsgList)); | |
56 | 58 | } |
59 | + failedMsgList.forEach(i -> redisTemplate.opsForList().leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(), JSON.toJSONString(i))); | |
57 | 60 | } |
58 | 61 | |
59 | 62 | /** |
60 | 63 | * 发送异步消息任务 消息发送失败默认重试3次 |
61 | 64 | * |
62 | 65 | * @param messageBody JSON格式的消息对象 |
66 | + * @param failedMsgList 发送失败的消息 | |
63 | 67 | */ |
64 | - public void sendMsg(String messageBody) { | |
68 | + public void sendMsg(String messageBody, ArrayList<MsgSubject> failedMsgList) { | |
65 | 69 | final Integer retriesNum = 3; |
66 | 70 | String msg = "发送异步消息"; |
67 | 71 | MessageResult messageResult; |
... | ... | @@ -69,18 +73,14 @@ public class SendMessageTask { |
69 | 73 | String messageId = msgSubject.getCondition().getMessageId(); |
70 | 74 | try { |
71 | 75 | Boolean result = messageBizService.sendMsg(msgSubject); |
72 | - messageResult = new MessageResult(messageId, | |
73 | - result); | |
76 | + messageResult = new MessageResult(messageId, result); | |
74 | 77 | messageProducer.send(messageResult); |
75 | 78 | log.info("{} messageId[{}] 执行结果: {}", msg, messageId, result); |
76 | 79 | } catch (BusinessException bs) { |
77 | 80 | if (msgSubject.getRetriesNum() < retriesNum) { |
78 | 81 | msgSubject.setRetriesNum(msgSubject.getRetriesNum() + 1); |
79 | - redisTemplate.opsForList() | |
80 | - .leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(), | |
81 | - JSON.toJSONString(msgSubject)); | |
82 | - log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(), | |
83 | - messageId); | |
82 | + failedMsgList.add(msgSubject); | |
83 | + log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(), messageId); | |
84 | 84 | } else { |
85 | 85 | SysMsg sysMsg = msgSubject.getMsgBodyDto().getSysMsg(); |
86 | 86 | sysMsg.setMessageId(messageId); |
... | ... | @@ -90,10 +90,7 @@ public class SendMessageTask { |
90 | 90 | log.error("{} 最终失败 messageId[{}]", msg, messageId); |
91 | 91 | messageResult = new MessageResult(messageId, false); |
92 | 92 | messageProducer.send(messageResult); |
93 | - | |
94 | - } | |
95 | - | |
96 | - } | |
93 | + } } | |
97 | 94 | } |
98 | 95 | |
99 | 96 | ... | ... |