Commit 14e9fc1bbed632509b5b6ac87f988da4ba83bd50

Authored by Kurisu
1 parent fa78338a

feat(*): 加入协程

- 加入协程
fw-hestia-common/src/main/java/cn/fw/hestia/common/constant/Constant.kt
1   -package cn.fw.hestia.common.constant;
  1 +package cn.fw.hestia.common.constant
2 2  
3 3 /**
4 4 * 常量
... ... @@ -8,7 +8,10 @@ package cn.fw.hestia.common.constant;
8 8 * @desc : 常量
9 9 * @date : 2023-08-10 10:51
10 10 */
11   -public interface Constant {
12   - String MEMBER_INFO_CACHE = "member:info:id";
13   - String MEMBER_INFO_MOBILE_CACHE = "member:info:mobile";
14   -}
  11 +object Constant {
  12 + const val MEMBER_INFO_CACHE: String = "member:info:id"
  13 + const val MEMBER_INFO_MOBILE_CACHE: String = "member:info:mobile"
  14 +
  15 + const val SUCCEED_STR: String = "succeed"
  16 + const val MAX_FREQUENCY: Int = 5
  17 +}
15 18 \ No newline at end of file
... ...
fw-hestia-common/src/main/java/cn/fw/hestia/common/constant/MessageConstant.java deleted
1   -package cn.fw.hestia.common.constant;
2   -
3   -/**
4   - * @author : kurisu
5   - * @className : MessageConstant
6   - * @description : 常量
7   - * @date: 2021-09-25
8   - */
9   -public interface MessageConstant {
10   - String SUCCEED_STR = "succeed";
11   - int MAX_FREQUENCY = 5;
12   -}
fw-hestia-rpc/src/main/java/cn/fw/hestia/rpc/AbsBaseRpcService.java deleted
1   -package cn.fw.hestia.rpc;
2   -
3   -import cn.fw.hestia.common.utils.StringUtils;
4   -import com.alibaba.fastjson.JSON;
5   -import com.alibaba.fastjson.JSONObject;
6   -import lombok.extern.slf4j.Slf4j;
7   -import org.springframework.beans.factory.annotation.Autowired;
8   -import org.springframework.data.redis.core.BoundListOperations;
9   -import org.springframework.data.redis.core.BoundValueOperations;
10   -import org.springframework.data.redis.core.StringRedisTemplate;
11   -import org.springframework.lang.NonNull;
12   -import org.springframework.lang.Nullable;
13   -
14   -import java.util.ArrayList;
15   -import java.util.List;
16   -import java.util.Objects;
17   -import java.util.Optional;
18   -import java.util.concurrent.TimeUnit;
19   -
20   -/**
21   - * @author : kurisu
22   - * @className : AbsBaseRpcService
23   - * @description : 公共方法
24   - * @date: 2020-12-17 14:13
25   - */
26   -@Slf4j
27   -public abstract class AbsBaseRpcService {
28   - /**
29   - * Redis工具
30   - */
31   - @Autowired
32   - protected StringRedisTemplate redisTemplate;
33   -
34   - /**
35   - * 缓存KEY前缀
36   - *
37   - * @return
38   - */
39   - protected abstract String getKeyPrefix();
40   -
41   - /**
42   - * 从缓存获取对象
43   - *
44   - * @param key
45   - * @param clazz
46   - * @param <E> 获取的对象
47   - * @return
48   - */
49   - @Nullable
50   - protected <E> E getFromCache(@NonNull final String key, Class<E> clazz) {
51   - String cache = getFromCache(key);
52   - if (StringUtils.INSTANCE.isEmpty(cache)) {
53   - return null;
54   - }
55   - return JSON.parseObject(cache, clazz);
56   - }
57   -
58   - protected String getFromCache(@NonNull final String key) {
59   - String json = null;
60   - try {
61   - BoundValueOperations<String, String> ops = redisTemplate.boundValueOps(key);
62   - json = ops.get();
63   - } catch (Exception e) {
64   - log.error("从缓存获信息失败[{}]", key, e);
65   - }
66   - return json;
67   - }
68   -
69   - protected void setToCache(@NonNull final String key, @NonNull final String value) {
70   - setToCache(key, value, 30);
71   - }
72   -
73   - /**
74   - * 缓存信息
75   - * @param key
76   - * @param value
77   - * @param timeout 缓存时间(秒)
78   - */
79   - protected void setToCache(@NonNull final String key, @NonNull final String value, long timeout) {
80   - try {
81   - redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
82   - } catch (Exception e) {
83   - log.error("缓存信息失败[{}][{}]", key, value, e);
84   - }
85   - }
86   -
87   - protected <E> List<E> getListFromCache(final String key, Class<E> clazz) {
88   - try {
89   - BoundListOperations<String, String> queue = getQueue(key);
90   - final long size = Optional.ofNullable(queue.size()).orElse(0L);
91   - if (size > 0) {
92   - final List<E> dtos = new ArrayList<>();
93   - for (long i = 0; i < size; i++) {
94   - final String json = Objects.requireNonNull(queue.index(i));
95   - dtos.add(JSONObject.parseObject(json, clazz));
96   - }
97   - return dtos;
98   - }
99   - } catch (Exception e) {
100   - log.error("从缓存获取信息失败[{}]", key, e);
101   - }
102   - return null;
103   - }
104   -
105   - protected void setListToCache(@NonNull final String key, @NonNull final String value) {
106   - try {
107   - getQueue(key).rightPush(value);
108   - } catch (Exception e) {
109   - log.error("缓存息失败[{}][{}]", key, value, e);
110   - }
111   - }
112   -
113   - protected BoundListOperations<String, String> getQueue(@NonNull final String key) {
114   - return redisTemplate.boundListOps(key);
115   - }
116   -}
fw-hestia-rpc/src/main/java/cn/fw/hestia/rpc/passport/TemplateMessageService.kt
1   -package cn.fw.hestia.rpc.passport;
  1 +package cn.fw.hestia.rpc.passport
