当前位置: 首页 > 后端技术 > Java

多线程永续任务的设计与实现

时间:2023-04-02 09:13:22 Java

今天教大家一个Java多线程永续任务。本例原型为公司自行开发的多线程异步任务项目。我把多线程相关的代码都提取出来做了一定的改造。里面涉及的知识点非常多,特别适合有一定工作经验的同学学习,也可以直接在项目中使用。文章结构很简单:1.功能描述做这个多线程异步任务的主要原因是我们有很多异步任务,总是在动。什么是永远?即任务运行后,需要一直运行。比如消息推送任务需要一直消费DB中未推送的消息,因为一直有消息进来,所以需要一个整体的推送永久异步任务。我们的要求其实并不难。简单总结一下:它可以同时执行多个永续异步任务;每个异步任务支持开启多个线程来消费本任务的数据;支持perpetual异步任务的优雅关闭,即关闭最后需要消费完所有数据再关闭。要完成以上需求,需要注意几点:每个perpetualtask可以开启一个线程执行;每个子任务都需要一个线程池来控制,因为它需要支持并发;永久任务的关闭需要通知子任务并发线程,支持永久任务和并发子任务的优雅关闭。2.多线程任务示例2.1线程池对于子任务,需要支持并发。如果每个线程并发打开,用完了就关闭,太消耗资源了,所以引入了线程池:publicclassTaskProcessUtil{//每个任务都有自己的线程池privatestaticMapexecutors=newConcurrentHashMap<>();//初始化一个线程池privatestaticExecutorServiceinit(StringpoolName,intpoolSize){returnnewThreadPoolExecutor(poolSize,poolSize,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(),newThreadFactoryBuilder().setNameFormat("Pool-"+poolName).setDaemon(false).build(),newThreadPoolExecutor.CallerRunsPolicy());/获取线程池publicstaticExecutorServicegetOrInitExecutors(StringpoolName,intpoolSize){ExecutorServiceexecutorService=executors.get(poolName);if(null==executorService){同步(TaskProcessUtil.class){executorService=executors.get(poolName);我f(null==executorService){executorService=init(poolName,poolSize);executors.put(poolName,executorService);}}}返回执行服务;}//回收线程资源publicstaticvoidreleaseExecutors(StringpoolName){ExecutorServiceexecutorService=executors.remove(poolName);if(executorService!=null){executorService.shutdown();}}}这是一个线程池的工具类。这里初始化线程池和回收线程资源非常简单。我们主要讨论如何获取线程池获取线程池可能存在并发,所以需要加同步锁,加锁后需要对executorService进行二次null检查。2.2单任务为了更好的说明单任务的实现,我们的主要任务是打印出Cat的数据。Cat的定义如下:@Data@ServicepublicclassCat{privateStringcatName;publicCatsetCatName(字符串名称){这个。猫名=名字;归还这个;}}单个任务主要包括以下功能:获取永久任务数据:这里一般是扫描DB,我直接用queryData()代替。多线程任务执行:需要将数据拆分成4份,然后由多个线程并发执行,这里的线程池可以支持;perpetualtaskgracefulshutdown:当任务被外界通知需要停止任务时,需要执行剩余的任务数据,并回收线程资源并退出任务;永久执行:如果没有收到关机命令,任务需要永远执行。直接看代码:publicclassChildTask{privatefinalintPOOL_SIZE=3;//线程池大小privatefinalintSPLIT_SIZE=4;//数据拆分大小privateStringtaskName;//接收jvm关闭信号实现优雅关闭protectedvolatilebooleanterminal=false;publicChildTask(StringtaskName){this.taskName=taskName;}//程序执行入口publicvoiddoExecute(){inti=0;while(true){System.out.println(taskName+":Cycle-"+i+"-Begin");//获取数据Listdatas=queryData();//处理数据taskExecute(datas);System.out.println(taskName+":Cycle-"+i+"-End");if(terminal){//只有当应用关闭时,才会来到这里实现优雅的下线休息;我++;}//回收线程池资源TaskProcessUtil.releaseExecutors(taskName);}//正常关闭publicvoidterminal(){//关闭terminal=true;系统tem.out.println(taskName+"关机");}//处理数据privatevoiddoProcessData(Listdatas,CountDownLatchlatch){try{for(Catcat:datas){System.out.println(taskName+":"+cat.toString()+",ThreadName:"+Thread.currentThread().getName());线程.睡眠(1000L);}}catch(Exceptione){System.out.println(e.getStackTrace());}finally{if(latch!=null){latch.countDown();}}}//处理单任务数据privatevoidtaskExecute(ListsourceDatas){if(CollectionUtils.isEmpty(sourceDatas)){return;}//将数据分成4部分List>splitDatas=Lists.partition(sourceDatas,SPLIT_SIZE);finalCountDownLatchlatch=newCountDownLatch(splitDatas.size());//并发处理拆分后的数据共享一个线程池for(finalListdatas:splitDatas){ExecutorServiceexecutorService=TaskProcessUtil.getOrInitExecutors(taskName,POOL_SIZE);}executorService.submit(newRunnable(){@Overridepublicvoidrun(){doProcessData(datas,latch);}});}尝试{latch.await();}catch(Exceptione){System.out.println(e.getStackTrace());}}//获取永久任务数据privateListqueryData(){Listdatas=newArrayList<>();for(inti=0;i<5;i++){datas.add(newCat().setCatName("罗小黑"+i));}返回数据;}}简要说明:queryData:用于获取数据,在实际应用中,需要将queryData设置为一个抽象方法,然后每个task实现自己的方法doProcessData:数据处理逻辑,在实际应用中,其实是需要将doProcessData设置为抽象方法,然后每个任务实现自己的方法。taskExecute:将数据拆分成4份,获取task的线程池,交给线程池并发执行,然后通过latch.await()阻塞。当4条数据全部执行成功后,阻塞结束,方法返回。终端:仅用于接受关机命令。这里变量定义为volatile,所以多线程内存是可见的;doExecute:程序执行入口,封装了每个任务执行的过程。当terminal=true时,先执行任务数据,然后回收线程池,最后退出。2.3直接在任务入口添加代码:publicclassLoopTask{privateListchildTasks;publicvoidinitLoopTask(){childTasks=newArrayList();childTasks.add(newChildTask("childTask1"));childTasks.add(newChildTask("childTask2"));for(finalChildTaskchildTask:childTasks){newThread(newRunnable(){@Overridepublicvoidrun(){childTask.doExecute();}}).start();}}publicvoidshutdownLoopTask(){if(!CollectionUtils.isEmpty(childTasks)){for(ChildTaskchildTask:childTasks){childTask.terminal();}}}publicstaticvoidmain(Stringargs[])throwsException{LoopTaskloopTask=newLoopTask();loopTask.initLoopTask();线程.睡眠(5000L);loopTask.shutdownLoopTask();}}每个task开启一个单独的Thread,这里我初始化了2分别执行两个perpetualtask,即childTask1和childTask2,在Sleep5秒后,关闭task,看看能否如我们预期的那样优雅退出2.4结果分析执行结果如下:childTask1:Cycle-0-BeginchildTask2:Cycle-0-BeginchildTask1:Cat(catName=罗小黑0),ThreadName:Pool-childTask1childTask1:Cat(catName=罗小黑4),ThreadName:Pool-childTask1childTask2:Cat(catName=罗小黑4),ThreadName:Pool-childTask2childTask2:Cat(catName=罗小黑0),ThreadName:Pool-childTask2childTask1:Cat(catName=罗小黑1),ThreadName:Pool-childTask1childTask2:Cat(catName=罗小黑1),ThreadName:Pool-childTask2childTask2:Cat(catName=罗小黑2),ThreadName:Pool-childTask2childTask1:Cat(catName=罗小黑2),ThreadName:Pool-childTask1childTask2:Cat(catName=罗小黑3),ThreadName:Pool-childTask2childTask1:Cat(catName=罗小黑3),ThreadName:Pool-childTask1childTask2:Cycle-0-EndchildTask2:Cycle-1-BeginchildTask1:Cycle-0-EndchildTask1:Cycle-1-BeginchildTask2:Cat(catName=罗小黑0),ThreadName:Pool-childTask2childTask2:Cat(catName=罗小黑4),ThreadName:Pool-childTask2childTask1:Cat(catName=罗小黑4),ThreadName:Pool-childTask1childTask1:Cat(catName=罗小黑0),ThreadName:Pool-childTask1childTask1关闭childTask2关闭downchildTask2:Cat(catName=罗小黑1),ThreadName:Pool-childTask2childTask1:Cat(catName=罗小黑1),ThreadName:Pool-childTask1childTask1:Cat(catName=罗小黑2),ThreadName:Pool-childTask1childTask2:Cat(catName=罗小黑2),ThreadName:Pool-childTask2childTask1:Cat(catName=罗小黑3),ThreadName:Pool-childTask1childTask2:Cat(catName=罗小黑3),ThreadName:Pool-childTask2childTask1:Cycle-1-EndchildTask2:Cycle-1-结束输出数据:“Pool-childTask”为线程池名称;“childTask”是任务名称;"Cat(catName=罗小黑)"为执行结果;“childTaskshutdown”是关闭标记;“childTask:Cycle-X-Begin”和“childTask:Cycle-X-End”分别是每个循环的开始和结束标记下面来分析一下执行结果:childTask1和childTask2分别执行,在第一个循环中它们都输出了5块罗小黑数据正常;在第二轮执行的时候,我启动了shutdown命令,这次第二轮执行并没有直接停止,而是先执行了task中的数据,然后再执行exit,所以和我们优雅退出的结论是完全一致的.2.5源码地址GitHub地址:https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc