diff --git a/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowRecordTask.java b/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowRecordTask.java index fad0270..bd57629 100644 --- a/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowRecordTask.java +++ b/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowRecordTask.java @@ -1,5 +1,6 @@ package cn.fw.shirasawa.server.controller.task; +import cn.fw.common.cache.locker.DistributedLocker; import cn.fw.shirasawa.common.utils.DateUtil; import cn.fw.shirasawa.domain.db.follow.FollowRecord; import cn.fw.shirasawa.domain.db.follow.FollowTask; @@ -15,9 +16,13 @@ import cn.fw.shirasawa.service.data.FollowRecordService; import cn.fw.shirasawa.service.data.FollowTaskService; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.lang.NonNull; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -25,6 +30,7 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.locks.Lock; /** * @author : kurisu @@ -41,18 +47,25 @@ public class FollowRecordTask { private final TodoRpcService todoRpcService; private final FollowTaskService followTaskService; private final EhrRpcService ehrRpcService; + private final DistributedLocker distributedLocker; + + @Value("${spring.cache.custom.global-prefix}:task") + @Getter + private String prefix; @Autowired public FollowRecordTask(final FollowRecordService followRecordService, final FollowBizService followBizService, final TodoRpcService todoRpcService, final FollowTaskService followTaskService, - final EhrRpcService ehrRpcService) { + final EhrRpcService ehrRpcService, + final DistributedLocker distributedLocker) { this.followRecordService = followRecordService; this.followBizService = followBizService; this.todoRpcService = todoRpcService; this.followTaskService = followTaskService; this.ehrRpcService = ehrRpcService; + this.distributedLocker = distributedLocker; } /** @@ -60,17 +73,27 @@ public class FollowRecordTask { */ @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 5) public void endTaskRecord() { - List list = followRecordService.list(Wrappers.lambdaQuery() - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) - .le(FollowRecord::getDeadline, LocalDateTime.now()) - .eq(FollowRecord::getYn, Boolean.TRUE) - .last("limit 0, 500") - ); - if (CollectionUtils.isEmpty(list)) { + final String cacheName = getCacheName("end-task"); + Lock lock = distributedLocker.lock(cacheName); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { return; } - for (FollowRecord record : list) { - followBizService.overdueProcessing(record); + try { + List list = followRecordService.list(Wrappers.lambdaQuery() + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) + .le(FollowRecord::getDeadline, LocalDateTime.now()) + .eq(FollowRecord::getYn, Boolean.TRUE) + .last("limit 0, 500") + ); + if (CollectionUtils.isEmpty(list)) { + return; + } + for (FollowRecord record : list) { + followBizService.overdueProcessing(record); + } + } finally { + distributedLocker.unlock(lock); } } @@ -80,15 +103,25 @@ public class FollowRecordTask { */ @Scheduled(cron = "0/8 * 8-18 * * *") public void push2NorTodo() { - List list = followRecordService.list(Wrappers.lambdaQuery() - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) - .eq(FollowRecord::getAddTodo, Boolean.FALSE) - .notIn(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) - .le(FollowRecord::getPlanTime, LocalDateTime.now()) - .eq(FollowRecord::getYn, Boolean.TRUE) - .last("limit 0,500") - ); - execute(list); + final String cacheName = getCacheName("push-task:normal"); + Lock lock = distributedLocker.lock(cacheName); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { + return; + } + try { + List list = followRecordService.list(Wrappers.lambdaQuery() + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) + .eq(FollowRecord::getAddTodo, Boolean.FALSE) + .notIn(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) + .le(FollowRecord::getPlanTime, LocalDateTime.now()) + .eq(FollowRecord::getYn, Boolean.TRUE) + .last("limit 0,500") + ); + execute(list); + } finally { + distributedLocker.unlock(lock); + } } /** @@ -97,15 +130,25 @@ public class FollowRecordTask { */ @Scheduled(initialDelay = 1500, fixedRate = 1000 * 3) public void push2AccTodo() { - List list = followRecordService.list(Wrappers.lambdaQuery() - .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) - .eq(FollowRecord::getAddTodo, Boolean.FALSE) - .eq(FollowRecord::getYn, Boolean.TRUE) - .in(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) - .le(FollowRecord::getPlanTime, LocalDateTime.now()) - .last("limit 0,50") - ); - execute(list); + final String cacheName = getCacheName("push-task:acc"); + Lock lock = distributedLocker.lock(cacheName); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { + return; + } + try { + List list = followRecordService.list(Wrappers.lambdaQuery() + .eq(FollowRecord::getOutTime, OutTimeEnum.ONGOING) + .eq(FollowRecord::getAddTodo, Boolean.FALSE) + .eq(FollowRecord::getYn, Boolean.TRUE) + .in(FollowRecord::getType, FollowTypeEnum.AC, FollowTypeEnum.CF, FollowTypeEnum.PF) + .le(FollowRecord::getPlanTime, LocalDateTime.now()) + .last("limit 0,50") + ); + execute(list); + } finally { + distributedLocker.unlock(lock); + } } @@ -114,9 +157,19 @@ public class FollowRecordTask { */ @Scheduled(initialDelay = 1500, fixedRate = 15 * 1000) public void retryCompleteTodoItem() { - String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.COMPLETE); - Collection all = todoRpcService.getAllFromCache(key); - all.forEach(todoRpcService::complete); + final String cacheName = getCacheName("retry:complete"); + Lock lock = distributedLocker.lock(cacheName); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { + return; + } + try { + String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.COMPLETE); + Collection all = todoRpcService.getAllFromCache(key); + all.forEach(todoRpcService::complete); + } finally { + distributedLocker.unlock(lock); + } } /** @@ -124,9 +177,19 @@ public class FollowRecordTask { */ @Scheduled(initialDelay = 1000, fixedRate = 10 * 1000) public void retryCancelTodoItem() { - String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.CANCEL); - Collection all = todoRpcService.getAllFromCache(key); - all.forEach(todoRpcService::cancel); + final String cacheName = getCacheName("retry:cancel"); + Lock lock = distributedLocker.lock(cacheName); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { + return; + } + try { + String key = todoRpcService.generateKey(TodoRpcService.TodoOperationEnum.CANCEL); + Collection all = todoRpcService.getAllFromCache(key); + all.forEach(todoRpcService::cancel); + } finally { + distributedLocker.unlock(lock); + } } @@ -174,4 +237,8 @@ public class FollowRecordTask { return false; } } + + private String getCacheName(@NonNull String name) { + return getPrefix() + ":" + name; + } } diff --git a/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowTaskDealTask.java b/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowTaskDealTask.java index 483fff2..3a23d9a 100644 --- a/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowTaskDealTask.java +++ b/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/FollowTaskDealTask.java @@ -1,5 +1,6 @@ package cn.fw.shirasawa.server.controller.task; +import cn.fw.common.cache.locker.DistributedLocker; import cn.fw.shirasawa.domain.db.follow.FollowTask; import cn.fw.shirasawa.domain.db.pool.CluePool; import cn.fw.shirasawa.domain.enums.ClueStatusEnum; @@ -11,13 +12,14 @@ import cn.hutool.core.thread.ThreadFactoryBuilder; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.lang.NonNull; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import java.time.LocalDate; @@ -29,6 +31,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; /** * @author : kurisu @@ -47,19 +50,25 @@ public class FollowTaskDealTask { TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNamePrefix("close-clue-pool-").build(), new ThreadPoolExecutor.AbortPolicy()); + private final DistributedLocker distributedLocker; private final StringRedisTemplate redisTemplate; @Value("${spring.cache.locker.key-prefix}:termination") @Getter private String planKey; + @Value("${spring.cache.custom.global-prefix}:clue") + @Getter + private String prefix; @Autowired public FollowTaskDealTask(final FollowTaskService followTaskService, final FollowBizService followBizService, final CluePoolService cluePoolService, + final DistributedLocker distributedLocker, final StringRedisTemplate redisTemplate) { this.followTaskService = followTaskService; this.followBizService = followBizService; this.cluePoolService = cluePoolService; + this.distributedLocker = distributedLocker; this.redisTemplate = redisTemplate; } @@ -68,17 +77,27 @@ public class FollowTaskDealTask { */ @Scheduled(initialDelay = 1000 * 10, fixedRate = 1000 * 10) public void startClue() { - List list = cluePoolService.list(Wrappers.lambdaQuery() - .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING) - .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay()) - .le(CluePool::getStartTime, LocalDateTime.now()) - .last("limit 0, 500") - ); - if (CollectionUtils.isEmpty(list)) { + String key = getCacheName("start"); + Lock lock = distributedLocker.lock(key); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { return; } - for (CluePool cluePool : list) { - followBizService.startClue(cluePool); + try { + List list = cluePoolService.list(Wrappers.lambdaQuery() + .eq(CluePool::getClueStatus, ClueStatusEnum.WAITING) + .ge(CluePool::getStartTime, LocalDate.now().atStartOfDay()) + .le(CluePool::getStartTime, LocalDateTime.now()) + .last("limit 0, 500") + ); + if (CollectionUtils.isEmpty(list)) { + return; + } + for (CluePool cluePool : list) { + followBizService.startClue(cluePool); + } + } finally { + distributedLocker.unlock(lock); } } @@ -87,15 +106,25 @@ public class FollowTaskDealTask { */ @Scheduled(initialDelay = 1000 * 30, fixedRate = 1000 * 60) public void endTask() { - List list = followTaskService.list(Wrappers.lambdaQuery() - .eq(FollowTask::getState, TaskStateEnum.ONGOING) - .le(FollowTask::getDeadline, LocalDateTime.now()) - ); - if (CollectionUtils.isEmpty(list)) { + String key = getCacheName("end"); + Lock lock = distributedLocker.lock(key); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { return; } - for (FollowTask r : list) { - followBizService.endTask(r); + try { + List list = followTaskService.list(Wrappers.lambdaQuery() + .eq(FollowTask::getState, TaskStateEnum.ONGOING) + .le(FollowTask::getDeadline, LocalDateTime.now()) + ); + if (CollectionUtils.isEmpty(list)) { + return; + } + for (FollowTask r : list) { + followBizService.endTask(r); + } + } finally { + distributedLocker.unlock(lock); } } @@ -133,4 +162,8 @@ public class FollowTaskDealTask { THREAD_POOL.execute(() -> followBizService.endTask(cluePool)); } } + + private String getCacheName(@NonNull String name) { + return getPrefix() + ":" + name; + } } diff --git a/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/OriginDataDealTask.java b/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/OriginDataDealTask.java index 93263c4..63f0c8d 100644 --- a/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/OriginDataDealTask.java +++ b/fw-shirasawa-server/src/main/java/cn/fw/shirasawa/server/controller/task/OriginDataDealTask.java @@ -1,16 +1,21 @@ package cn.fw.shirasawa.server.controller.task; +import cn.fw.common.cache.locker.DistributedLocker; import cn.fw.shirasawa.domain.db.OriginalData; import cn.fw.shirasawa.service.bus.follow.FollowBizService; import cn.fw.shirasawa.service.data.OriginalDataService; import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.Getter; +import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; +import java.util.concurrent.locks.Lock; /** * @author : kurisu @@ -23,26 +28,43 @@ import java.util.List; public class OriginDataDealTask { private final OriginalDataService originalDataService; private final FollowBizService followBizService; + private final DistributedLocker distributedLocker; + + @Value("${spring.cache.custom.global-prefix}:origin") + @Getter + private String prefix; @Autowired public OriginDataDealTask(final OriginalDataService originalDataService, - final FollowBizService followBizService) { + final FollowBizService followBizService, + final DistributedLocker distributedLocker) { this.originalDataService = originalDataService; this.followBizService = followBizService; + this.distributedLocker = distributedLocker; } @Scheduled(initialDelay = 1000 * 15, fixedRate = 1000 * 15) public void syn2Task() { - List list = originalDataService.list(Wrappers.lambdaQuery() - .eq(OriginalData::getSolved, Boolean.FALSE) - .last("limit 0,500") - ); - if (CollectionUtils.isEmpty(list)) { + String key = getPrefix() + ":syncdata"; + Lock lock = distributedLocker.lock(key); + boolean locked = ((RLock) lock).isLocked(); + if (!locked) { return; } - for (OriginalData data : list) { - data.setSolved(followBizService.origin2task(data)); + try { + List list = originalDataService.list(Wrappers.lambdaQuery() + .eq(OriginalData::getSolved, Boolean.FALSE) + .last("limit 0,500") + ); + if (CollectionUtils.isEmpty(list)) { + return; + } + for (OriginalData data : list) { + data.setSolved(followBizService.origin2task(data)); + } + originalDataService.updateBatchById(list); + } finally { + distributedLocker.unlock(lock); } - originalDataService.updateBatchById(list); } } diff --git a/fw-shirasawa-server/src/main/resources/application-local.yml b/fw-shirasawa-server/src/main/resources/application-local.yml index a36a43a..8a41e0d 100644 --- a/fw-shirasawa-server/src/main/resources/application-local.yml +++ b/fw-shirasawa-server/src/main/resources/application-local.yml @@ -10,6 +10,10 @@ nacos: plugin: namespace: df959b8c-de58-4b02-b9fb-d65ca3be05f3 spring: + cloud: + inetutils: + preferred-networks: + - 10.8 application: name: fw-shirasawa-local datasource: