SendMessageTask.java
4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package cn.fw.hermes.task;
import cn.fw.common.exception.BusinessException;
import cn.fw.hermes.common.constant.ActionStatusEnum;
import cn.fw.hermes.common.constant.RedisKey;
import cn.fw.hermes.common.utils.StringUtils;
import cn.fw.hermes.domain.db.SysMsg;
import cn.fw.hermes.domain.dto.MsgSubject;
import cn.fw.hermes.sdk.api.para.MessageResult;
import cn.fw.hermes.service.biz.MessageBizService;
import cn.fw.hermes.service.mq.MessageProducer;
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @Author: Chenery
* @Date: 2021/2/6 10:52
*/
@RequiredArgsConstructor
@Component
@Slf4j
@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
public class SendMessageTask {
private final MessageBizService messageBizService;
private final StringRedisTemplate redisTemplate;
private final MessageProducer messageProducer;
private ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
16,
32,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4096));
/**
* 定时从redis中拿取消息并发送
*/
@Scheduled(initialDelay = 1000 * 10, fixedDelay = 1000 * 5)
@Async
public void sendAsynMsg() {
String messageBody;
ArrayList<MsgSubject> failedMsgList = new ArrayList<>();
while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList().rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) {
String finalMessageBody = messageBody;
poolExecutor.execute(() -> this.sendMsg(finalMessageBody,failedMsgList));
}
failedMsgList.forEach(i -> redisTemplate.opsForList().leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(), JSON.toJSONString(i)));
}
/**
* 发送异步消息任务 消息发送失败默认重试3次
*
* @param messageBody JSON格式的消息对象
* @param failedMsgList 发送失败的消息
*/
public void sendMsg(String messageBody, ArrayList<MsgSubject> failedMsgList) {
final Integer retriesNum = 3;
String msg = "发送异步消息";
MessageResult messageResult;
MsgSubject msgSubject = JSON.parseObject(messageBody, MsgSubject.class);
String messageId = msgSubject.getCondition().getMessageId();
try {
Boolean result = messageBizService.sendMsg(msgSubject);
messageResult = new MessageResult(messageId, result);
messageProducer.send(messageResult);
log.info("{} messageId[{}] 执行结果: {}", msg, messageId, result);
} catch (BusinessException bs) {
if (msgSubject.getRetriesNum() < retriesNum) {
msgSubject.setRetriesNum(msgSubject.getRetriesNum() + 1);
failedMsgList.add(msgSubject);
log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(), messageId);
} else {
SysMsg sysMsg = msgSubject.getMsgBodyDto().getSysMsg();
sysMsg.setMessageId(messageId);
sysMsg.setMsgStatus(bs.getCode());
sysMsg.setMsgStatusDesc(ActionStatusEnum.FAIL.getCode());
messageBizService.saveMsg(sysMsg);
log.error("{} 最终失败 messageId[{}]", msg, messageId);
messageResult = new MessageResult(messageId, false);
messageProducer.send(messageResult);
} }
}
/**
* 定时将保存到数据库失败的消息重试保存 保存失败默认重试3次
*/
@Scheduled(initialDelay = 1000 * 20, fixedRate = 1000 * 30)
@Async
public void saveMessageToDataBase() {
final String msg = "消息保存到数据库";
String sysMsgStr = redisTemplate.opsForList()
.rightPop(RedisKey.SAVE_MESSAGE_FAILED.getCode());
if (!StringUtils.isEmpty(sysMsgStr)) {
SysMsg sysMsg = JSON.parseObject(sysMsgStr, SysMsg.class);
if (sysMsg.getRetriesNum() < 3) {
log.info("{} 失败 进行第 {} 次重试,messageId[{}]", msg, sysMsg.getRetriesNum() + 1,
sysMsg.getMessageId());
sysMsg.setRetriesNum(sysMsg.getRetriesNum() + 1);
messageBizService.saveMsg(sysMsg);
} else {
log.error("{} 最终失败 messageId[{}]", msg, sysMsg.getMessageId());
}
}
}
}