Commit d9d236c90190b39cad16d2d00dbee9455d8ff96b

Authored by 张志伟
1 parent 59397026

:fire: feature(*): 引入腾讯推送sdk

- 引入腾讯推送sdk
fw-hermes-rpc/pom.xml
... ... @@ -42,5 +42,9 @@
42 42 <groupId>cn.fw</groupId>
43 43 <artifactId>fw-common-cache</artifactId>
44 44 </dependency>
  45 + <dependency>
  46 + <groupId>org.jetbrains.kotlinx</groupId>
  47 + <artifactId>kotlinx-coroutines-core</artifactId>
  48 + </dependency>
45 49 </dependencies>
46   -</project>
47 50 \ No newline at end of file
  51 +</project>
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/HermesServer.kt
1   -package cn.fw.hermes;
  1 +package cn.fw.hermes
2 2  
3   -import cn.fw.security.auth.client.EnableAuthClient;
4   -import org.mybatis.spring.annotation.MapperScan;
5   -import org.springframework.boot.SpringApplication;
6   -import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
7   -import org.springframework.cache.annotation.EnableCaching;
8   -import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
9   -import org.springframework.cloud.openfeign.EnableFeignClients;
10   -import org.springframework.context.annotation.ComponentScan;
11   -import org.springframework.context.annotation.Configuration;
12   -import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
13   -import org.springframework.scheduling.annotation.EnableScheduling;
14   -import org.springframework.transaction.annotation.EnableTransactionManagement;
  3 +import cn.fw.security.auth.client.EnableAuthClient
  4 +import org.mybatis.spring.annotation.MapperScan
  5 +import org.springframework.boot.SpringApplication
  6 +import org.springframework.boot.autoconfigure.EnableAutoConfiguration
  7 +import org.springframework.cache.annotation.EnableCaching
  8 +import org.springframework.cloud.client.discovery.EnableDiscoveryClient
  9 +import org.springframework.cloud.openfeign.EnableFeignClients
  10 +import org.springframework.context.annotation.ComponentScan
  11 +import org.springframework.context.annotation.Configuration
  12 +import org.springframework.data.redis.repository.configuration.EnableRedisRepositories
  13 +import org.springframework.scheduling.annotation.EnableScheduling
  14 +import org.springframework.transaction.annotation.EnableTransactionManagement
15 15  
16 16 /**
17 17 * 启动类
... ... @@ -28,10 +28,10 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
28 28 @Configuration
29 29 @EnableRedisRepositories
30 30 @MapperScan("cn.fw.**.mapper")
31   -@ComponentScan({"cn.fw.hermes.*"})
32   -@EnableFeignClients({"cn.fw.**.sdk"})
33   -public class HermesServer {
34   - public static void main(String[] args) {
35   - SpringApplication.run(HermesServer.class, args);
36   - }
37   -}
38 31 \ No newline at end of file
  32 +@ComponentScan("cn.fw.hermes.*")
  33 +@EnableFeignClients("cn.fw.**.sdk")
  34 +class HermesServer
  35 +
  36 +fun main(args: Array<String>) {
  37 + SpringApplication.run(HermesServer::class.java, *args)
  38 +}
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/LogExtend.kt 0 → 100644
  1 +package cn.fw.hermes
  2 +
  3 +import org.slf4j.Logger
  4 +import org.slf4j.LoggerFactory
  5 +
  6 +/**
  7 + * 日志拓展
  8 + *
  9 + * @author : kurisu
  10 + * @version : 1.0
  11 + * @desc : 日志拓展
  12 + * @date : 2023-12-16 15:46
  13 + */
  14 +
  15 +fun <T : Any> T.logger(): Logger {
  16 + return LoggerFactory.getLogger(this.javaClass)
  17 +}
  18 +
  19 +fun <T : Any> T.logTrace(msg: String) {
  20 + logger().trace(msg)
  21 +}
  22 +
  23 +fun <T : Any> T.logTrace(msg: String, vararg var2: Any?) {
  24 + logger().trace(msg, var2)
  25 +}
  26 +
  27 +fun <T : Any> T.logDebug(msg: String) {
  28 + logger().debug(msg)
  29 +}
  30 +
  31 +fun <T : Any> T.logDebug(msg: String, vararg var2: Any?) {
  32 + logger().debug(msg, var2)
  33 +}
  34 +
  35 +fun <T : Any> T.logInfo(msg: String) {
  36 + logger().info(msg)
  37 +}
  38 +
  39 +fun <T : Any> T.logInfo(msg: String, vararg var2: Any?) {
  40 + logger().info(msg, var2)
  41 +}
  42 +
  43 +fun <T : Any> T.logWarn(msg: String) {
  44 + logger().warn(msg)
  45 +}
  46 +
  47 +fun <T : Any> T.logWarn(msg: String, vararg var2: Any?) {
  48 + logger().warn(msg, var2)
  49 +}
  50 +
  51 +fun <T : Any> T.logError(msg: String) {
  52 + logger().error(msg)
  53 +}
  54 +
  55 +fun <T : Any> T.logError(e: Throwable) {
  56 + logger().error("", e)
  57 +}
  58 +
  59 +fun <T : Any> T.logError(msg: String, vararg var2: Any?) {
  60 + logger().error(msg, var2)
  61 +}
  62 +
  63 +fun <T : Any> T.logError(msg: String, e: Throwable) {
  64 + logger().error(msg, e)
  65 +}
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/config/AutoConfiguration.kt
1   -package cn.fw.hermes.config;
  1 +package cn.fw.hermes.config