2 2  
3   -import cn.fw.data.base.domain.common.Message;
4   -import cn.fw.hestia.common.constant.MessageConstant;
5   -import cn.fw.hestia.common.utils.StringUtils;
6   -import cn.fw.hestia.rpc.passport.dto.TMParam;
7   -import cn.fw.passport.sdk.api.WxMpTemplateMessageApi;
8   -import cn.fw.passport.sdk.api.param.WxMpTempMessageData;
9   -import cn.fw.passport.sdk.api.param.WxMpTempMessageParam;
10   -import com.alibaba.fastjson.JSONArray;
11   -import lombok.extern.slf4j.Slf4j;
12   -import org.springframework.stereotype.Service;
13   -
14   -import java.util.List;
  3 +import cn.fw.hestia.common.constant.Constant
  4 +import cn.fw.hestia.common.utils.StringUtils.isValid
  5 +import cn.fw.hestia.rpc.passport.dto.TMParam
  6 +import cn.fw.passport.sdk.api.WxMpTemplateMessageApi
  7 +import cn.fw.passport.sdk.api.param.WxMpTempMessageData
  8 +import cn.fw.passport.sdk.api.param.WxMpTempMessageParam
  9 +import com.alibaba.fastjson.JSONArray
  10 +import org.slf4j.LoggerFactory
  11 +import org.springframework.stereotype.Service
