SendMessageTask.java 4.72 KB
package cn.fw.hermes.task;

import cn.fw.common.exception.BusinessException;
import cn.fw.hermes.common.constant.ActionStatusEnum;
import cn.fw.hermes.common.constant.RedisKey;
import cn.fw.hermes.common.utils.StringUtils;
import cn.fw.hermes.domain.db.SysMsg;
import cn.fw.hermes.domain.dto.MsgSubject;
import cn.fw.hermes.sdk.api.para.MessageResult;
import cn.fw.hermes.service.biz.MessageBizService;
import cn.fw.hermes.service.mq.MessageProducer;
import com.alibaba.fastjson.JSON;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @Author: Chenery
 * @Date: 2021/2/6 10:52
 */
@RequiredArgsConstructor
@Component
@Slf4j
@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
public class SendMessageTask {

    private final MessageBizService messageBizService;
    private final StringRedisTemplate redisTemplate;
    private final MessageProducer messageProducer;
    private ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            16,
            32,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(4096));


    /**
     * 定时从redis中拿取消息并发送
     */
    @Scheduled(initialDelay = 1000 * 10, fixedDelay = 1000 * 5)
    @Async
    public void sendAsynMsg() {
        String messageBody;
        while (!StringUtils.isEmpty(messageBody = redisTemplate.opsForList()
                .rightPop(RedisKey.ASYN_MESSAGE_TARGET.getCode()))) {
            String finalMessageBody = messageBody;
            poolExecutor.execute(() -> this.sendMsg(finalMessageBody));
        }
    }

    /**
     * 发送异步消息任务 消息发送失败默认重试3次
     *
     * @param messageBody JSON格式的消息对象
     */
    public void sendMsg(String messageBody) {
        final Integer retriesNum = 3;
        String msg = "发送异步消息";
        MessageResult messageResult;
        MsgSubject msgSubject = JSON.parseObject(messageBody, MsgSubject.class);
        String messageId = msgSubject.getCondition().getMessageId();
        try {
            Boolean result = messageBizService.sendMsg(msgSubject);
            messageResult = new MessageResult(messageId,
                    result);
            messageProducer.send(messageResult);
            log.info("{}  messageId[{}] 执行结果: {}", msg, messageId, result);
        } catch (BusinessException bs) {
            if (msgSubject.getRetriesNum() < retriesNum) {
                msgSubject.setRetriesNum(msgSubject.getRetriesNum() + 1);
                redisTemplate.opsForList()
                        .leftPush(RedisKey.ASYN_MESSAGE_TARGET.getCode(),
                                JSON.toJSONString(msgSubject));
                log.error("{} 失败,进行第 {} 次重试 messageId[{}]", msg, msgSubject.getRetriesNum(),
                        messageId);
            } else {
                SysMsg sysMsg = msgSubject.getMsgBodyDto().getSysMsg();
                sysMsg.setMessageId(messageId);
                sysMsg.setMsgStatus(bs.getCode());
                sysMsg.setMsgStatusDesc(ActionStatusEnum.FAIL.getCode());
                messageBizService.saveMsg(sysMsg);
                log.error("{} 最终失败 messageId[{}]", msg, messageId);
                messageResult = new MessageResult(messageId, false);
                messageProducer.send(messageResult);

            }

        }
    }


    /**
     * 定时将保存到数据库失败的消息重试保存 保存失败默认重试3次
     */
    @Scheduled(initialDelay = 1000 * 20, fixedRate = 1000 * 30)
    @Async
    public void saveMessageToDataBase() {
        final String msg = "消息保存到数据库";
        String sysMsgStr = redisTemplate.opsForList()
                .rightPop(RedisKey.SAVE_MESSAGE_FAILED.getCode());
        if (!StringUtils.isEmpty(sysMsgStr)) {
            SysMsg sysMsg = JSON.parseObject(sysMsgStr, SysMsg.class);
            if (sysMsg.getRetriesNum() < 3) {
                log.info("{}  失败 进行第 {} 次重试,messageId[{}]", msg, sysMsg.getRetriesNum() + 1,
                        sysMsg.getMessageId());
                sysMsg.setRetriesNum(sysMsg.getRetriesNum() + 1);
                messageBizService.saveMsg(sysMsg);
            } else {
                log.error("{}  最终失败 messageId[{}]", msg, sysMsg.getMessageId());
            }
        }
    }


}