2 2  
3   -import cn.fw.hermes.service.property.MqttProperty;
4   -import cn.fw.hermes.service.property.SettingProperty;
5   -import cn.hutool.core.util.IdUtil;
6   -import lombok.extern.slf4j.Slf4j;
7   -import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
8   -import org.eclipse.paho.client.mqttv3.MqttException;
9   -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
10   -import org.springframework.beans.factory.annotation.Autowired;
11   -import org.springframework.boot.context.properties.EnableConfigurationProperties;
12   -import org.springframework.context.annotation.Bean;
13   -import org.springframework.context.annotation.Configuration;
  3 +import cn.fw.hermes.logInfo
  4 +import cn.fw.hermes.service.property.MqttProperty
  5 +import cn.fw.hermes.service.property.SettingProperty
  6 +import cn.hutool.core.util.IdUtil
  7 +import org.eclipse.paho.client.mqttv3.MqttAsyncClient
  8 +import org.eclipse.paho.client.mqttv3.MqttException
  9 +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
  10 +import org.springframework.boot.context.properties.EnableConfigurationProperties
  11 +import org.springframework.context.annotation.Bean
  12 +import org.springframework.context.annotation.Configuration
14 13  
15 14 /**
16 15 * mqtt配置
17 16 *
18 17 * @author kurisu
19 18 */
20   -@Slf4j
21 19 @Configuration
22   -@EnableConfigurationProperties({MqttProperty.class, SettingProperty.class})
23   -public class AutoConfiguration {
24   -
25   - private final MqttProperty mqttProperty;
26   -
27   - @Autowired
28   - public AutoConfiguration(final MqttProperty mqttProperty) {
29   - this.mqttProperty = mqttProperty;
30   - }
31   -
32   - @Bean
33   - public MqttAsyncClient getMqttClient() throws MqttException {
34   - String host = mqttProperty.getHost();
35   - int port = mqttProperty.getPort();
36   - String nextId = IdUtil.simpleUUID();
37   - String clientId = String.format("%s-%s", mqttProperty.getClientId(), nextId);
38   - String url = String.format("%s:%d", host, port);
39   - log.info("mqtt client initialize : {}", url);
40   - return new MqttAsyncClient(url, clientId, new MemoryPersistence());
41   - }
  20 +@EnableConfigurationProperties(MqttProperty::class, SettingProperty::class)
  21 +class AutoConfiguration(private val mqttProperty: MqttProperty) {
  22 + @get:Throws(MqttException::class)
  23 + @get:Bean
  24 + val mqttClient: MqttAsyncClient
  25 + get() {
  26 + val host = mqttProperty.host
  27 + val port = mqttProperty.port
  28 + val nextId = IdUtil.simpleUUID()
  29 + val clientId = String.format("%s-%s", mqttProperty.clientId, nextId)
  30 + val url = String.format("%s:%d", host, port)
  31 + logInfo("mqtt client initialize : {}", url)
  32 + return MqttAsyncClient(url, clientId, MemoryPersistence())
  33 + }
42 34 }
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/config/CustomIdIdentifierGenerator.kt
1   -package cn.fw.hermes.config;
  1 +package cn.fw.hermes.config
2 2  
3   -import com.baomidou.mybatisplus.core.incrementer.IdentifierGenerator;
4   -import com.github.yitter.idgen.YitIdHelper;
5   -import org.springframework.stereotype.Component;
  3 +import com.baomidou.mybatisplus.core.incrementer.IdentifierGenerator
  4 +import com.github.yitter.idgen.YitIdHelper
  5 +import org.springframework.stereotype.Component
6 6  
7 7 /**
8 8 * 自定义id生成器
... ... @@ -10,9 +10,8 @@ import org.springframework.stereotype.Component;
10 10 * @author kurisu
11 11 */
12 12 @Component
13   -public class CustomIdIdentifierGenerator implements IdentifierGenerator {
14   - @Override
15   - public Long nextId(Object entity) {
16   - return YitIdHelper.nextId();
  13 +class CustomIdIdentifierGenerator : IdentifierGenerator {
  14 + override fun nextId(entity: Any): Long {
  15 + return YitIdHelper.nextId()
17 16 }
18 17 }
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/config/LaunchConfiguration.kt
1   -package cn.fw.hermes.config;
  1 +package cn.fw.hermes.config
2 2  
3   -import cn.fw.hermes.service.biz.CommonBizService;
4   -import cn.fw.hermes.service.emqx.MqttService;
5   -import com.github.yitter.contract.IdGeneratorOptions;
6   -import com.github.yitter.idgen.YitIdHelper;
7   -import lombok.extern.slf4j.Slf4j;
8   -import org.springframework.beans.factory.DisposableBean;
9   -import org.springframework.beans.factory.annotation.Autowired;
10   -import org.springframework.boot.CommandLineRunner;
11   -import org.springframework.stereotype.Component;
  3 +import cn.fw.hermes.logError
  4 +import cn.fw.hermes.service.biz.CommonBizService
  5 +import cn.fw.hermes.service.emqx.MqttService
  6 +import com.github.yitter.contract.IdGeneratorOptions
  7 +import com.github.yitter.idgen.YitIdHelper
  8 +import org.springframework.beans.factory.DisposableBean
  9 +import org.springframework.boot.CommandLineRunner
  10 +import org.springframework.stereotype.Component
12 11  
13 12 /**
14 13 * @author: Kurisu
15 14 * @date: 2021/2/4 9:13
16 15 */
17 16 @Component
18   -@Slf4j
19   -public class LaunchConfiguration implements CommandLineRunner, DisposableBean {
20   - private final CommonBizService commonBizService;
21   - private final MqttService mqttService;
22   -
23   - @Autowired
24   - public LaunchConfiguration(final CommonBizService commonBizService,
25   - final MqttService mqttService) {
26   - this.commonBizService = commonBizService;
27   - this.mqttService = mqttService;
28   - }
29   -
30   - @Override
31   - public void run(String... args) {
  17 +class LaunchConfiguration(
  18 + private val commonBizService: CommonBizService,
  19 + private val mqttService: MqttService
  20 +) : CommandLineRunner, DisposableBean {
  21 + override fun run(vararg args: String) {
32 22 try {
33 23 //全局初始化设置WorkerId,默认最大2^16-1,可通过调整 WorkerIdBitLength 增加最大值
34   - IdGeneratorOptions options = new IdGeneratorOptions();
35   - options.WorkerId = 1;
36   - options.WorkerIdBitLength = 3;
37   - YitIdHelper.setIdGenerator(options);
38   - commonBizService.initSysData();
39   - mqttService.initialization();
40   - } catch (Exception e) {
41   - log.error("初始化系统信息失败", e);
  24 + val options = IdGeneratorOptions()
  25 + options.WorkerId = 1
  26 + options.WorkerIdBitLength = 3
  27 + YitIdHelper.setIdGenerator(options)
  28 + commonBizService.initSysData()
  29 + mqttService.initialization()
  30 + } catch (e: Exception) {
  31 + logError("初始化系统信息失败", e)
42 32 }
43 33 }
44 34  
45   - @Override
46   - public void destroy() {
47   - mqttService.disconnect();
  35 + override fun destroy() {
  36 + mqttService.disconnect()
48 37 }
49 38 }
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/config/ScheduleConfig.kt
1   -package cn.fw.hermes.config;
2   -
3   -import org.springframework.context.annotation.Bean;
4   -import org.springframework.context.annotation.Configuration;
5   -import org.springframework.scheduling.annotation.EnableAsync;
6   -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
7   -
8   -import java.util.concurrent.ThreadPoolExecutor;
  1 +package cn.fw.hermes.config
9 2  
  3 +import org.springframework.context.annotation.Bean
  4 +import org.springframework.context.annotation.Configuration
  5 +import org.springframework.scheduling.annotation.EnableAsync
  6 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
  7 +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
10 8  
11 9 /**
12 10 * 定时任务线程池
... ... @@ -15,49 +13,53 @@ import java.util.concurrent.ThreadPoolExecutor;
15 13 */
16 14 @Configuration
17 15 @EnableAsync
18   -public class ScheduleConfig {
  16 +class ScheduleConfig {
  17 + @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
  18 + fun taskExecutor(): ThreadPoolTaskExecutor {
  19 + val executor = ThreadPoolTaskExecutor()
  20 + executor.corePoolSize = corePoolSize
  21 + executor.maxPoolSize = maxPoolSize
  22 + executor.setQueueCapacity(queueCapacity)
  23 + executor.keepAliveSeconds = keepAliveTime
  24 + executor.setThreadNamePrefix(threadNamePrefix)
19 25  
20   - /*
  26 + // 线程池对拒绝任务的处理策略
  27 + // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
  28 + executor.setRejectedExecutionHandler(CallerRunsPolicy())
  29 + // 初始化
  30 + executor.initialize()
  31 + return executor
  32 + }
  33 +
  34 + companion object {
  35 + /*
21 36 * 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
22 37 * 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
23 38 * 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
24 39 */
  40 + /**
  41 + * 核心线程数(默认线程数)
  42 + */
  43 + private const val corePoolSize = 2 shl 4
25 44  
26   - /**
27   - * 核心线程数(默认线程数)
28   - */
29   - private static final int corePoolSize = 2 << 4;
30   - /**
31   - * 最大线程数
32   - */
33   - private static final int maxPoolSize = 2 << 7;
34   - /**
35   - * 允许线程空闲时间(单位:默认为秒)
36   - */
37   - private static final int keepAliveTime = 30;
38   - /**
39   - * 缓冲队列大小
40   - */
41   - private static final int queueCapacity = 2 << 10;
42   - /**
43   - * 线程池名前缀
44   - */
45   - private static final String threadNamePrefix = "Hermes-Task-";
  45 + /**
  46 + * 最大线程数
  47 + */
  48 + private const val maxPoolSize = 2 shl 7
46 49  
47   - @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
48   - public ThreadPoolTaskExecutor taskExecutor() {
49   - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
50   - executor.setCorePoolSize(corePoolSize);
51   - executor.setMaxPoolSize(maxPoolSize);
52   - executor.setQueueCapacity(queueCapacity);
53   - executor.setKeepAliveSeconds(keepAliveTime);
54   - executor.setThreadNamePrefix(threadNamePrefix);
  50 + /**
  51 + * 允许线程空闲时间(单位:默认为秒)
  52 + */
  53 + private const val keepAliveTime = 30
55 54  
56   - // 线程池对拒绝任务的处理策略
57   - // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
58   - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
59   - // 初始化
60   - executor.initialize();
61   - return executor;
  55 + /**
  56 + * 缓冲队列大小
  57 + */
  58 + private const val queueCapacity = 2 shl 10
  59 +
  60 + /**
  61 + * 线程池名前缀
  62 + */
  63 + private const val threadNamePrefix = "Hermes-Task-"
62 64 }
63 65 }
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/task/.gitkeep deleted
fw-hermes-server/src/main/java/cn/fw/hermes/task/MessageManaTask.kt
1   -package cn.fw.hermes.task;
  1 +package cn.fw.hermes.task
2 2  
3   -import cn.fw.hermes.service.biz.MessageManaBizService;
4   -import org.springframework.beans.factory.annotation.Autowired;
5   -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
6   -import org.springframework.scheduling.annotation.Scheduled;
7   -import org.springframework.stereotype.Component;
8   -
9   -import java.time.LocalDate;
  3 +import cn.fw.hermes.service.biz.MessageManaBizService
  4 +import org.springframework.beans.factory.annotation.Autowired
  5 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
  6 +import org.springframework.scheduling.annotation.Scheduled
  7 +import org.springframework.stereotype.Component
  8 +import java.time.LocalDate
10 9  
11 10 /**
12 11 * 历史消息管理定时器
... ... @@ -17,20 +16,13 @@ import java.time.LocalDate;
17 16 * @date : 2023-07-24 15:44
18 17 */
19 18 @Component
20   -@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
21   -public class MessageManaTask {
22   - private final MessageManaBizService messageManaBizService;
23   -
24   - @Autowired
25   - public MessageManaTask(final MessageManaBizService messageManaBizService) {
26   - this.messageManaBizService = messageManaBizService;
27   - }
28   -
  19 +@ConditionalOnProperty(prefix = "task", name = ["switch"], havingValue = "on")
  20 +class MessageManaTask(private val messageManaBizService: MessageManaBizService) {
29 21 /**
30 22 * 定期删除久远的历史消息
31 23 */
32 24 @Scheduled(cron = "0 0 0/2 * * *")
33   - public void deleteOldMsg() {
34   - messageManaBizService.removeHistoryMsg(LocalDate.now().minusMonths(6L));
  25 + fun deleteOldMsg() {
  26 + messageManaBizService.removeHistoryMsg(LocalDate.now().minusMonths(6L))
35 27 }
36 28 }
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/task/MessageSaveTask.kt
1   -package cn.fw.hermes.task;
  1 +package cn.fw.hermes.task
2 2  
3   -import cn.fw.hermes.common.constant.Constant;
4   -import cn.fw.hermes.common.utils.StringUtils;
5   -import cn.fw.hermes.common.utils.ThreadPoolUtil;
6   -import cn.fw.hermes.domain.db.MessageData;
7   -import cn.fw.hermes.domain.dto.MessageConditionDTO;
8   -import cn.fw.hermes.service.biz.MsgPrepareBizService;
9   -import cn.fw.hermes.service.data.MessageDataService;
10   -import cn.hutool.json.JSONArray;
11   -import cn.hutool.json.JSONUtil;
12   -import lombok.extern.slf4j.Slf4j;
13   -import org.springframework.beans.factory.annotation.Autowired;
14   -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15   -import org.springframework.data.redis.core.StringRedisTemplate;
16   -import org.springframework.scheduling.annotation.Async;
17   -import org.springframework.scheduling.annotation.Scheduled;
18   -import org.springframework.stereotype.Component;
19   -import org.springframework.util.CollectionUtils;
20   -
21   -import java.util.ArrayList;
22   -import java.util.List;
23   -import java.util.Objects;
24   -import java.util.concurrent.CompletableFuture;
25   -import java.util.concurrent.ExecutorService;
  3 +import cn.fw.hermes.common.constant.Constant
  4 +import cn.fw.hermes.common.utils.ThreadPoolUtil
  5 +import cn.fw.hermes.domain.db.MessageData
  6 +import cn.fw.hermes.domain.dto.MessageConditionDTO
  7 +import cn.fw.hermes.logError
  8 +import cn.fw.hermes.service.biz.MsgPrepareBizService
  9 +import cn.fw.hermes.service.data.MessageDataService
  10 +import cn.hutool.json.JSONUtil
  11 +import lombok.extern.slf4j.Slf4j
  12 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
  13 +import org.springframework.data.redis.core.StringRedisTemplate
  14 +import org.springframework.scheduling.annotation.Async
  15 +import org.springframework.scheduling.annotation.Scheduled
  16 +import org.springframework.stereotype.Component
  17 +import org.springframework.util.CollectionUtils
  18 +import java.util.*
  19 +import java.util.concurrent.CompletableFuture
26 20  
27 21 /**
28 22 * 消息储存任务
... ... @@ -32,44 +26,32 @@ import java.util.concurrent.ExecutorService;
32 26 * @desc : 消息储存任务
33 27 * @date : 2023-07-24 15:05
34 28 */
35   -@Slf4j
36 29 @Component
37   -@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
38   -public class MessageSaveTask {
39   - private final StringRedisTemplate stringRedisTemplate;
40   - private final MsgPrepareBizService msgPrepareBizService;
41   - private final MessageDataService messageDataService;
42   -
43   - @Autowired
44   - public MessageSaveTask(final StringRedisTemplate stringRedisTemplate,
45   - final MsgPrepareBizService msgPrepareBizService,
46   - final MessageDataService messageDataService) {
47   - this.stringRedisTemplate = stringRedisTemplate;
48   - this.msgPrepareBizService = msgPrepareBizService;
49   - this.messageDataService = messageDataService;
50   - }
51   -
  30 +@ConditionalOnProperty(prefix = "task", name = ["switch"], havingValue = "on")
  31 +class MessageSaveTask(
  32 + private val stringRedisTemplate: StringRedisTemplate,
  33 + private val msgPrepareBizService: MsgPrepareBizService,
  34 + private val messageDataService: MessageDataService
  35 +) {
52 36 /**
53 37 * 重试保存失败的消息体
54 38 */
55 39 @Scheduled(initialDelay = 1000 * 5, fixedRate = 1000 * 10)
56   - public void retrySaveMsgBody() {
57   - List<String> failList = new ArrayList<>();
58   - String jsonStr;
59   - while ((jsonStr = stringRedisTemplate.opsForSet().pop(Constant.MSG_BODY_SAVE_FAILED_CACHE_KEY)) != null) {
60   - JSONArray jsonArray = JSONUtil.parseArray(jsonStr);
61   - List<MessageData> list = JSONUtil.toList(jsonArray, MessageData.class);
  40 + fun retrySaveMsgBody() {
  41 + val failList: MutableList<String> = mutableListOf()
  42 + var jsonStr: String?
  43 + while ((stringRedisTemplate.opsForSet().pop(Constant.MSG_BODY_SAVE_FAILED_CACHE_KEY).also { jsonStr = it }) != null) {
  44 + val jsonArray = JSONUtil.parseArray(jsonStr)
  45 + val list = JSONUtil.toList(jsonArray, MessageData::class.java)
62 46 try {
63   - messageDataService.saveBatch(list);
64   - } catch (Exception e) {
65   - if (StringUtils.isValid(jsonStr)) {
66   - failList.add(jsonStr);
67   - }
  47 + messageDataService.saveBatch(list)
  48 + } catch (e: Exception) {
  49 + jsonStr?.let { failList.add(it) }
68 50 }
69 51 }
70 52 if (!CollectionUtils.isEmpty(failList)) {
71   - String[] array = failList.toArray(new String[0]);
72   - stringRedisTemplate.opsForSet().add(Constant.MSG_BODY_SAVE_FAILED_CACHE_KEY, array);
  53 + val array = failList.toTypedArray<String>()
  54 + stringRedisTemplate.opsForSet().add(Constant.MSG_BODY_SAVE_FAILED_CACHE_KEY, *array)
73 55 }
74 56 }
75 57  
... ... @@ -78,33 +60,36 @@ public class MessageSaveTask {
78 60 */
79 61 @Async("taskExecutor")
80 62 @Scheduled(initialDelay = 1000 * 5, fixedRate = 1000)
81   - public void saveMsg() {
82   - List<String> failList = new ArrayList<>();
83   - List<CompletableFuture<Void>> futureList = new ArrayList<>();
84   - String jsonStr;
85   - while ((jsonStr = stringRedisTemplate.opsForList().leftPop(Constant.CACHE_PERSISTENT_MSG_CACHE_KEY)) != null) {
86   - final MessageConditionDTO dto = JSONUtil.toBean(jsonStr, MessageConditionDTO.class);
  63 + fun saveMsg() {
  64 + val failList: MutableList<String> = mutableListOf()
  65 + val futureList: MutableList<CompletableFuture<Void>> = mutableListOf()
  66 + var jsonStr: String?
  67 +
  68 + while ((stringRedisTemplate.opsForList().leftPop(Constant.CACHE_PERSISTENT_MSG_CACHE_KEY).also { jsonStr = it }) != null) {
  69 + val dto = JSONUtil.toBean(jsonStr, MessageConditionDTO::class.java)
87 70 if (Objects.isNull(dto)) {
88   - continue;
  71 + continue
89 72 }
90   - ExecutorService executor = ThreadPoolUtil.getInstance().getExecutor();
91   - CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  73 + val executor = ThreadPoolUtil.getInstance().executor
  74 + val future = CompletableFuture.runAsync({
92 75 try {
93   - msgPrepareBizService.saveMsg(dto);
94   - stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, String.valueOf(dto.getId()));
95   - } catch (Exception e) {
96   - String _jsonStr = JSONUtil.toJsonStr(dto);
97   - if (StringUtils.isValid(_jsonStr)) {
98   - failList.add(_jsonStr);
  76 + msgPrepareBizService.saveMsg(dto)
  77 + stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, dto.id.toString())
  78 + } catch (e: Exception) {
  79 + jsonStr?.let {
  80 + failList.add(it)
99 81 }
100   - log.error("持久化消息记录失败: [{}]", e.getMessage());
  82 +
  83 + logError("持久化消息记录失败: [{}]", e.message)
101 84 }
102   - }, executor);
103   - futureList.add(future);
  85 + }, executor)
  86 + futureList.add(future)
104 87 }
105   - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
  88 +
  89 + CompletableFuture.allOf(*futureList.toTypedArray<CompletableFuture<*>>()).join()
106 90 if (!CollectionUtils.isEmpty(failList)) {
107   - stringRedisTemplate.opsForList().rightPushAll(Constant.CACHE_PERSISTENT_MSG_CACHE_KEY, failList);
  91 + stringRedisTemplate.opsForList().rightPushAll(Constant.CACHE_PERSISTENT_MSG_CACHE_KEY, failList)
108 92 }
109 93 }
  94 +
110 95 }
... ...
fw-hermes-server/src/main/java/cn/fw/hermes/task/MessageSendTask.kt
1   -package cn.fw.hermes.task;
  1 +package cn.fw.hermes.task
2 2  
3   -import cn.fw.hermes.common.constant.Constant;
4   -import cn.fw.hermes.common.utils.StringUtils;
5   -import cn.fw.hermes.domain.db.MessageHistory;
6   -import cn.fw.hermes.domain.enums.MsgStatusEnum;
7   -import cn.fw.hermes.service.biz.AppUpdateBizService;
8   -import cn.fw.hermes.service.biz.MessageSender;
9   -import cn.fw.hermes.service.data.MessageHistoryService;
10   -import com.baomidou.mybatisplus.core.toolkit.Wrappers;
11   -import lombok.extern.slf4j.Slf4j;
12   -import org.springframework.beans.factory.annotation.Autowired;
13   -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
14   -import org.springframework.data.redis.core.StringRedisTemplate;
15   -import org.springframework.scheduling.annotation.Async;
16   -import org.springframework.scheduling.annotation.Scheduled;
17   -import org.springframework.stereotype.Component;
18   -import org.springframework.util.CollectionUtils;
19   -
20   -import java.time.LocalDateTime;
21   -import java.util.ArrayList;
22   -import java.util.List;
  3 +import cn.fw.hermes.common.constant.Constant
  4 +import cn.fw.hermes.domain.db.MessageHistory
  5 +import cn.fw.hermes.domain.enums.MsgStatusEnum
  6 +import cn.fw.hermes.logError
  7 +import cn.fw.hermes.service.biz.AppUpdateBizService
  8 +import cn.fw.hermes.service.biz.MessageSender
  9 +import cn.fw.hermes.service.data.MessageHistoryService
  10 +import com.baomidou.mybatisplus.core.toolkit.Wrappers
  11 +import org.springframework.beans.factory.annotation.Autowired
  12 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
  13 +import org.springframework.data.redis.core.StringRedisTemplate
  14 +import org.springframework.scheduling.annotation.Async
  15 +import org.springframework.scheduling.annotation.Scheduled
  16 +import org.springframework.stereotype.Component
  17 +import org.springframework.util.CollectionUtils
  18 +import java.time.LocalDateTime
23 19  
24 20 /**
25 21 * 消息发送定时器
... ... @@ -29,37 +25,30 @@ import java.util.List;
29 25 * @desc : 消息发送定时器
30 26 * @date : 2023-07-24 15:44
31 27 */
32   -@Slf4j
33 28 @Component
34   -@ConditionalOnProperty(prefix = "task", name = "switch", havingValue = "on")
35   -public class MessageSendTask {
36   - private final StringRedisTemplate stringRedisTemplate;
37   - private final MessageHistoryService messageHistoryService;
38   - private final MessageSender messageSender;
39   - private final AppUpdateBizService appUpdateBizService;
40   -
41   - @Autowired
42   - public MessageSendTask(final StringRedisTemplate stringRedisTemplate,
43   - final MessageHistoryService messageHistoryService,
44   - final MessageSender messageSender,
45   - final AppUpdateBizService appUpdateBizService) {
46   - this.stringRedisTemplate = stringRedisTemplate;
47   - this.messageHistoryService = messageHistoryService;
48   - this.messageSender = messageSender;
49   - this.appUpdateBizService = appUpdateBizService;
50   - }
51   -
  29 +@ConditionalOnProperty(prefix = "task", name = ["switch"], havingValue = "on")
  30 +class MessageSendTask @Autowired constructor(
  31 + private val stringRedisTemplate: StringRedisTemplate,
  32 + private val messageHistoryService: MessageHistoryService,
  33 + private val messageSender: MessageSender,
  34 + private val appUpdateBizService: AppUpdateBizService
  35 +) {
52 36 @Scheduled(cron = "0/30 * * * * *")
53   - public void ready2Send() {
54   - List<MessageHistory> list = messageHistoryService.list(Wrappers.<MessageHistory>lambdaQuery()
55   - .eq(MessageHistory::getMsgStatus, MsgStatusEnum.WAITING)
56   - .ge(MessageHistory::getCreateTime, LocalDateTime.now().minusDays(7L))
57   - );
  37 + fun ready2Send() {
  38 + val list = messageHistoryService.list(
  39 + Wrappers.lambdaQuery<MessageHistory>()
  40 + .eq(MessageHistory::getMsgStatus, MsgStatusEnum.WAITING)
  41 + .ge(MessageHistory::getCreateTime, LocalDateTime.now().minusDays(7L))
  42 + )
58 43 if (CollectionUtils.isEmpty(list)) {
59   - return;
  44 + return
60 45 }
61   - String[] array = list.stream().map(r -> String.valueOf(r.getId())).distinct().toArray(String[]::new);
62   - stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, array);
  46 +
  47 + val array = list.map {
  48 + it.id.toString()
  49 + }.distinct().toTypedArray()
  50 +
  51 + stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, *array)
63 52 }
64 53  
65 54 /**
... ... @@ -67,33 +56,32 @@ public class MessageSendTask {
67 56 */
68 57 @Async("taskExecutor")
69 58 @Scheduled(cron = "0/2 * * * * *")
70   - public void sendMsg() {
71   - List<String> failList = new ArrayList<>();
  59 + fun sendMsg() {
  60 + val failList: MutableList<String> = mutableListOf()
72 61 // 限流标志 [最多一次性推送100条]
73   - int count = 100;
74   - String jsonStr;
75   - while ((jsonStr = stringRedisTemplate.opsForSet().pop(Constant.READY_TO_SEND_MSG_CACHE_KEY)) != null) {
76   - count--;
  62 + var count = 100
  63 + var jsonStr: String?
  64 + while ((stringRedisTemplate.opsForSet().pop(Constant.READY_TO_SEND_MSG_CACHE_KEY).also { jsonStr = it }) != null) {
  65 + count--
77 66 try {
78   - final Long messageId = Long.valueOf(jsonStr);
79   - messageSender.sendMsg(messageId);
80   - } catch (Exception e) {
81   - if (StringUtils.isValid(jsonStr)) {
82   - failList.add(jsonStr);
  67 + jsonStr?.toLong()?.let {
  68 + messageSender.sendMsg(it)
83 69 }
84   - log.error("推送消息失败", e);
  70 + } catch (e: Exception) {
  71 + jsonStr?.let { failList.add(it) }
  72 + logError("推送消息失败", e)
85 73 }
86 74 if (count < 0) {
87   - break;
  75 + break
88 76 }
89 77 }
90 78 if (!CollectionUtils.isEmpty(failList)) {
91   - stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, failList.toArray(new String[0]));
  79 + stringRedisTemplate.opsForSet().add(Constant.READY_TO_SEND_MSG_CACHE_KEY, *failList.toTypedArray<String>())
92 80 }
93 81 }
94 82  
95 83 @Scheduled(cron = "0/20 * * * * *")
96   - public void reSendUpdateMsg() {
97   - appUpdateBizService.reSendUpdateMsg();
  84 + fun reSendUpdateMsg() {
  85 + appUpdateBizService.reSendUpdateMsg()
98 86 }
99 87 }
... ...
fw-hermes-service/pom.xml
... ... @@ -61,6 +61,10 @@
61 61 <groupId>com.github.yitter</groupId>
62 62 <artifactId>yitter-idgenerator</artifactId>
63 63 </dependency>
  64 + <dependency>
  65 + <groupId>io.github.tpnsPush</groupId>
  66 + <artifactId>xinge</artifactId>
  67 + </dependency>
64 68 </dependencies>
65 69 <build>
66 70 <plugins>
... ...
... ... @@ -47,6 +47,8 @@
47 47 <yitter.idgenerator>1.0.6</yitter.idgenerator>
48 48 <freemarker.version>2.3.32</freemarker.version>
49 49 <mqttv3.version>1.2.2</mqttv3.version>
  50 + <kotlinx.coroutines.version>1.7.3</kotlinx.coroutines.version>
  51 + <tpnspush.version>1.2.4.17</tpnspush.version>
50 52 </properties>
51 53  
52 54 <dependencyManagement>
... ... @@ -155,10 +157,36 @@
155 157 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
156 158 <version>${mqttv3.version}</version>
157 159 </dependency>
  160 + <dependency>
  161 + <groupId>org.jetbrains.kotlinx</groupId>
  162 + <artifactId>kotlinx-coroutines-core</artifactId>
  163 + <version>${kotlinx.coroutines.version}</version>
  164 + </dependency>
  165 + <dependency>
  166 + <groupId>io.github.tpnsPush</groupId>
  167 + <artifactId>xinge</artifactId>
  168 + <version>${tpnspush.version}</version>
  169 + </dependency>
158 170 </dependencies>
159 171 </dependencyManagement>
160 172  
161 173 <dependencies>
  174 + <!-- kotlin -->
  175 + <dependency>
  176 + <groupId>org.jetbrains.kotlin</groupId>
  177 + <artifactId>kotlin-stdlib</artifactId>
  178 + <version>${kotlin.version}</version>
  179 + </dependency>
  180 + <dependency>
  181 + <groupId>org.jetbrains.kotlin</groupId>
  182 + <artifactId>kotlin-reflect</artifactId>
  183 + <version>${kotlin.version}</version>
  184 + </dependency>
  185 + <dependency>
  186 + <groupId>org.jetbrains.kotlin</groupId>
  187 + <artifactId>kotlin-stdlib-jdk8</artifactId>
  188 + <version>${kotlin.version}</version>
  189 + </dependency>
162 190 <dependency>
163 191 <groupId>org.projectlombok</groupId>
164 192 <artifactId>lombok</artifactId>
... ... @@ -174,6 +202,10 @@
174 202 <build>
175 203 <plugins>
176 204 <plugin>
  205 + <groupId>org.jetbrains.kotlin</groupId>
  206 + <artifactId>kotlin-maven-plugin</artifactId>
  207 + </plugin>
  208 + <plugin>
177 209 <groupId>org.apache.maven.plugins</groupId>
178 210 <artifactId>maven-compiler-plugin</artifactId>
179 211 <configuration>
... ...