15 12  
16 13 /**
17 14 * passport系统-模板消息服务
18   - * <p>
  15 + *
  16 + *
19 17 * create at 2019-05-15
20 18 *
21 19 * @author kurisu
22 20 */
23   -@Slf4j
24 21 @Service
25   -public class TemplateMessageService {
26   -
27   - private final WxMpTemplateMessageApi wxMpTemplateMessageApi;
28   -
29   - public TemplateMessageService(final WxMpTemplateMessageApi wxMpTemplateMessageApi) {
30   - this.wxMpTemplateMessageApi = wxMpTemplateMessageApi;
31   - }
  22 +class TemplateMessageService(private val wxMpTemplateMessageApi: WxMpTemplateMessageApi) {
  23 + private val log = LoggerFactory.getLogger(this::class.java)
32 24  
33 25 /**
34 26 * 发送消息通知
35 27 *
36 28 * @param messageParam
37 29 */
38   - public String sendTemplateMessage(TMParam messageParam) {
39   - log.info("发送通知,SceneToken :{}", messageParam.getSceneToken());
  30 + fun sendTemplateMessage(messageParam: TMParam): String {
  31 + log.info("发送通知,SceneToken :{}", messageParam.sceneToken)
40 32 try {
41   - WxMpTempMessageParam param = new WxMpTempMessageParam();
42   - param.setCusId(messageParam.getMemberId());
43   - param.setTempId(messageParam.getTemplateCode());
44   - if (StringUtils.INSTANCE.isValid(messageParam.getKeywords())) {
45   - List<WxMpTempMessageData> keywords = JSONArray.parseArray(messageParam.getKeywords(), WxMpTempMessageData.class);
46   - param.setKeyWordList(keywords);
  33 + val param = WxMpTempMessageParam()
  34 + param.cusId = messageParam.memberId
  35 + param.tempId = messageParam.templateCode
  36 + if (isValid(messageParam.keywords)) {
  37 + val keywords = JSONArray.parseArray(
  38 + messageParam.keywords,
  39 + WxMpTempMessageData::class.java
  40 + )
  41 + param.keyWordList = keywords
47 42 }
48   - String pagePath = getPagePath(messageParam.getPath(), messageParam.getSceneToken());
49   - if (StringUtils.INSTANCE.isValid(pagePath)) {
50   - param.setPagePath(pagePath);
  43 + val pagePath = getPagePath(messageParam.path, messageParam.sceneToken)
  44 + if (isValid(pagePath)) {
  45 + param.pagePath = pagePath
51 46 }
52   - Message<?> msg = wxMpTemplateMessageApi.send(param);
53   - if (!msg.isSuccess()) {
54   - log.error("【passport系统】发送模板消息失败:{}", msg.getResult());
55   - return msg.getResult();
  47 + val msg = wxMpTemplateMessageApi.send(param)
  48 + if (!msg.isSuccess) {
  49 + log.error("【passport系统】发送模板消息失败:{}", msg.result)
  50 + return msg.result
56 51 }
57   - return MessageConstant.SUCCEED_STR;
58   - } catch (Exception e) {
59   - log.error("发送模板消息失败", e);
60   - return "系统调用异常";
  52 + return Constant.SUCCEED_STR
  53 + } catch (e: Exception) {
  54 + log.error("发送模板消息失败", e)
  55 + return "系统调用异常"
61 56 }
62 57 }
63 58  
64 59  
65   - private String getPagePath(String path, Long sceneToken) {
66   - if (StringUtils.INSTANCE.isEmpty(path)) {
67   - return null;
68   - }
69   - StringBuilder sb = new StringBuilder(path);
70   - if (StringUtils.INSTANCE.isValid(path)) {
71   - sb.append("?sceneToken=").append(sceneToken);
  60 + private fun getPagePath(path: String?, sceneToken: Long?): String? {
  61 + return if (!path.isNullOrEmpty()) {
  62 + val sb = StringBuilder(path)
  63 + if (isValid(sceneToken)) {
  64 + sb.append("?sceneToken=").append(sceneToken)
  65 + }
  66 + sb.toString()
  67 + } else {
  68 + null
72 69 }
73   - return sb.toString();
74 70 }
75 71 -}
  72 +}
76 73 \ No newline at end of file
... ...
fw-hestia-rpc/src/main/java/cn/fw/hestia/rpc/passport/dto/TMParam.kt
1   -package cn.fw.hestia.rpc.passport.dto;
2   -
3   -import lombok.Data;
4   -import lombok.ToString;
5   -
6   -import java.util.Map;
  1 +package cn.fw.hestia.rpc.passport.dto
7 2  
8 3 /**
9   - * 续保提醒模板消息param
10   - *
11   - * 模板消息样例:
12 4 *
13   - * {{first.DATA}}
14   - * 变更类型:{{keyword1.DATA}}
15   - * 变更结果:{{keyword2.DATA}}
16   - * {{remark.DATA}}
17   - *
18   - * @author kurisu
  5 + * @className : TMParam
  6 + * @description :
  7 + * @author : kurisu
  8 + * @date : 2023-12-21 22:35
  9 + * @version :
19 10 */
20   -@Data
21   -@ToString
22   -public class TMParam {
  11 +
  12 +data class TMParam(
23 13 /**
24 14 * 会员id
25 15 */
26   - private Long memberId;
  16 + val memberId: Long?,
27 17 /**
28 18 * 标题
29 19 */
30   - private String title;
  20 + val title: String?,
31 21  
32   - private String keywords;
  22 + val keywords: String?,
33 23 /**
34 24 * 备注
35 25 */
36   - private String remark;
  26 + val remark: String?,
37 27  
38 28 /**
39 29 * 如需跳转小程序,则是小程序页面路径
40 30 */
41   - private String path;
  31 + val path: String?,
42 32  
43 33  
44   - private Long sceneToken;
  34 + val sceneToken: Long?,
45 35 /**
46 36 * 模板code
47 37 * 新版对应模板id
48 38 */
49   - private String templateCode;
50   -}
  39 + val templateCode: String?,
  40 +)
... ...
fw-hestia-server/src/main/java/cn/fw/hestia/server/task/SendMessageTask.kt
1 1 package cn.fw.hestia.server.task
2 2  
3   -import cn.fw.hestia.common.constant.MessageConstant
4   -import cn.fw.hestia.common.utils.ThreadPoolUtil
  3 +import cn.fw.hestia.common.constant.Constant
5 4 import cn.fw.hestia.domain.db.MessageHistory
6 5 import cn.fw.hestia.domain.enums.MessageStateEnum
7 6 import cn.fw.hestia.service.buz.MessageCenterBizService
8 7 import cn.fw.hestia.service.data.MessageHistoryService
9 8 import com.baomidou.mybatisplus.extension.kotlin.KtQueryWrapper
  9 +import kotlinx.coroutines.CoroutineScope
  10 +import kotlinx.coroutines.Dispatchers
  11 +import kotlinx.coroutines.launch
10 12 import org.springframework.beans.factory.annotation.Value
11 13 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
12 14 import org.springframework.data.redis.core.StringRedisTemplate
... ... @@ -15,7 +17,6 @@ import org.springframework.stereotype.Component
15 17 import org.springframework.util.CollectionUtils
16 18 import java.time.LocalDateTime
17 19 import java.util.*
18   -import java.util.concurrent.CompletableFuture
19 20  
20 21 /**
21 22 * @author : kurisu
... ... @@ -35,21 +36,23 @@ class SendMessageTask(
35 36 /**
36 37 * 发送模板消息
37 38 */
38   - @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 5)
  39 + @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 3)
39 40 fun sendNotice() {
40 41 val list = messageHistoryService.list(
41 42 KtQueryWrapper(MessageHistory::class.java)
42 43 .eq(MessageHistory::state, MessageStateEnum.MADA)
43 44 .eq(MessageHistory::yn, true)
44   - .lt(MessageHistory::frequency, MessageConstant.MAX_FREQUENCY)
  45 + .lt(MessageHistory::frequency, Constant.MAX_FREQUENCY)
45 46 .lt(MessageHistory::sendTime, Date())
46   - .last("limit 1000")
  47 + .last("limit 500")
47 48 )
48   - if (CollectionUtils.isEmpty(list)) {
  49 + if (list.isNullOrEmpty()) {
49 50 return
50 51 }
51   - for (history in list) {
52   - CompletableFuture.runAsync { messageCenterBizService.sendMessage(history) }
  52 + list.forEach {
  53 + CoroutineScope(Dispatchers.IO).launch {
  54 + messageCenterBizService.sendMessage(it)
  55 + }
53 56 }
54 57 }
55 58  
... ... @@ -63,30 +66,36 @@ class SendMessageTask(
63 66 val list = messageHistoryService.list(
64 67 KtQueryWrapper(MessageHistory::class.java)
65 68 .lt(MessageHistory::createTime, LocalDateTime.now().minusMonths(6L))
66   - .last("limit 500")
  69 + .last("limit 1000")
67 70 )
68 71 if (CollectionUtils.isEmpty(list)) {
69 72 return
70 73 }
71   - list.stream().map(MessageHistory::id).forEach { id: Long? -> id?.let { setOps.add(it.toString()) } }
  74 + list.forEach {
  75 + it.id?.run {
  76 + setOps.add(this.toString())
  77 + }
  78 + }
72 79 }
73 80  
74 81 /**
75 82 * 清理数据
76 83 */
77   - @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 15)
  84 + @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 15)
78 85 fun clearOldData() {
79 86 val key: String = dataClearKey
80 87 val setOps = stringRedisTemplate.boundSetOps(key)
81   - var idStr: String?
82   - while ((setOps.pop().also { idStr = it }) != null) {
83   - val id = idStr?.toLong()
84   - id?.let {
85   - CompletableFuture.runAsync(
86   - { messageCenterBizService.clearHistory(it) },
87   - ThreadPoolUtil.executor
88   - )
  88 + var counter = 0
  89 + val removeIds = mutableSetOf<Long>()
  90 + while (counter < 2000) {
  91 + val idStr = setOps.pop() ?: break
  92 + idStr.toLong().run {
  93 + removeIds.add(this)
89 94 }
  95 + counter++
  96 + }
  97 + if (removeIds.isNotEmpty()) {
  98 + messageCenterBizService.clearHistoryBatch(removeIds)
90 99 }
91 100 }
92 101 }
93 102 \ No newline at end of file
... ...
fw-hestia-service/src/main/java/cn/fw/hestia/service/buz/MessageCenterBizService.kt
... ... @@ -3,7 +3,7 @@ package cn.fw.hestia.service.buz
3 3 import cn.fw.common.businessvalidator.Validator
4 4 import cn.fw.common.data.mybatis.pagination.PageData
5 5 import cn.fw.common.page.AppPage
6   -import cn.fw.hestia.common.constant.MessageConstant
  6 +import cn.fw.hestia.common.constant.Constant
7 7 import cn.fw.hestia.common.utils.DateUtil
8 8 import cn.fw.hestia.common.utils.StringUtils
9 9 import cn.fw.hestia.component.SendMsgProducer
... ... @@ -22,7 +22,6 @@ import cn.fw.hestia.service.data.SendLogService
22 22 import cn.fw.passport.sdk.api.param.WxMpTempMessageData
23 23 import com.alibaba.fastjson.JSON
24 24 import com.alibaba.fastjson.JSONArray
25   -import com.baomidou.mybatisplus.core.toolkit.Wrappers
26 25 import com.baomidou.mybatisplus.extension.kotlin.KtQueryWrapper
27 26 import org.springframework.beans.factory.annotation.Value
28 27 import org.springframework.cache.annotation.Cacheable
... ... @@ -110,7 +109,7 @@ class MessageCenterBizService(
110 109 setOps.add(seanceToken)
111 110 }
112 111 }
113   - val succeed = MessageConstant.SUCCEED_STR == result
  112 + val succeed = Constant.SUCCEED_STR == result
114 113 val frequency = history.frequency!! + 1
115 114 history.frequency = frequency
116 115 if (succeed) {
... ... @@ -133,15 +132,29 @@ class MessageCenterBizService(
133 132 * @param id
134 133 */
135 134 @Transactional(rollbackFor = [Exception::class])
136   - fun clearHistory(id: Long?) {
  135 + fun clearHistory(id: Long) {
137 136 sendLogService.remove(
138   - Wrappers.lambdaQuery<SendLog>()
  137 + KtQueryWrapper(SendLog::class.java)
139 138 .eq(SendLog::messageId, id)
140 139 )
141 140 messageHistoryService.removeById(id)
142 141 }
143 142  
144 143 /**
  144 + * 批量清理不使用的数据
  145 + *
  146 + * @param idList
  147 + */
  148 + @Transactional(rollbackFor = [Exception::class])
  149 + fun clearHistoryBatch(idList: Set<Long>) {
  150 + sendLogService.remove(
  151 + KtQueryWrapper(SendLog::class.java)
  152 + .`in`(SendLog::messageId, idList)
  153 + )
  154 + messageHistoryService.removeByIds(idList)
  155 + }
  156 +
  157 + /**
145 158 * 撤回消息
146 159 *
147 160 * @param sceneToken
... ... @@ -251,7 +264,7 @@ class MessageCenterBizService(
251 264 val result = saveLog(
252 265 createTmParam(messageHistory), messageHistory.id
253 266 ) { r: SendLog -> r.manual = true }
254   - val succeed = MessageConstant.SUCCEED_STR == result
  267 + val succeed = Constant.SUCCEED_STR == result
255 268 if (succeed) {
256 269 messageHistory.state = MessageStateEnum.SUMI
257 270 sendMsgProducer.send(MessageSendMq(messageHistory.id, messageHistory.memberId, Date()))
... ... @@ -265,7 +278,7 @@ class MessageCenterBizService(
265 278 get() = settingProperty.tempId ?: "无效code"
266 279  
267 280 private fun saveLog(tmParam: TMParam, messageId: Long?, cons: Consumer<SendLog>): String {
268   - val result = MessageConstant.SUCCEED_STR
  281 + val result = Constant.SUCCEED_STR
269 282 val sendLog = SendLog(messageId, Date(), succeed = true, manual = false, result)
270 283 sendLogService.save(sendLog)
271 284 return result
... ... @@ -317,14 +330,16 @@ class MessageCenterBizService(
317 330 }
318 331  
319 332 private fun createTmParam(history: MessageHistory): TMParam {
320   - val tmParam = TMParam()
321   - tmParam.setSceneToken(history.id)
322   - tmParam.setTitle(history.content)
323   - tmParam.setRemark(history.remark)
324   - tmParam.setPath(history.pagePath)
325   - tmParam.setTemplateCode(history.templateCode)
326   - tmParam.setMemberId(history.memberId)
327   - tmParam.setKeywords(history.keywords)
  333 + val (id, memberId, templateCode, title, _, keywords, remark, pagePath) = history
  334 + val tmParam = TMParam(
  335 + memberId,
  336 + title,
  337 + keywords,
  338 + remark,
  339 + path = pagePath,
  340 + sceneToken = id,
  341 + templateCode
  342 + )
328 343 return tmParam
329 344 }
330 345 }
331 346 \ No newline at end of file
... ...