业务场景我们每天要对最近三个月的活跃用户进行批量营销和账单逾期计算。用户数据约800万。我们的方案是发送一条CUSTOMER_DAILY消息,然后订阅这条消息发送批量营销、账单逾期等业务消息。目前发送CUSTOMER_DAILY消息大约需要五个小时。不要担心当前的时刻。大家不用担心为什么效率这么低……因为系统在逐步优化,肯定是之前的代码存在一些问题。可能再过一段时间,我们自己的代码就会出现很多问题,这很正常。下面简单贴出目前我适当修改过的一些实现逻辑代码,分析一下目前方案中存在的问题。目前的解决方案目前是定时任务触发,线程池提交任务。代码如下(行尾的注释是我写的示例值)://MaximumactiveidlongmaxId=customerService.findMaxActiveId(beginTime);//20001//最小活动idlongminId=customerService.findMinActiveId(beginTime);//1//最大可能查询到的id总数longlistSize=maxId-minId;//20000//开启的线程数intrunSize=4;//平均每查询次数longcount=listSize/runSize;//5000//创建线程池,核心线程数与开启线程数相同ExecutorServiceexecutor=CreateThreadUtil.createThread(runSize);for(inti=0;i<=runSize;i++){//计算sql语句中每次分页查询的起止数据下标longmin=minId+i*count;//1,5001,10001,15001longmax=min+count;//5001,10001,15001,20001executor.execute(()->{Listcustomers=customerService.findByXxx(beginTime,min,max);customers.forEach(c->{Messagemessage=Message.build();//省略构造消息体messageService.save(message);applicationEventPublisher.publishEvent(newMessageSendEvent(message));});});}你不需要关心事务的细节,因为这是我为了减少代码而简化的。现在的逻辑可以是监听这个事件然后修改数据库消息发送的状态,发送一条消息@TransactionalEventListener(phase=TransactionPhase.AFTER_COMMIT)@Asyncpublicvoidlisten(MessageSendEventevent){//修改数据库消息的状态sendingto:sending//执行发送消息的APISDK//修改数据库消息发送状态为:success}存在的问题线程池资源可能没有被充分利用仔细观察第一段代码的逻辑,计算总数活跃用户的数量通过查询最大和最小的活跃用户id,这不是一个确切的数字。可能存在maxId=20001,minId=12~5000为非活跃用户,5001~20000为活跃用户。这样,for循环中负责1-5001的线程实际上只有一个用户任务需要处理,也就是说,一共有4个线程,一个线程在0.5秒内执行完任务,剩下的三个线程可能需要十多分钟。这样就浪费了第一个线程的资源。看过我之前的文章在学习CompletableFuture进阶之前掌握两个线程池的人都知道ForkJoinPool有一个任务窃取机制可以解决这个问题。循环访问数据库因为我们在发送消息的时候需要存储记录,所以在发送过程中需要修改两次状态(第一次发送,第二次发送成功),也就是说一条消息会有3次数据库IO操作,这是大量循环下最大的性能瓶颈。我们可以估计一个阈值并在该阈值处批处理一组消息,以便使用batchInsert仅访问一次数据库。调用SDK的API循环发送消息其实和循环访问数据库是一样的。普通消息队列具有批量发送消息的功能,而不是只为一条消息调用SDK的API。只有一台机器执行任务。当前定时任务为xxl-job,路由策略为第一个。也就是说,只有一个实例会在N个服务实例上执行。这相当于有N-1个实例空闲用于此作业。我们可以让N台机器一起做这个,xxl-job的shardbroadcast就可以满足。综上所述,xxl-jobshardbroadcastForkJoinPool批量访问数据库,批量发送消息手动优化xxl-jobshardbroadcastshardbroadcast的意思是触发对应集群的所有机器执行一个任务,系统自动传输shard参数;简单的说就是每台机器都会触发任务,每台机器都会收到不同的参数。我们可以根据这些不同的参数分配不同的应用实例来处理不同的数据。具体操作很简单,在xxl-job管理页面编辑任务路由策略即可分片广播。intshardIndex=XxlJobHelper.getShardIndex();//当前分片0/1/2/3intshardTotal=XxlJobHelper.getShardTotal();//总分片4ListcustomerIds=customerService.findIdsByShard(request);//根据findIdsByShard查询shard待处理数据的实现其实就是一个很简单的SQLSELECTIDFROMt_customerWHEREMOD(ID,${request.shardTotal})=${request.shardIndex}我们把ID和总数进行比较ofshards4对于余数,对于一个数字到4,0,1,2,3只有四个结果。这样,一条SQL在所有机器上执行的数据结果,就可以划分出要执行的总数据。ForkJoinPool假设上面每个应用实例获取到的customerId数量为200万,所以我们现在使用ForkJoinPool来分治这200万数据。首先定义任务类publicclassCustomerDailyTaskextendsRecursiveTask{privatefinalListcustomerIds;//客户id集合privatefinalCustomerServicecustomerService;publicstaticfinalintTHRESHOLD=1000;实验效率最高的阈值,我试了十几个值,1000个最合适)//省略构造方法@OverrideprotectedIntegercompute(){if(customerIds.size()<=THRESHOLD){返回customerService.sendDailyMessage(customerIds);}intgroupSize=(int)Math.ceil(customerIds.size()*1.0/2);//分成两半Lis??t>partition=Lists.partition(customerIds,groupSize);CustomerDailyTasktask1=newCustomerDailyTask(partition.get(0),customerService);CustomerDailyTasktask2=newCustomerDailyTask(partition.get(1),customerService);invokeAll(任务1,任务2);返回task1.join()+task2.join();}}初始化ForkJoinPool执行CustomerDailyTasktask=newCustomerDailyTask(customerIds,customerService);intcore=Runtime.getRuntime().availableProcessors();ForkJoinPoolpool=newForkJoinPool(core-1);//留一个线程池.invoke(task);批量访问数据库这里其实很简单,returncustomerService.sendDailyMessage(下面一行代码中的customerIds);根据传入的customerIds构造一个List并使用batchInsert方法插入到数据库中,然后发送一个Spring本地事件Spring事件发布,然后在事件监听器中batchUpdate更新状态,这里不再赘述批量发送消息非常简单。可以直接调用SDK进行批量发送。由于我们使用的是AWSSNS-SQS,所以SDK版本比较低。我也升级了SDK版本。。。这个是个小问题,很蛋疼最重要的是SDK支持一次最多发送10条消息,我无语了。。。我很迷茫,如果有没办法,那就拆了//SNS限制一次最多发送10条消息List>split=Lists.partition(list,10);List>future=split.stream().map(item->senderFactory.batchSend(item)).collect(Collectors.toList());Listresponse=CompletableFutureUtil.allOfCompleted(future);//...更新数据库消息状态为SUCCESS这里我们使用CompletableFuture异步批量发送消息。其实内部使用的线程池默认也是ForkJoinPool,allOfCompleted()的实现如下.allOf(list.toArray(newCompletableFuture[list.size()]));列表结果=列表。stream().map(CompletableFuture::join).collect(Collectors.toList());future.thenApply(v->列表);//阻塞主线程,执行所有异步任务返回结果;}外面是ForkJoinPool对数据进行拆分,每个拆分的子单元是一个ForkJoinPool来发送消息。。。我觉得如果机器配置很高,这个设计方案只有两个字NB!笔者请大家看看,目前我们公司在使用消息队列时,需要向数据库发送持久化消息。首先在当前事务中插入一条消息发送记录,记录状态为CREATED,利用Spring事件机制实现当前业务方法的事务提交。执行消息发送API后,状态记录为PENDING,发送成功后状态记录为SUCCESS,否则为FAILED。这样做的好处是可以100%保证消息丢失可追溯,每条发送的消息都有记录。而且后面的消息重试或者重发也很方便,因为我这里记录了消息体。但是我觉得emmm访问数据库的IO操作有点浪费。面对高并发的业务,比如秒杀系统,感觉这样的实现是不行的,因为数据库面临巨大的性能瓶颈。问了很多朋友他们公司是怎么做的,一半是入库记录,一半是非入库记录。。。想问问这里的网友们,你们公司是怎么做到的,请留言请教,谢谢!结语本文分析了一个大规模任务优化方案,从集群实例共享工作、合理使用线程资源、任务划分、减少数据库IO、减少API调用等方面逐步优化出一个目前比较合适的处理方案。