|
@@ -0,0 +1,609 @@
|
|
|
|
+package com.yonge.cooleshow.biz.dal.service.impl;
|
|
|
|
+
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
+import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
|
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
|
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
|
+import com.yonge.cooleshow.auth.config.CustomerServiceConfig;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.config.RongCloudConfig;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.entity.CustomerServiceBatchSending;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.entity.CustomerServiceReceive;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.entity.Subject;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.entity.SysUser;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.entity.Teacher;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.ClientEnum;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.MK;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.im.EImReceiveType;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.service.SubjectService;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.service.TeacherService;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
|
|
|
|
+import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
|
|
|
|
+import com.yonge.toolset.base.exception.BizException;
|
|
|
|
+import com.yonge.toolset.base.util.ImUtil;
|
|
|
|
+import com.yonge.toolset.base.util.ThreadPool;
|
|
|
|
+import com.yonge.toolset.mybatis.support.PageUtil;
|
|
|
|
+import com.yonge.toolset.payment.util.DistributedLock;
|
|
|
|
+import io.rong.messages.BaseMessage;
|
|
|
|
+import io.rong.messages.ImgMessage;
|
|
|
|
+import io.rong.messages.TxtMessage;
|
|
|
|
+import io.rong.models.message.PrivateMessage;
|
|
|
|
+import io.rong.models.message.PushExt;
|
|
|
|
+import io.rong.models.response.ResponseResult;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
|
+import org.joda.time.DateTime;
|
|
|
|
+import org.redisson.api.RedissonClient;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
|
+
|
|
|
|
+import java.text.MessageFormat;
|
|
|
|
+import java.util.Arrays;
|
|
|
|
+import java.util.Date;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Objects;
|
|
|
|
+import java.util.Optional;
|
|
|
|
+import java.util.Random;
|
|
|
|
+import java.util.function.Function;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+import java.util.stream.IntStream;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * 客服群发
|
|
|
|
+ * 2022-12-09 10:49:10
|
|
|
|
+ */
|
|
|
|
+@Slf4j
|
|
|
|
+@Service
|
|
|
|
+public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<CustomerServiceBatchSendingMapper, CustomerServiceBatchSending> implements CustomerServiceBatchSendingService {
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private CustomerServiceReceiveService customerServiceReceiveService;
|
|
|
|
+ @Autowired
|
|
|
|
+ private SubjectService subjectService;
|
|
|
|
+ @Autowired
|
|
|
|
+ private CustomerServiceConfig customerServiceConfig;
|
|
|
|
+ @Autowired
|
|
|
|
+ private SysUserMapper sysUserMapper;
|
|
|
|
+ @Autowired
|
|
|
|
+ private RedissonClient redissonClient;
|
|
|
|
+ @Autowired
|
|
|
|
+ private TeacherService teacherService;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 查询详情
|
|
|
|
+ * @param id 详情ID
|
|
|
|
+ * @return CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending detail(Long id) {
|
|
|
|
+
|
|
|
|
+ CustomerServiceBatchSending sending = baseMapper.selectById(id);
|
|
|
|
+ if (Objects.isNull(sending)) {
|
|
|
|
+ throw new BizException("无效的请求ID");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending record = CustomerServiceBatchSendingWrapper
|
|
|
|
+ .CustomerServiceBatchSending.from(JSON.toJSONString(sending));
|
|
|
|
+
|
|
|
|
+ // 填充声部,目标群体
|
|
|
|
+ List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> records = Lists.newArrayList(record);
|
|
|
|
+
|
|
|
|
+ // 返回消息封闭
|
|
|
|
+ getBatchSendingPaddingData(records);
|
|
|
|
+
|
|
|
|
+ // 部分推送,查询用户信息
|
|
|
|
+ if (EImReceiveType.PORTION == sending.getReceiveType()) {
|
|
|
|
+
|
|
|
|
+ CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
|
|
|
|
+ .builder().batchSendingId(sending.getId()).build();
|
|
|
|
+ IPage<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = customerServiceReceiveService.selectPage(PageUtil.getPage(1, 9999), receiveQuery);
|
|
|
|
+
|
|
|
|
+ record.setReceives(page.getRecords());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return record;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 分页查询
|
|
|
|
+ * @param page IPage<CustomerServiceBatchSending>
|
|
|
|
+ * @param query CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery
|
|
|
|
+ * @return IPage<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending>
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public IPage<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> selectPage(IPage<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> page,
|
|
|
|
+ CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery query){
|
|
|
|
+
|
|
|
|
+ // 群发消息记录
|
|
|
|
+ List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> sendings = baseMapper.selectPage(page, query);
|
|
|
|
+
|
|
|
|
+ if (CollectionUtils.isNotEmpty(sendings)) {
|
|
|
|
+
|
|
|
|
+ // 返回消息封闭
|
|
|
|
+ getBatchSendingPaddingData(sendings);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return page.setRecords(sendings);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 群发消息返回封装
|
|
|
|
+ * @param sendings List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending>
|
|
|
|
+ */
|
|
|
|
+ private void getBatchSendingPaddingData(List<CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending> sendings) {
|
|
|
|
+
|
|
|
|
+ // 声部信息
|
|
|
|
+ List<Long> subjectIds = sendings.stream()
|
|
|
|
+ .flatMap(x -> Arrays.stream(x.getSendSubject().split(",")))
|
|
|
|
+ .map(Long::parseLong).distinct().collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ Map<Long, String> subjectNameMap = subjectService.findBySubjectByIdList(subjectIds).stream()
|
|
|
|
+ .collect(Collectors.toMap(Subject::getId, Subject::getName, (o, n) -> n));
|
|
|
|
+
|
|
|
|
+ // 创建用户
|
|
|
|
+ List<Long> userIds = sendings.stream()
|
|
|
|
+ .flatMap(x -> Lists.newArrayList(x.getCreateBy(), x.getSenderId()).stream())
|
|
|
|
+ .filter(Objects::nonNull).distinct().collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ List<SysUser> sysUsers = sysUserMapper.selectBatchIds(userIds);
|
|
|
|
+
|
|
|
|
+ Map<Long, String> usernameMap = sysUsers.stream()
|
|
|
|
+ .collect(Collectors.toMap(SysUser::getId, SysUser::getUsername, (o, n) -> n));
|
|
|
|
+
|
|
|
|
+ Map<Long, SysUser> senderMap = sysUsers.stream()
|
|
|
|
+ .collect(Collectors.toMap(SysUser::getId, Function.identity(), (o, n) -> n));
|
|
|
|
+
|
|
|
|
+ // 发送群体,发送声部
|
|
|
|
+ SysUser sysUser;
|
|
|
|
+ for (CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending item : sendings) {
|
|
|
|
+
|
|
|
|
+ // 创建用户
|
|
|
|
+ item.sender("").mobile("")
|
|
|
|
+ .setCreateUser(usernameMap.getOrDefault(item.getCreateBy(), ""));
|
|
|
|
+
|
|
|
|
+ if (senderMap.containsKey(item.getSenderId())) {
|
|
|
|
+ sysUser = senderMap.get(item.getSenderId());
|
|
|
|
+
|
|
|
|
+ item.sender(sysUser.getUsername()).mobile(sysUser.getPhone());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<Long> collect = Arrays.stream(Optional.ofNullable(item.getSendSubject()).orElse("").split(","))
|
|
|
|
+ .map(Long::parseLong).distinct().collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ // 发送声部
|
|
|
|
+ String subjectName = subjectNameMap.entrySet().stream()
|
|
|
|
+ .filter(x -> collect.contains(x.getKey()))
|
|
|
|
+ .map(Map.Entry::getValue).collect(Collectors.joining(","));
|
|
|
|
+ item.setSubjectName(subjectName);
|
|
|
|
+
|
|
|
|
+ // 发送群体
|
|
|
|
+ List<String> clientTypes = Arrays.stream(Optional.ofNullable(item.getTargetGroup()).orElse("").split(","))
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ String targetGroup = Arrays.stream(ClientEnum.values())
|
|
|
|
+ .filter(x -> clientTypes.contains(x.getCode())).map(ClientEnum::getMsg)
|
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
|
+ item.setTargetGroupName(targetGroup);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 添加
|
|
|
|
+ * @param info CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
|
|
|
|
+ * @return Boolean
|
|
|
|
+ */
|
|
|
|
+ @Transactional
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean add(CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending info) {
|
|
|
|
+
|
|
|
|
+ // 发送客服ID
|
|
|
|
+ String customerService = info.getMobile();
|
|
|
|
+ if (StringUtils.isBlank(customerService)) {
|
|
|
|
+ customerService = customerServiceConfig.getCustomerService();
|
|
|
|
+ }
|
|
|
|
+ if (StringUtils.isNotEmpty(customerService)) {
|
|
|
|
+
|
|
|
|
+ List<String> collect = Arrays.stream(customerService.split(",")).collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ Random rand = new Random();
|
|
|
|
+ String mobile = collect.get(rand.nextInt(collect.size()));
|
|
|
|
+
|
|
|
|
+ // 查询发送客服ID
|
|
|
|
+ SysUser senderUser = sysUserMapper.selectOne(Wrappers.<SysUser>lambdaQuery()
|
|
|
|
+ .eq(SysUser::getPhone, mobile));
|
|
|
|
+ if (Objects.isNull(senderUser)) {
|
|
|
|
+ throw new BizException("未匹配到客服人员");
|
|
|
|
+ }
|
|
|
|
+ info.setSenderId(senderUser.getId());
|
|
|
|
+
|
|
|
|
+ Teacher teacher = teacherService.getOne(Wrappers.<Teacher>lambdaQuery()
|
|
|
|
+ .eq(Teacher::getUserId, info.getSenderId()));
|
|
|
|
+ if (Objects.isNull(teacher)) {
|
|
|
|
+ throw new BizException("当前帐号未注册老师帐号");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 推送消息
|
|
|
|
+ CustomerServiceBatchSending sending = JSON.parseObject(info.jsonString(), CustomerServiceBatchSending.class);
|
|
|
|
+
|
|
|
|
+ // 保存推送群发消息
|
|
|
|
+ save(sending);
|
|
|
|
+
|
|
|
|
+ // 部分推送用户
|
|
|
|
+ if (CollectionUtils.isNotEmpty(info.getReceives())) {
|
|
|
|
+ for (CustomerServiceReceiveWrapper.CustomerServiceReceive item : info.getReceives()) {
|
|
|
|
+ item.setBatchSendingId(sending.getId());
|
|
|
|
+ }
|
|
|
|
+ customerServiceReceiveService.saveBatch(JSON.parseArray(JSON.toJSONString(info.getReceives()),
|
|
|
|
+ CustomerServiceReceive.class));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 更新群发消息人数
|
|
|
|
+ updateBatchSendingSendNumber(sending);
|
|
|
|
+
|
|
|
|
+ // 发送推送消息
|
|
|
|
+ if (EImSendType.IMMEDIATELY == info.getSendType()) {
|
|
|
|
+ asyncBatchSendingMessage(sending);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 更新群发消息人数
|
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
|
+ */
|
|
|
|
+ private void updateBatchSendingSendNumber(CustomerServiceBatchSending info) {
|
|
|
|
+
|
|
|
|
+ // 发送条件,接收人数
|
|
|
|
+ CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery query = CustomerServiceBatchSendingWrapper.CustomerServiceBatchSendingQuery
|
|
|
|
+ .builder()
|
|
|
|
+ .sendSubject(info.getSendSubject())
|
|
|
|
+ .targetGroup(info.getTargetGroup())
|
|
|
|
+ .build();
|
|
|
|
+ info.setCondition(query.jsonString()); // 发送条件
|
|
|
|
+
|
|
|
|
+ // 更新发送人数,接收人数
|
|
|
|
+ if (EImReceiveType.ALL == info.getReceiveType()) {
|
|
|
|
+
|
|
|
|
+ info.setSendNumber(0);
|
|
|
|
+ List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
|
+ // 条件匹配的老师、学生数量
|
|
|
|
+ query.getClientTypes().parallelStream().forEach(item -> {
|
|
|
|
+
|
|
|
|
+ CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
|
+ .clientType(ClientEnum.valueOf(item))
|
|
|
|
+ .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
|
+ .build();
|
|
|
|
+
|
|
|
|
+ Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
|
+ // 统计当前匹配用户数量
|
|
|
|
+ getBaseMapper().selectMessageReceives(page, receiveQuery);
|
|
|
|
+
|
|
|
|
+ receiveNums.add((int) page.getTotal());
|
|
|
|
+ });
|
|
|
|
+ info.setSendNumber(receiveNums.stream().mapToInt(Integer::intValue).sum());
|
|
|
|
+ } else {
|
|
|
|
+ // 选择的老师、学生数量
|
|
|
|
+ int sendNumber = customerServiceReceiveService.count(Wrappers.<CustomerServiceReceive>lambdaQuery()
|
|
|
|
+ .eq(CustomerServiceReceive::getBatchSendingId, info.getId()));
|
|
|
|
+ info.setSendNumber(sendNumber);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 更新群发消息人数
|
|
|
|
+ lambdaUpdate()
|
|
|
|
+ .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
|
+ .set(CustomerServiceBatchSending::getSendNumber, info.getSendNumber())
|
|
|
|
+ .set(CustomerServiceBatchSending::getCondition, info.getCondition())
|
|
|
|
+ .update();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 更新
|
|
|
|
+ * @param info CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending
|
|
|
|
+ * @return Boolean
|
|
|
|
+ */
|
|
|
|
+ @Transactional
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean update(CustomerServiceBatchSendingWrapper.CustomerServiceBatchSending info){
|
|
|
|
+
|
|
|
|
+ CustomerServiceBatchSending sending = JSON.parseObject(info.jsonString(),
|
|
|
|
+ CustomerServiceBatchSending.class);
|
|
|
|
+ // 更新群发消息
|
|
|
|
+ this.updateById(sending);
|
|
|
|
+
|
|
|
|
+ if (CollectionUtils.isNotEmpty(info.getReceives())) {
|
|
|
|
+
|
|
|
|
+ // 先清除数据,后重新插入
|
|
|
|
+ customerServiceReceiveService.remove(Wrappers.<CustomerServiceReceive>lambdaQuery()
|
|
|
|
+ .eq(CustomerServiceReceive::getBatchSendingId, sending.getId()));
|
|
|
|
+
|
|
|
|
+ for (CustomerServiceReceiveWrapper.CustomerServiceReceive item : info.getReceives()) {
|
|
|
|
+ item.setBatchSendingId(sending.getId());
|
|
|
|
+ }
|
|
|
|
+ customerServiceReceiveService.saveBatch(JSON.parseArray(JSON.toJSONString(info.getReceives()),
|
|
|
|
+ CustomerServiceReceive.class));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ sending = getById(info.getId());
|
|
|
|
+ // 更新群发消息人数
|
|
|
|
+ updateBatchSendingSendNumber(sending);
|
|
|
|
+
|
|
|
|
+ // 发送推送消息
|
|
|
|
+ if (EImSendType.IMMEDIATELY == sending.getSendType()) {
|
|
|
|
+ sendMessage(info.getId());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 更新消息状态
|
|
|
|
+ *
|
|
|
|
+ * @param id 消息Id
|
|
|
|
+ * @param sendStatus EImSendStatus
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void status(Long id, EImSendStatus sendStatus) {
|
|
|
|
+
|
|
|
|
+ CustomerServiceBatchSending record = getById(id);
|
|
|
|
+ if (Objects.isNull(record)) {
|
|
|
|
+ throw new BizException("无效的请求Id");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 校验消息发送状态
|
|
|
|
+ if (EImSendStatus.SEND == record.getSendStatus()) {
|
|
|
|
+ throw new BizException("当前消息已发送");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 当前群发消息状态一致
|
|
|
|
+ if (record.getSendStatus() == sendStatus) {
|
|
|
|
+ throw new BizException("群发消息状态一致");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 更新群发消息状态
|
|
|
|
+ lambdaUpdate()
|
|
|
|
+ .eq(CustomerServiceBatchSending::getId, id)
|
|
|
|
+ .set(CustomerServiceBatchSending::getSendStatus, sendStatus)
|
|
|
|
+ .update();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 发送消息
|
|
|
|
+ *
|
|
|
|
+ * @param id 群发消息ID
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void sendMessage(Long id) {
|
|
|
|
+
|
|
|
|
+ CustomerServiceBatchSending info = getById(id);
|
|
|
|
+ if (Objects.isNull(info)) {
|
|
|
|
+ throw new BizException("无效的群发消息ID");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 发送消息
|
|
|
|
+ asyncBatchSendingMessage(info);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 异步发送消息
|
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
|
+ */
|
|
|
|
+ private void asyncBatchSendingMessage(CustomerServiceBatchSending info) {
|
|
|
|
+ String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
|
|
|
|
+ // 消息状态判定,且不能重复发送
|
|
|
|
+ DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> {
|
|
|
|
+
|
|
|
|
+ List<BaseMessage> messages = getReceiveMessage(info);
|
|
|
|
+
|
|
|
|
+ // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
|
|
|
|
+ ThreadPool.getExecutor().submit(() -> {
|
|
|
|
+ // 接收消息用户数
|
|
|
|
+ List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
|
+ // 分页数限制
|
|
|
|
+ int limit = 500;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ if (EImReceiveType.ALL == info.getReceiveType()) {
|
|
|
|
+
|
|
|
|
+ // 推送消息给匹配用户
|
|
|
|
+ List<String> targetGroupes = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ targetGroupes.parallelStream().forEach(clientType -> {
|
|
|
|
+
|
|
|
|
+ CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
|
+ .clientType(ClientEnum.valueOf(clientType))
|
|
|
|
+ .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
|
+ .build();
|
|
|
|
+
|
|
|
|
+ Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
|
+ // 统计当前匹配用户数量
|
|
|
|
+ getBaseMapper().selectMessageReceives(page, receiveQuery);
|
|
|
|
+
|
|
|
|
+ // 计算总页数
|
|
|
|
+ int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
|
+
|
|
|
|
+ List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ for (Integer pageNum : collect) {
|
|
|
|
+
|
|
|
|
+ List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
|
|
|
|
+ .map(x -> String.valueOf(x.getUserId()))
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ if (ClientEnum.STUDENT.match(clientType)) {
|
|
|
|
+ receiveUserIds = receiveUserIds.stream()
|
|
|
|
+ .map(x -> MessageFormat.format("{0}:STUDENT", x))
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ batchSendCustomerServiceMessage(info, messages, receiveNums, receiveUserIds);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+
|
|
|
|
+ // 查询条件
|
|
|
|
+ CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
|
|
|
|
+ .builder().batchSendingId(info.getId()).build();
|
|
|
|
+
|
|
|
|
+ Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
|
|
|
|
+ // 推送消息给指定用户
|
|
|
|
+ customerServiceReceiveService.selectPage(page, receiveQuery);
|
|
|
|
+
|
|
|
|
+ // 计算总页数
|
|
|
|
+ int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
|
+
|
|
|
|
+ List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ for (Integer pageNum : collect) {
|
|
|
|
+
|
|
|
|
+ List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
|
|
|
|
+ .map(x -> {
|
|
|
|
+ if (ClientEnum.STUDENT.match(x.getClientType())) {
|
|
|
|
+ return MessageFormat.format("{0}:STUDENT", String.valueOf(x.getUserId()));
|
|
|
|
+ }
|
|
|
|
+ return String.valueOf(x.getUserId());
|
|
|
|
+ })
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ batchSendCustomerServiceMessage(info, messages, receiveNums, receiveUserIds);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 更新消息状态为已发送
|
|
|
|
+ lambdaUpdate()
|
|
|
|
+ .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
|
+ .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
|
|
|
|
+ .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
|
|
|
|
+ .set(CustomerServiceBatchSending::getSendTime, DateTime.now().toDate())
|
|
|
|
+ .update();
|
|
|
|
+
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("sendMessage id={}", info.getId(), e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 批量发送客服消息
|
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
|
+ * @param messages 推送消息类型
|
|
|
|
+ * @param receiveNums 接收消息用户数
|
|
|
|
+ * @param receiveUserIds 接收消息用户Id
|
|
|
|
+ */
|
|
|
|
+ private static void batchSendCustomerServiceMessage(CustomerServiceBatchSending info, List<BaseMessage> messages,
|
|
|
|
+ List<Integer> receiveNums, List<String> receiveUserIds) {
|
|
|
|
+ // 拓展消息
|
|
|
|
+ PushExt pushExt = PushExt.build(info.getTitle(), 1,
|
|
|
|
+ new PushExt.HW("channelId", "NORMAL"), new PushExt.VIVO("1"),
|
|
|
|
+ new PushExt.APNs("", ""),
|
|
|
|
+ new PushExt.OPPO(""));
|
|
|
|
+
|
|
|
|
+ String senderId = String.valueOf(info.getSenderId());
|
|
|
|
+ PrivateMessage privateMessage;
|
|
|
|
+ ResponseResult privateResult;
|
|
|
|
+ for (List<String> item : Lists.partition(receiveUserIds, 100)) {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ for (BaseMessage message : messages) {
|
|
|
|
+
|
|
|
|
+ // 发送用户IM通知消息
|
|
|
|
+ privateMessage = new PrivateMessage()
|
|
|
|
+ .setSenderId(senderId)
|
|
|
|
+ .setTargetId(item.toArray(new String[0]))
|
|
|
|
+ .setObjectName(message.getType())
|
|
|
|
+ .setContent(message)
|
|
|
|
+ .setPushExt(pushExt)
|
|
|
|
+ .setIsIncludeSender(0);
|
|
|
|
+
|
|
|
|
+ privateResult = RongCloudConfig.rongCloud.message.msgPrivate.send(privateMessage);
|
|
|
|
+ log.info("batchSendCustomerServiceMessage senderId={}, ret={}", senderId, privateResult.getCode());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ receiveNums.add(item.size());
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("batchSendCustomerServiceMessage senderId={}", senderId, e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 推送消息类型
|
|
|
|
+ * @param info CustomerServiceBatchSending
|
|
|
|
+ * @return List<BaseMessage>
|
|
|
|
+ */
|
|
|
|
+ private static List<BaseMessage> getReceiveMessage(CustomerServiceBatchSending info) {
|
|
|
|
+ List<BaseMessage> messages = Lists.newArrayList();
|
|
|
|
+
|
|
|
|
+ if (StringUtils.isNotEmpty(info.getImgMessage())) {
|
|
|
|
+
|
|
|
|
+ //String suffix = info.getImgMessage().substring(info.getImgMessage().lastIndexOf("."));
|
|
|
|
+
|
|
|
|
+ // 发送图片消息
|
|
|
|
+ ImgMessage imgMessage = new ImgMessage(ImUtil.imageToBase64(info.getImgMessage(), "png"), "", info.getImgMessage());
|
|
|
|
+
|
|
|
|
+ messages.add(imgMessage);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (StringUtils.isNotEmpty(info.getTextMessage())) {
|
|
|
|
+
|
|
|
|
+ // 发送文本消息
|
|
|
|
+ TxtMessage txtMessage = new TxtMessage(info.getTextMessage(), "");
|
|
|
|
+
|
|
|
|
+ messages.add(txtMessage);
|
|
|
|
+ }
|
|
|
|
+ return messages;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 定时发送消息
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void scheduleSendMessage() {
|
|
|
|
+
|
|
|
|
+ DistributedLock.of(redissonClient).runIfLockCanGet("scheduleSendMessage:LOCK", () -> {
|
|
|
|
+
|
|
|
|
+ // 定时发送时间
|
|
|
|
+ // 开始时间
|
|
|
|
+ Date startTime = DateTime.now().minusMinutes(2).toDate();
|
|
|
|
+ // 结束时间
|
|
|
|
+ Date endTime = DateTime.now().toDate();
|
|
|
|
+
|
|
|
|
+ List<CustomerServiceBatchSending> batchSendings = lambdaQuery()
|
|
|
|
+ .between(CustomerServiceBatchSending::getSendTime, startTime, endTime)
|
|
|
|
+ .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
|
|
|
|
+ .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT)
|
|
|
|
+ .list();
|
|
|
|
+
|
|
|
|
+ if (CollectionUtils.isNotEmpty(batchSendings)) {
|
|
|
|
+ log.info("scheduleSendMessage time={}, size={}", DateTime.now().toString(MK.TIME_PATTERN), batchSendings.size());
|
|
|
|
+
|
|
|
|
+ for (List<CustomerServiceBatchSending> item : Lists.partition(batchSendings, 10)) {
|
|
|
|
+
|
|
|
|
+ item.parallelStream().forEach(x -> sendMessage(x.getId()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+}
|