|
@@ -25,6 +25,7 @@ import com.yonge.cooleshow.biz.dal.enums.im.EImSendStatus;
|
|
|
import com.yonge.cooleshow.biz.dal.enums.im.EImSendType;
|
|
|
import com.yonge.cooleshow.biz.dal.mapper.CustomerServiceBatchSendingMapper;
|
|
|
import com.yonge.cooleshow.biz.dal.mapper.SysUserMapper;
|
|
|
+import com.yonge.cooleshow.biz.dal.redisson.RedissonMessageService;
|
|
|
import com.yonge.cooleshow.biz.dal.service.CustomerServiceBatchSendingService;
|
|
|
import com.yonge.cooleshow.biz.dal.service.CustomerServiceReceiveService;
|
|
|
import com.yonge.cooleshow.biz.dal.service.ImGroupService;
|
|
@@ -34,6 +35,7 @@ import com.yonge.cooleshow.biz.dal.service.TeacherService;
|
|
|
import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerService;
|
|
|
import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceBatchSendingWrapper;
|
|
|
import com.yonge.cooleshow.biz.dal.wrapper.im.CustomerServiceReceiveWrapper;
|
|
|
+import com.yonge.cooleshow.biz.dal.wrapper.liveroom.LiveRoomWrapper;
|
|
|
import com.yonge.cooleshow.common.constant.SysConfigConstant;
|
|
|
import com.yonge.toolset.base.exception.BizException;
|
|
|
import com.yonge.toolset.base.util.ImUtil;
|
|
@@ -48,11 +50,14 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.joda.time.DateTime;
|
|
|
+import org.redisson.api.RBucket;
|
|
|
+import org.redisson.api.RLock;
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
import java.text.MessageFormat;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
@@ -96,6 +101,8 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
@Autowired
|
|
|
private SysConfigService sysConfigService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private RedissonMessageService redissonMessageService;
|
|
|
/**
|
|
|
* 查询详情
|
|
|
* @param id 详情ID
|
|
@@ -161,6 +168,7 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
|
|
|
// 声部信息
|
|
|
List<Long> subjectIds = sendings.stream()
|
|
|
+ .filter(x -> StringUtils.isNotEmpty(x.getSendSubject()))
|
|
|
.flatMap(x -> Arrays.stream(x.getSendSubject().split(",")))
|
|
|
.map(Long::parseLong).distinct().collect(Collectors.toList());
|
|
|
|
|
@@ -195,6 +203,7 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
}
|
|
|
|
|
|
List<Long> collect = Arrays.stream(Optional.ofNullable(item.getSendSubject()).orElse("").split(","))
|
|
|
+ .filter(StringUtils::isNotBlank)
|
|
|
.map(Long::parseLong).distinct().collect(Collectors.toList());
|
|
|
|
|
|
// 发送声部
|
|
@@ -303,12 +312,40 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
info.setSendNumber(0);
|
|
|
List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
// 条件匹配的老师、学生数量
|
|
|
- query.getClientTypes().parallelStream().forEach(item -> {
|
|
|
+ Arrays.stream(query.getTargetGroup().split(",")).parallel().forEach(item -> {
|
|
|
+
|
|
|
+ // TEACHER: '平台老师',
|
|
|
+ // STUDENT: '平台学生',
|
|
|
+ // TENANT_TEACHER: '机构老师',
|
|
|
+ // TENANT_STUDENT: '机构学生',
|
|
|
+ boolean tenantFlag = false;
|
|
|
+ switch (item) {
|
|
|
+ case "TEACHER":
|
|
|
+ item = "TEACHER";
|
|
|
+ break;
|
|
|
+ case "STUDENT":
|
|
|
+ item = "STUDENT";
|
|
|
+ break;
|
|
|
+ case "TENANT_TEACHER":
|
|
|
+ item = "TEACHER";
|
|
|
+ tenantFlag = true;
|
|
|
+ break;
|
|
|
+ case "TENANT_STUDENT":
|
|
|
+ item = "STUDENT";
|
|
|
+ tenantFlag = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
- .clientType(ClientEnum.valueOf(item))
|
|
|
- .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
- .build();
|
|
|
+ .clientType(ClientEnum.valueOf(item))
|
|
|
+ .tenantFlag(tenantFlag)
|
|
|
+ .delFlag(0)
|
|
|
+ .lockFlag(0)
|
|
|
+ //.subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
+ .build();
|
|
|
+ if (StringUtils.isNotEmpty(info.getSendSubject())) {
|
|
|
+ receiveQuery.setSubjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()));
|
|
|
+ }
|
|
|
|
|
|
Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
// 统计当前匹配用户数量
|
|
@@ -425,130 +462,209 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
* @param info CustomerServiceBatchSending
|
|
|
*/
|
|
|
private void asyncBatchSendingMessage(CustomerServiceBatchSending info) {
|
|
|
- String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
|
|
|
- // 消息状态判定,且不能重复发送
|
|
|
- DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> {
|
|
|
-
|
|
|
- // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
|
|
|
- ThreadPool.getExecutor().submit(() -> {
|
|
|
- // 接收消息用户数
|
|
|
- List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
- // 分页数限制
|
|
|
- int limit = 500;
|
|
|
-
|
|
|
- try {
|
|
|
-
|
|
|
- // 消息发送数量,默认只发送1类消息(文字或图片)
|
|
|
- int messageNum = 1;
|
|
|
- if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
|
|
|
- // 同时发送图片和文字消息
|
|
|
- messageNum += 1;
|
|
|
- }
|
|
|
|
|
|
- int messageSendLimit; // 默认发送10000/分钟
|
|
|
- // 群发消息频率限制
|
|
|
- SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
|
|
|
- // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
|
|
|
- if (config != null && config.getParamValue().matches("\\d+")) {
|
|
|
- int sendLimit = Integer.parseInt(config.getParamValue());
|
|
|
- if (sendLimit > 0 && sendLimit < 10000) {
|
|
|
- // 消息发送频率限制
|
|
|
- messageSendLimit = Integer.parseInt(config.getParamValue());
|
|
|
- } else {
|
|
|
- // 默认发送10000/分钟
|
|
|
- messageSendLimit = 10000;
|
|
|
- }
|
|
|
- } else {
|
|
|
- // 默认发送10000/分钟
|
|
|
- messageSendLimit = 10000;
|
|
|
- }
|
|
|
+ // 改状态为发送中,标记为发送中但是未发送
|
|
|
+ if (info.getId() == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- int finalMessageNum = messageNum;
|
|
|
- if (EImReceiveType.ALL == info.getReceiveType()) {
|
|
|
+ CustomerServiceBatchSending byId = getById(info.getId());
|
|
|
+ if (byId == null) {
|
|
|
+ return;
|
|
|
+ }else if (byId.getSendStatus() == EImSendStatus.SEND) {
|
|
|
+ return;
|
|
|
+ } else if (byId.getSendStatus() == EImSendStatus.SENDING && byId.getSendFlag()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 推送消息给匹配用户
|
|
|
- List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
|
|
|
+ CustomerServiceBatchSending customerServiceBatchSending = new CustomerServiceBatchSending();
|
|
|
+ customerServiceBatchSending.setId(info.getId());
|
|
|
+ customerServiceBatchSending.setSendFlag(false);
|
|
|
+ customerServiceBatchSending.setSendTime(new Date());
|
|
|
+ customerServiceBatchSending.setSendStatus(EImSendStatus.SENDING);
|
|
|
+ updateById(customerServiceBatchSending);
|
|
|
+
|
|
|
+ // 如果有发送中,不再发送
|
|
|
+ Integer count = lambdaQuery()
|
|
|
+ .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SENDING)
|
|
|
+ .eq(CustomerServiceBatchSending::getSendFlag, 1)
|
|
|
+ .count();
|
|
|
+ if (count > 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- targetGroups.forEach(clientType -> {
|
|
|
+ // 异步发送消息且同步更新已发送人数,稍后可在页面刷新查看已发送用户数
|
|
|
+ ThreadPool.getExecutor().submit(() -> {
|
|
|
|
|
|
- CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
- .clientType(ClientEnum.valueOf(clientType))
|
|
|
- .subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
- .build();
|
|
|
+ // 加锁, 同时只能有一个在发送
|
|
|
|
|
|
- Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
- // 统计当前匹配用户数量
|
|
|
- getBaseMapper().selectMessageReceives(page, receiveQuery);
|
|
|
+ String lockName = MessageFormat.format("batchSending:{0}", String.valueOf(info.getId()));
|
|
|
+ // 消息状态判定,且不能重复发送
|
|
|
|
|
|
- // 计算总页数
|
|
|
- int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
|
|
|
- List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
+ DistributedLock.of(redissonClient).runIfLockCanGet(lockName, () -> sendMessage(info), 30L, TimeUnit.MINUTES);
|
|
|
|
|
|
- // 线程休眠计算器
|
|
|
- int sleepCount = 0;
|
|
|
- for (Integer pageNum : collect) {
|
|
|
|
|
|
- List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
|
|
|
- .map(x -> String.valueOf(x.getUserId()))
|
|
|
- .collect(Collectors.toList());
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
- receiveUserIds = receiveUserIds.stream()
|
|
|
- .map(x -> imGroupService.getImUserId(x, clientType))
|
|
|
- .collect(Collectors.toList());
|
|
|
+ private void sendMessage(CustomerServiceBatchSending info) {
|
|
|
|
|
|
- // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
|
|
|
- sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
|
|
|
|
|
|
- batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
|
|
|
- }
|
|
|
- });
|
|
|
+ // 接收消息用户数
|
|
|
+ List<Integer> receiveNums = Lists.newCopyOnWriteArrayList();
|
|
|
+ // 分页数限制
|
|
|
+ int limit = 500;
|
|
|
|
|
|
- } else {
|
|
|
+ try {
|
|
|
|
|
|
- // 查询条件
|
|
|
- CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
|
|
|
- .builder().batchSendingId(info.getId()).build();
|
|
|
+ lambdaUpdate()
|
|
|
+ .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
+ .set(CustomerServiceBatchSending::getSendFlag, 1)
|
|
|
+ .update();
|
|
|
|
|
|
- Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
|
|
|
- // 推送消息给指定用户
|
|
|
- customerServiceReceiveService.selectPage(page, receiveQuery);
|
|
|
+ // 消息发送数量,默认只发送1类消息(文字或图片)
|
|
|
+ int messageNum = 1;
|
|
|
+ if (StringUtils.isNoneBlank(info.getTextMessage(), info.getImgMessage())) {
|
|
|
+ // 同时发送图片和文字消息
|
|
|
+ messageNum += 1;
|
|
|
+ }
|
|
|
|
|
|
- // 计算总页数
|
|
|
- int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
+ int messageSendLimit; // 默认发送10000/分钟
|
|
|
+ // 群发消息频率限制
|
|
|
+ SysConfig config = sysConfigService.findByParamName(SysConfigConstant.MESSAGE_SEND_LIMIT);
|
|
|
+ // 群发消息频率限制不能超过10000/分钟,超过当前值默认取值为10000
|
|
|
+ if (config != null && config.getParamValue().matches("\\d+")) {
|
|
|
+ int sendLimit = Integer.parseInt(config.getParamValue());
|
|
|
+ if (sendLimit > 0 && sendLimit < 10000) {
|
|
|
+ // 消息发送频率限制
|
|
|
+ messageSendLimit = Integer.parseInt(config.getParamValue());
|
|
|
+ } else {
|
|
|
+ // 默认发送10000/分钟
|
|
|
+ messageSendLimit = 10000;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 默认发送10000/分钟
|
|
|
+ messageSendLimit = 10000;
|
|
|
+ }
|
|
|
|
|
|
- List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
+ int finalMessageNum = messageNum;
|
|
|
+ if (EImReceiveType.ALL == info.getReceiveType()) {
|
|
|
+
|
|
|
+ // 推送消息给匹配用户
|
|
|
+ List<String> targetGroups = Arrays.stream(info.getTargetGroup().split(",")).collect(Collectors.toList());
|
|
|
+
|
|
|
+ targetGroups.forEach(clientType -> {
|
|
|
+ boolean tenantFlag = false;
|
|
|
+ switch (clientType) {
|
|
|
+ case "TEACHER":
|
|
|
+ clientType = "TEACHER";
|
|
|
+ break;
|
|
|
+ case "STUDENT":
|
|
|
+ clientType = "STUDENT";
|
|
|
+ break;
|
|
|
+ case "TENANT_TEACHER":
|
|
|
+ clientType = "TEACHER";
|
|
|
+ tenantFlag = true;
|
|
|
+ break;
|
|
|
+ case "TENANT_STUDENT":
|
|
|
+ clientType = "STUDENT";
|
|
|
+ tenantFlag = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
- // 线程休眠计算器
|
|
|
- int sleepCount = 0;
|
|
|
- for (Integer pageNum : collect) {
|
|
|
+ CustomerService.NotifyMessage receiveQuery = CustomerService.NotifyMessage.builder()
|
|
|
+ .clientType(ClientEnum.valueOf(clientType))
|
|
|
+ .tenantFlag(tenantFlag)
|
|
|
+ .lockFlag(0)
|
|
|
+ .delFlag(0)
|
|
|
+ //.subjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()))
|
|
|
+ .build();
|
|
|
+ if (StringUtils.isNotBlank(info.getSendSubject())) {
|
|
|
+ receiveQuery.setSubjectIds(Arrays.stream(info.getSendSubject().split(",")).collect(Collectors.toList()));
|
|
|
+ }
|
|
|
|
|
|
- List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
|
|
|
- .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
|
|
|
- .collect(Collectors.toList());
|
|
|
+ Page<CustomerService.MessageReceives> page = PageUtil.getPage(1, 10);
|
|
|
+ // 统计当前匹配用户数量
|
|
|
+ getBaseMapper().selectMessageReceives(page, receiveQuery);
|
|
|
|
|
|
- // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
|
|
|
- sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
|
|
|
+ // 计算总页数
|
|
|
+ int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
|
|
|
- batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
|
|
|
- }
|
|
|
+ List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 线程休眠计算器
|
|
|
+ int sleepCount = 0;
|
|
|
+ for (Integer pageNum : collect) {
|
|
|
+
|
|
|
+ List<String> receiveUserIds = getBaseMapper().selectMessageReceives(PageUtil.getPage(pageNum, limit), receiveQuery).stream()
|
|
|
+ .map(x -> String.valueOf(x.getUserId()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ String finalClientType = clientType;
|
|
|
+ receiveUserIds = receiveUserIds.stream()
|
|
|
+ .map(x -> imGroupService.getImUserId(x, finalClientType))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
|
|
|
+ sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
|
|
|
+
|
|
|
+ batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
|
|
|
}
|
|
|
+ });
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ // 查询条件
|
|
|
+ CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery receiveQuery = CustomerServiceReceiveWrapper.CustomerServiceReceiveQuery
|
|
|
+ .builder().batchSendingId(info.getId()).build();
|
|
|
+
|
|
|
+ Page<CustomerServiceReceiveWrapper.CustomerServiceReceive> page = PageUtil.getPage(1, 10);
|
|
|
+ // 推送消息给指定用户
|
|
|
+ customerServiceReceiveService.selectPage(page, receiveQuery);
|
|
|
+
|
|
|
+ // 计算总页数
|
|
|
+ int pages = (int) (page.getTotal() - 1) / limit + 1;
|
|
|
|
|
|
- // 更新消息状态为已发送
|
|
|
- lambdaUpdate()
|
|
|
- .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
- .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
|
|
|
- .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
|
|
|
- .set(CustomerServiceBatchSending::getSendTime, DateTime.now().toDate())
|
|
|
- .update();
|
|
|
+ List<Integer> collect = IntStream.iterate(1, i -> i + 1).limit(pages).boxed().collect(Collectors.toList());
|
|
|
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("sendMessage id={}", info.getId(), e);
|
|
|
+ // 线程休眠计算器
|
|
|
+ int sleepCount = 0;
|
|
|
+ for (Integer pageNum : collect) {
|
|
|
+
|
|
|
+ List<String> receiveUserIds = customerServiceReceiveService.selectPage(PageUtil.getPage(pageNum, limit), receiveQuery).getRecords().stream()
|
|
|
+ .map(x -> imGroupService.getImUserId(String.valueOf(x.getUserId()), x.getClientType()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 累加线程休眠计算器,若大于指定阀值时,线程休眠1分钟
|
|
|
+ sleepCount = messageSendSleepCondition(info, sleepCount, receiveUserIds.size(), finalMessageNum, messageSendLimit);
|
|
|
+
|
|
|
+ batchSendCustomerServiceMessage(info, receiveNums, receiveUserIds);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- });
|
|
|
|
|
|
- }, 30L, TimeUnit.MINUTES);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("sendMessage id={}", info.getId(), e);
|
|
|
+ } finally {
|
|
|
+ // 更新消息状态为已发送
|
|
|
+ lambdaUpdate()
|
|
|
+ .eq(CustomerServiceBatchSending::getId, info.getId())
|
|
|
+ .set(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SEND)
|
|
|
+ .set(CustomerServiceBatchSending::getReceiveNumber, receiveNums.stream().mapToInt(Integer::intValue).sum())
|
|
|
+ .set(CustomerServiceBatchSending::getSendEndTime, DateTime.now().toDate())
|
|
|
+ .update();
|
|
|
+ }
|
|
|
+
|
|
|
+ CustomerServiceBatchSending one = lambdaQuery()
|
|
|
+ .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.SENDING)
|
|
|
+ .eq(CustomerServiceBatchSending::getSendFlag, 0)
|
|
|
+ .orderByAsc(CustomerServiceBatchSending::getId)
|
|
|
+ .last("limit 1")
|
|
|
+ .one();
|
|
|
+ if (one !=null) {
|
|
|
+ sendMessage(one);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -736,9 +852,10 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
Date endTime = DateTime.now().toDate();
|
|
|
|
|
|
List<CustomerServiceBatchSending> batchSendings = lambdaQuery()
|
|
|
- .between(CustomerServiceBatchSending::getSendTime, startTime, endTime)
|
|
|
- .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
|
|
|
- .eq(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT)
|
|
|
+ .lt(CustomerServiceBatchSending::getSendTime, endTime)
|
|
|
+// .eq(CustomerServiceBatchSending::getSendType, EImSendType.SCHEDULED)
|
|
|
+ .in(CustomerServiceBatchSending::getSendStatus, EImSendStatus.WAIT,EImSendStatus.SENDING)
|
|
|
+ .eq(CustomerServiceBatchSending::getSendFlag,0)
|
|
|
.list();
|
|
|
|
|
|
if (CollectionUtils.isNotEmpty(batchSendings)) {
|
|
@@ -746,7 +863,7 @@ public class CustomerServiceBatchSendingServiceImpl extends ServiceImpl<Customer
|
|
|
|
|
|
for (List<CustomerServiceBatchSending> item : Lists.partition(batchSendings, 10)) {
|
|
|
|
|
|
- item.parallelStream().forEach(x -> sendMessage(x.getId()));
|
|
|
+ item.forEach(x -> sendMessage(x.getId()));
|
|
|
}
|
|
|
|
|
|
}
|