Commit 2779de8ffc74de82b7ee9e6758ea19d16adc7b64
Merge remote-tracking branch 'origin/test'
Showing
12 changed files
with
175 additions
and
27 deletions
fw-hermes-common/pom.xml
... | ... | @@ -25,6 +25,10 @@ |
25 | 25 | <groupId>cn.fw</groupId> |
26 | 26 | <artifactId>fw-data-base</artifactId> |
27 | 27 | </dependency> |
28 | + <dependency> | |
29 | + <groupId>commons-codec</groupId> | |
30 | + <artifactId>commons-codec</artifactId> | |
31 | + </dependency> | |
28 | 32 | </dependencies> |
29 | 33 | |
30 | 34 | <build> |
... | ... | @@ -38,4 +42,4 @@ |
38 | 42 | </plugin> |
39 | 43 | </plugins> |
40 | 44 | </build> |
41 | -</project> | |
42 | 45 | \ No newline at end of file |
46 | +</project> | ... | ... |
fw-hermes-common/src/main/java/cn/fw/hermes/common/utils/RobotHelper.java
0 → 100644
1 | +package cn.fw.hermes.common.utils; | |
2 | + | |
3 | +import cn.hutool.http.HttpRequest; | |
4 | +import cn.hutool.http.HttpUtil; | |
5 | +import cn.hutool.json.JSONUtil; | |
6 | +import org.apache.commons.codec.binary.Base64; | |
7 | + | |
8 | +import javax.crypto.Mac; | |
9 | +import javax.crypto.spec.SecretKeySpec; | |
10 | +import java.net.URLEncoder; | |
11 | +import java.nio.charset.StandardCharsets; | |
12 | +import java.util.Date; | |
13 | +import java.util.HashMap; | |
14 | +import java.util.Map; | |
15 | + | |
16 | +/** | |
17 | + * 机器人工具 | |
18 | + * | |
19 | + * @author : kurisu | |
20 | + * @version : 2.0 | |
21 | + * @desc : 机器人工具 | |
22 | + * @date : 2023-09-08 14:40 | |
23 | + */ | |
24 | +public class RobotHelper { | |
25 | + private static final String DING_TALK_ROBOT_API = "https://oapi.dingtalk.com/robot/send"; | |
26 | + | |
27 | + public static String sendEarlyWarning(String accessToken, String secret) throws Exception { | |
28 | + Long timestamp = System.currentTimeMillis(); | |
29 | + String sign = createSign(timestamp, secret); | |
30 | + String url = String.format("%s?access_token=%s×tamp=%s&sign=%s", DING_TALK_ROBOT_API, accessToken, timestamp, sign); | |
31 | + | |
32 | + Map<String, String> markdown = new HashMap<>(); | |
33 | + markdown.put("title", "预警信息"); | |
34 | + markdown.put("text", "### MQTT服务断开连接 \n" + | |
35 | + " ##### 重连次数超过10次\n" + | |
36 | + " ###### " + DateUtil.getFormatString(new Date(), "MM月dd日 HH点mm分") + | |
37 | + " 发布 \n"); | |
38 | + | |
39 | + Map<String, Object> params = new HashMap<>(); | |
40 | + params.put("msgtype", "markdown"); | |
41 | + params.put("markdown", markdown); | |
42 | + HttpRequest post = HttpUtil.createPost(url); | |
43 | + post.body(JSONUtil.toJsonStr(params), "application/json"); | |
44 | + | |
45 | + return post.setConnectionTimeout(-1).execute().body(); | |
46 | + } | |
47 | + | |
48 | + public static void main(String[] args) throws Exception { | |
49 | + String res = sendEarlyWarning("31a16df234bc4b78a29c5a4cc35a7421fc90a87cc339dbb545117b6966d2ca32", "SEC509e8cd6d0c3969563acf9e33c177c49cf2a365cd60ddc841d4eb728afcd6940"); | |
50 | + System.out.println(res); | |
51 | + } | |
52 | + | |
53 | + private static String createSign(Long timestamp, String secret) throws Exception { | |
54 | + String stringToSign = timestamp + "\n" + secret; | |
55 | + Mac mac = Mac.getInstance("HmacSHA256"); | |
56 | + mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); | |
57 | + byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)); | |
58 | + return URLEncoder.encode(new String(Base64.encodeBase64(signData)), "UTF-8"); | |
59 | + } | |
60 | +} | ... | ... |
fw-hermes-common/src/main/java/cn/fw/hermes/common/utils/ThreadPoolUtil.java
... | ... | @@ -4,7 +4,6 @@ import cn.hutool.core.thread.ExecutorBuilder; |
4 | 4 | import cn.hutool.core.thread.NamedThreadFactory; |
5 | 5 | |
6 | 6 | import java.util.concurrent.ExecutorService; |
7 | -import java.util.concurrent.LinkedBlockingQueue; | |
8 | 7 | |
9 | 8 | /** |
10 | 9 | * ThreadPoolUtil |
... | ... | @@ -21,12 +20,12 @@ public class ThreadPoolUtil { |
21 | 20 | |
22 | 21 | private ThreadPoolUtil() { |
23 | 22 | this.executor = ExecutorBuilder.create() |
24 | - .setCorePoolSize(20) | |
25 | - .setMaxPoolSize(200) | |
26 | - .setThreadFactory(new NamedThreadFactory("Hermes-Custom-", false)) | |
27 | - .setAllowCoreThreadTimeOut(true) | |
28 | - .setWorkQueue(new LinkedBlockingQueue<>(4096)) | |
29 | - .build(); | |
23 | + .setCorePoolSize(2 << 5) | |
24 | + .setMaxPoolSize(2 << 6) | |
25 | + .setThreadFactory(new NamedThreadFactory("Hermes-Custom-", false)) | |
26 | + .setAllowCoreThreadTimeOut(true) | |
27 | + .useArrayBlockingQueue(2 << 10) | |
28 | + .build(); | |
30 | 29 | } |
31 | 30 | |
32 | 31 | public static ThreadPoolUtil getInstance() { | ... | ... |
fw-hermes-server/src/main/java/cn/fw/hermes/config/ScheduleConfig.java
... | ... | @@ -26,11 +26,11 @@ public class ScheduleConfig { |
26 | 26 | /** |
27 | 27 | * 核心线程数(默认线程数) |
28 | 28 | */ |
29 | - private static final int corePoolSize = 25; | |
29 | + private static final int corePoolSize = 2 << 4; | |
30 | 30 | /** |
31 | 31 | * 最大线程数 |
32 | 32 | */ |
33 | - private static final int maxPoolSize = 50; | |
33 | + private static final int maxPoolSize = 2 << 7; | |
34 | 34 | /** |
35 | 35 | * 允许线程空闲时间(单位:默认为秒) |
36 | 36 | */ |
... | ... | @@ -38,7 +38,7 @@ public class ScheduleConfig { |
38 | 38 | /** |
39 | 39 | * 缓冲队列大小 |
40 | 40 | */ |
41 | - private static final int queueCapacity = 20480; | |
41 | + private static final int queueCapacity = 2 << 10; | |
42 | 42 | /** |
43 | 43 | * 线程池名前缀 |
44 | 44 | */ | ... | ... |
fw-hermes-server/src/main/java/cn/fw/hermes/controller/erp/SysDebugController.java
... | ... | @@ -74,4 +74,17 @@ public class SysDebugController { |
74 | 74 | commonBizService.saveMsgBizType(type, typeDesc); |
75 | 75 | return success(); |
76 | 76 | } |
77 | + | |
78 | + @ControllerMethod("重连mqtt服务器") | |
79 | + @PostMapping("/client/reconnect") | |
80 | + public Message<Boolean> reconnect() { | |
81 | + return success(commonBizService.reconnect()); | |
82 | + } | |
83 | + | |
84 | + @ControllerMethod("模拟发送预警消息") | |
85 | + @PostMapping("/send/warning/msg") | |
86 | + public Message<Void> sendWarningMsg() { | |
87 | + commonBizService.debugSendWarningMsg(); | |
88 | + return success(); | |
89 | + } | |
77 | 90 | } | ... | ... |
fw-hermes-server/src/main/java/cn/fw/hermes/task/MessageSendTask.java
fw-hermes-server/src/main/resources/application-prd.yml
fw-hermes-server/src/main/resources/application.yml
fw-hermes-service/pom.xml
... | ... | @@ -27,7 +27,6 @@ |
27 | 27 | <groupId>cn.fw</groupId> |
28 | 28 | <artifactId>fw-hermes-rpc</artifactId> |
29 | 29 | </dependency> |
30 | - | |
31 | 30 | <dependency> |
32 | 31 | <groupId>org.apache.rocketmq</groupId> |
33 | 32 | <artifactId>rocketmq-spring-boot-starter</artifactId> |
... | ... | @@ -74,4 +73,4 @@ |
74 | 73 | </plugin> |
75 | 74 | </plugins> |
76 | 75 | </build> |
77 | -</project> | |
78 | 76 | \ No newline at end of file |
77 | +</project> | ... | ... |
fw-hermes-service/src/main/java/cn/fw/hermes/service/biz/CommonBizService.java
... | ... | @@ -14,6 +14,7 @@ import cn.fw.hermes.domain.vo.TopicVO; |
14 | 14 | import cn.fw.hermes.service.data.MessageBusinessTypeService; |
15 | 15 | import cn.fw.hermes.service.data.SysPlatformService; |
16 | 16 | import cn.fw.hermes.service.data.SysTopicService; |
17 | +import cn.fw.hermes.service.emqx.MqttService; | |
17 | 18 | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
18 | 19 | import lombok.Getter; |
19 | 20 | import lombok.RequiredArgsConstructor; |
... | ... | @@ -44,6 +45,7 @@ public class CommonBizService { |
44 | 45 | private final SysTopicService sysTopicService; |
45 | 46 | private final MessageBusinessTypeService messageBusinessTypeService; |
46 | 47 | private final CacheExtender cache; |
48 | + private final MqttService mqttService; | |
47 | 49 | |
48 | 50 | @Value("${spring.cache.locker.key-prefix}:common") |
49 | 51 | @Getter |
... | ... | @@ -121,7 +123,7 @@ public class CommonBizService { |
121 | 123 | @DisLock(prefix = "#this.getKeyPrefix()", key = "'msg_biz_type'", message = "请勿重复操作") |
122 | 124 | public void saveMsgBizType(String type, String desc) { |
123 | 125 | boolean exist = messageBusinessTypeService.count(Wrappers.<MessageBusinessType>lambdaQuery() |
124 | - .eq(MessageBusinessType::getType, type) | |
126 | + .eq(MessageBusinessType::getType, type) | |
125 | 127 | ) > 0; |
126 | 128 | BV.isFalse(exist, () -> "已存在该类型"); |
127 | 129 | MessageBusinessType bizType = new MessageBusinessType(); |
... | ... | @@ -129,4 +131,24 @@ public class CommonBizService { |
129 | 131 | bizType.setTypeDesc(desc); |
130 | 132 | messageBusinessTypeService.save(bizType); |
131 | 133 | } |
134 | + | |
135 | + /** | |
136 | + * 重连mqtt服务器 | |
137 | + * | |
138 | + * @return | |
139 | + */ | |
140 | + @DisLock(prefix = "#this.getKeyPrefix()", key = "'msg_client_reconnect'", message = "请勿重复操作") | |
141 | + public boolean reconnect() { | |
142 | + return mqttService.manualReConnect(); | |
143 | + } | |
144 | + | |
145 | + /** | |
146 | + * 模拟发送测试预警钉钉机器人消息 | |
147 | + * | |
148 | + * @return | |
149 | + */ | |
150 | + @DisLock(prefix = "#this.getKeyPrefix()", key = "'debug_send_warning_msg'", message = "请勿重复操作") | |
151 | + public void debugSendWarningMsg() { | |
152 | + mqttService.sendWarningMsg(); | |
153 | + } | |
132 | 154 | } | ... | ... |
fw-hermes-service/src/main/java/cn/fw/hermes/service/emqx/MqttService.java
1 | 1 | package cn.fw.hermes.service.emqx; |
2 | 2 | |
3 | 3 | import cn.fw.hermes.common.constant.Constant; |
4 | +import cn.fw.hermes.common.utils.RobotHelper; | |
4 | 5 | import cn.fw.hermes.common.utils.ThreadPoolUtil; |
5 | 6 | import cn.fw.hermes.domain.mqtt.Message; |
6 | 7 | import cn.fw.hermes.domain.mqtt.UpdateMessage; |
... | ... | @@ -16,6 +17,7 @@ import org.springframework.stereotype.Service; |
16 | 17 | |
17 | 18 | import java.nio.charset.StandardCharsets; |
18 | 19 | import java.util.List; |
20 | +import java.util.Objects; | |
19 | 21 | import java.util.concurrent.CompletableFuture; |
20 | 22 | import java.util.concurrent.atomic.AtomicBoolean; |
21 | 23 | |
... | ... | @@ -54,6 +56,24 @@ public class MqttService implements MqttCallbackExtended { |
54 | 56 | } |
55 | 57 | } |
56 | 58 | |
59 | + /** | |
60 | + * 手动重连 | |
61 | + * | |
62 | + * @return | |
63 | + */ | |
64 | + public boolean manualReConnect() { | |
65 | + if (mqttAsyncClient.isConnected()) { | |
66 | + return true; | |
67 | + } | |
68 | + try { | |
69 | + mqttAsyncClient.reconnect(); | |
70 | + return true; | |
71 | + } catch (MqttException e) { | |
72 | + log.info("[{}]手动重连失败", TAG, e); | |
73 | + } | |
74 | + return false; | |
75 | + } | |
76 | + | |
57 | 77 | public void disconnect() { |
58 | 78 | try { |
59 | 79 | if (mqttAsyncClient.isConnected()) { |
... | ... | @@ -161,7 +181,7 @@ public class MqttService implements MqttCallbackExtended { |
161 | 181 | if (!mqttAsyncClient.isConnected()) { |
162 | 182 | this.reconnecting.set(false); |
163 | 183 | log.info("[{}]重连次数耗尽", TAG); |
164 | - // TODO: 2023/7/21 钉钉消息通知 | |
184 | + sendWarningMsg(); | |
165 | 185 | } |
166 | 186 | } |
167 | 187 | |
... | ... | @@ -213,4 +233,24 @@ public class MqttService implements MqttCallbackExtended { |
213 | 233 | }, ThreadPoolUtil.getInstance().getExecutor()); |
214 | 234 | } |
215 | 235 | } |
236 | + | |
237 | + /** | |
238 | + * 手动发送预警消息 | |
239 | + * | |
240 | + * @return | |
241 | + */ | |
242 | + public void sendWarningMsg() { | |
243 | + MqttProperty.RobotConfig robot = mqttProperty.getRobot(); | |
244 | + log.info("机器人配置信息:[{}]", robot); | |
245 | + if (Objects.nonNull(robot) && robot.isEnabled()) { | |
246 | + try { | |
247 | + String res = RobotHelper.sendEarlyWarning(robot.getAccessToken(), robot.getSecret()); | |
248 | + log.info("发送预警消息:[{}]", res); | |
249 | + } catch (Exception e) { | |
250 | + log.info("发送预警失败", e); | |
251 | + } | |
252 | + } else { | |
253 | + log.info("机器人预警通知未启用:[{}]", robot); | |
254 | + } | |
255 | + } | |
216 | 256 | } | ... | ... |
fw-hermes-service/src/main/java/cn/fw/hermes/service/property/MqttProperty.java
1 | 1 | package cn.fw.hermes.service.property; |
2 | 2 | |
3 | +import lombok.Data; | |
3 | 4 | import lombok.Getter; |
5 | +import lombok.ToString; | |
4 | 6 | import org.springframework.boot.context.properties.ConfigurationProperties; |
5 | 7 | |
6 | 8 | /** |
... | ... | @@ -12,6 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; |
12 | 14 | * @date : 2023-07-20 18:03 |
13 | 15 | */ |
14 | 16 | @Getter |
17 | +@ToString | |
15 | 18 | @ConfigurationProperties(prefix = "mqtt") |
16 | 19 | public class MqttProperty { |
17 | 20 | private String host = "localhost"; |
... | ... | @@ -21,6 +24,7 @@ public class MqttProperty { |
21 | 24 | private int timeout = 10; |
22 | 25 | private int keepalive = 20; |
23 | 26 | private String clientId; |
27 | + private RobotConfig robot; | |
24 | 28 | |
25 | 29 | public void setHost(String host) { |
26 | 30 | this.host = host; |
... | ... | @@ -50,16 +54,15 @@ public class MqttProperty { |
50 | 54 | this.clientId = clientId; |
51 | 55 | } |
52 | 56 | |
53 | - @Override | |
54 | - public String toString() { | |
55 | - return "MqttProperty{" + | |
56 | - "host='" + host + '\'' + | |
57 | - ", userName='" + userName + '\'' + | |
58 | - ", password='" + password + '\'' + | |
59 | - ", port=" + port + | |
60 | - ", timeout=" + timeout + | |
61 | - ", keepalive=" + keepalive + | |
62 | - ", clientId='" + clientId + '\'' + | |
63 | - '}'; | |
57 | + public void setRobot(RobotConfig robot) { | |
58 | + this.robot = robot; | |
59 | + } | |
60 | + | |
61 | + | |
62 | + @Data | |
63 | + public static class RobotConfig { | |
64 | + private String accessToken; | |
65 | + private String secret; | |
66 | + private boolean enabled; | |
64 | 67 | } |
65 | 68 | } | ... | ... |