diff --git a/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/ImAccountService.java b/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/ImAccountService.java index 8e069bb..cbe6cc9 100644 --- a/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/ImAccountService.java +++ b/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/ImAccountService.java @@ -21,7 +21,7 @@ public interface ImAccountService { * @param accountCondition 注册用户对象 * @return 注册后的账户对象 */ - @PostMapping("/api/account/register") + @PostMapping("/api/account/register") Message userRegister(@RequestBody AccountCondition accountCondition); diff --git a/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/para/MessageBusinessType.java b/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/para/MessageBusinessType.java index c837908..f0ccce9 100644 --- a/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/para/MessageBusinessType.java +++ b/fw-hermes-sdk/src/main/java/cn/fw/hermes/sdk/api/para/MessageBusinessType.java @@ -5,7 +5,9 @@ import lombok.Getter; /** * @author Kurisu * 2019-05-07 11:31 + * 直接使用对应字符串(没有的需要注册) */ +@Deprecated public enum MessageBusinessType { /** * 系统提醒 diff --git a/fw-hermes-server/src/main/java/cn/fw/hermes/task/SendMessageTask.java b/fw-hermes-server/src/main/java/cn/fw/hermes/task/SendMessageTask.java index 548dc16..4a7b5e3 100644 --- a/fw-hermes-server/src/main/java/cn/fw/hermes/task/SendMessageTask.java +++ b/fw-hermes-server/src/main/java/cn/fw/hermes/task/SendMessageTask.java @@ -10,6 +10,8 @@ import cn.fw.hermes.sdk.api.para.MessageResult; import cn.fw.hermes.service.biz.MessageBizService; import cn.fw.hermes.service.mq.MessageProducer; import com.alibaba.fastjson.JSON; + +import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -49,19 +51,21 @@ public class SendMessageTask { @Async public void sendAsynMsg() { String messageBody; - while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList() - .rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) { + ArrayList failedMsgList = new ArrayList<>(); + while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList().rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) { String finalMessageBody = messageBody; - poolExecutor.execute(() -> this.sendMsg(finalMessageBody)); + poolExecutor.execute(() -> this.sendMsg(finalMessageBody,failedMsgList)); } + failedMsgList.forEach(i -> redisTemplate.opsForList().leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(), JSON.toJSONString(i))); } /** * 发送异步消息任务 消息发送失败默认重试3次 * * @param messageBody JSON格式的消息对象 + * @param failedMsgList 发送失败的消息 */ - public void sendMsg(String messageBody) { + public void sendMsg(String messageBody, ArrayList failedMsgList) { final Integer retriesNum = 3; String msg = "发送异步消息"; MessageResult messageResult; @@ -69,18 +73,14 @@ public class SendMessageTask { String messageId = msgSubject.getCondition().getMessageId(); try { Boolean result = messageBizService.sendMsg(msgSubject); - messageResult = new MessageResult(messageId, - result); + messageResult = new MessageResult(messageId, result); messageProducer.send(messageResult); log.info("{} messageId[{}] 执行结果: {}", msg, messageId, result); } catch (BusinessException bs) { if (msgSubject.getRetriesNum() < retriesNum) { msgSubject.setRetriesNum(msgSubject.getRetriesNum() + 1); - redisTemplate.opsForList() - .leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(), - JSON.toJSONString(msgSubject)); - log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(), - messageId); + failedMsgList.add(msgSubject); + log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(), messageId); } else { SysMsg sysMsg = msgSubject.getMsgBodyDto().getSysMsg(); sysMsg.setMessageId(messageId); @@ -90,10 +90,7 @@ public class SendMessageTask { log.error("{} 最终失败 messageId[{}]", msg, messageId); messageResult = new MessageResult(messageId, false); messageProducer.send(messageResult); - - } - - } + } } }