// SalaryEventConsumer.java 3.66 KB
package cn.fw.dalaran.server.rocketMQ;

import cn.fw.dalaran.common.constants.Constants;
import cn.fw.dalaran.common.utils.PublicUtil;
import cn.fw.dalaran.rpc.morax.SalaryRpc;
import cn.fw.dalaran.service.data.ActivityThemeService;
import cn.fw.morax.sdk.dto.CustomList;
import cn.fw.morax.sdk.dto.kpi.KpiGroupUserQuery;
import cn.fw.morax.sdk.dto.kpi.KpiGroupUserResult;
import cn.fw.morax.sdk.dto.kpi.KpiReportNoticeMQ;
import cn.fw.morax.sdk.dto.kpi.UserIndicatorReq;
import cn.fw.starter.redis.redis.RedisUtil;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Objects;

/**
 * RocketMQ listener for salary/performance (KPI) report notifications.
 *
 * <p>For each {@link KpiReportNoticeMQ} it looks up the users of the referenced KPI group through
 * {@link SalaryRpc#queryGroupUsers}, builds one {@link UserIndicatorReq} per user and submits the
 * batch through {@link SalaryRpc#reportKpi}. Handling is guarded by a Redis lock.
 *
 * @author wmy3969
 * @version 1.0
 * @date 2022/10/09 10:51
 */
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = KpiReportNoticeMQ.TOPIC,
        consumerGroup = Constants.APPLICATION_NAME + "-" + KpiReportNoticeMQ.TOPIC)
public class SalaryEventConsumer implements RocketMQListener<KpiReportNoticeMQ> {

    /** Redis key of the lock taken around message handling; the same key is used for every message. */
    private static final String LOCK_KEY = "SalaryEventConsumer:onMessage";

    /** Redis helper, used here only to take and release the handling lock. */
    private final RedisUtil redisUtil;
    /** Salary/performance RPC client. */
    private final SalaryRpc salaryRpc;
    /** Activity theme service; injected but not referenced by any code in this class. */
    private final ActivityThemeService activityThemeService;

    /**
     * Handles a "report salary/performance" message.
     *
     * <p>Messages whose indicator code equals {@code Constants.Performance.SCORE_RATE} are ignored.
     * A message without an upload date is logged and skipped, because both the group query and
     * every generated indicator need that date. Any exception raised while processing is logged
     * and not rethrown, and the lock is always released.
     *
     * <p>NOTE(review): when the lock cannot be obtained, or when processing fails, this method
     * still returns normally, so the listener container presumably treats the message as
     * consumed and will not redeliver it. Confirm that dropping the message is intended.
     *
     * @param kpiReport the notification describing which group/post/indicator was reported
     */
    @Override
    public void onMessage(KpiReportNoticeMQ kpiReport) {
        String indicatorCode = kpiReport.getIndicatorCode();
        if (Objects.equals(Constants.Performance.SCORE_RATE, indicatorCode)) {
            return;
        }
        log.info("收到上报薪酬绩效消息 message:{}", JSON.toJSONString(kpiReport));

        // Reject unusable messages before taking the lock instead of relying on an NPE later on.
        if (kpiReport.getUploadDate() == null) {
            log.warn("上报薪酬绩效消息缺少上报日期, 不消费消息 message:{}", JSON.toJSONString(kpiReport));
            return;
        }

        if (!redisUtil.getLock(LOCK_KEY)) {
            log.info("执行cn.fw.dalaran.server.rocketMQ.SalaryEventConsumer.onMessage()方法获取锁失败, 不消费消息");
            return;
        }
        try {
            List<KpiGroupUserResult> groups = salaryRpc.queryGroupUsers(buildGroupUserQuery(kpiReport));
            if (groups == null) {
                log.warn("查询绩效组人员返回为空, 不上报 message:{}", JSON.toJSONString(kpiReport));
                return;
            }
            salaryRpc.reportKpi(buildIndicators(kpiReport, groups));
        } catch (Exception e) {
            log.error("处理上报薪酬绩效消息发生异常 message:{}", JSON.toJSONString(kpiReport), e);
        } finally {
            redisUtil.unlock(LOCK_KEY);
        }
    }

    /**
     * Builds the group-user query from the message's group id, upload date, post id and indicator code.
     *
     * @param kpiReport the incoming notification; its upload date must be non-null
     * @return the populated query
     */
    private KpiGroupUserQuery buildGroupUserQuery(KpiReportNoticeMQ kpiReport) {
        KpiGroupUserQuery query = new KpiGroupUserQuery();
        query.setGroupId(kpiReport.getGroupId());
        query.setDataDate(kpiReport.getUploadDate().getTime());
        query.setPostId(kpiReport.getPostId());
        query.setIndicatorCode(kpiReport.getIndicatorCode());
        return query;
    }

    /**
     * Creates one indicator request per user of every group, in the order the groups and users are listed.
     *
     * <p>Groups that are {@code null} or carry no user list are skipped so that a single incomplete
     * group does not prevent the remaining users from being reported.
     *
     * @param kpiReport the incoming notification supplying indicator code, upload date and uid
     * @param groups    the non-null query result
     * @return the indicator requests to report; may be empty
     */
    private CustomList<UserIndicatorReq> buildIndicators(KpiReportNoticeMQ kpiReport,
                                                         List<KpiGroupUserResult> groups) {
        String indicatorCode = kpiReport.getIndicatorCode();
        CustomList<UserIndicatorReq> indicators = new CustomList<>();
        for (KpiGroupUserResult group : groups) {
            List<KpiGroupUserResult.SimpleUserData> members = group == null ? null : group.getUserIdList();
            if (members == null) {
                continue;
            }
            for (KpiGroupUserResult.SimpleUserData member : members) {
                // Fields other than the three set below come from whatever PublicUtil.copy carries
                // over from the user data; no value is assigned explicitly here.
                UserIndicatorReq indicator = PublicUtil.copy(member, UserIndicatorReq.class);
                indicator.setIndicatorCode(indicatorCode);
                indicator.setDataDate(kpiReport.getUploadDate());
                indicator.setUid(kpiReport.getUid());
                indicators.add(indicator);
            }
        }
        return indicators;
    }

}