当前位置: 首页 > 后端技术 > Java

周期任务线程池——ScheduledThreadPoolExecutor&DelayedWorkQueue

时间:2023-04-01 17:54:43 Java

ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩展类,用于实现延时执行任务或周期执行任务。一般来说,周期性任务或定时任务包括两个组成部分:一个是执行任务的线程池,一个是存储任务的内存。还记得石英吗?企业级定时任务框架最重要的内容其实就是这两部分:SimpleThreadPool和JobStore。ScheduledThreadPoolExecutor也不例外,由一个线程池和一个任务队列组成。线程池继承自ThreadPoolExecutor,还有任务队列DelayedWorkQueue,了解了ThreadPoolExecutor和DelayedWorkQueue,基本了解了ScheduledThreadPoolExecutor。另外,ScheduledThreadPoolExecutor的特殊之处在于他执行的任务必须是ScheduledFutureTask,ScheduledFutureTask是“未来要执行的任务”,而这个“未来”是由delay指定的。即使通过ScheduledThreadPoolExecutor提交“立即”而非“未来”执行的任务,也必须通过延迟时间为0的ScheduledFutureTask提交。ScheduledFutureTask任务提交后,加入阻塞队列DelayedWorkQueue等待调度.ScheduledThreadPoolExecutor的创建提供了四种构造方法,都是通过调用父类ThreadPoolExecutor的构造方法完成ScheduledThreadPoolExecutor对象的创建:());}publicSc??heduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue(),threadFactory);}publicSc??heduledThreadPoolExecutor(intcorePoolSize,RejectedExecutionHandlerhandler){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue(),handler);}publicSc??heduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue(),threadFactory,handler);}corePoolSize由构造方法的参数指定,而maximumPoolSize在构造方法中固定设置为Integer.MAX_VALUE,即无限的keepAliveTime设置为0,后面会知道ScheduledThreadPoolExecutor的线程数会不超过corePoolSize,如果allowCoreThreadTimeOut保持默认(false),那么这个keepAliveTime其实是没有意义的。四种构造方法都将阻塞队列设置为newDelayedWorkQueue(),即只支持DelayedWorkQueue。ScheduledThreadPoolExecutor的线程池管理ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩展。线程池管理部分没有扩展,保留了ThreadPoolExecutor原有的功能。每次向队列中添加任务时,都会调用ensurePrestart(ThreadPoolExecutor实现)方法创建并启动一个线程。ThreadPoolExecutor的ensurePrestart方法:voidensurePrestart(){intwc=workerCountOf(ctl.get());如果(wcsft=newScheduledFutureTask(command,null,triggerTime(initialDelay,unit),unit.toNanos(period));RunnableScheduledFuturet=decorateTask(command,sft);sft.outerTask=t;延迟执行(t);返回吨;周期性地(由参数period指定),重复执行的任务主要执行以下操作:首先创建ScheduledFutureTask任务,然后将其打包成一个RunnableScheduledFuture。实际上,decorateTask方法直接返回创建的ScheduledFutureTask。没什么好分析的,然后调用delayedExecute启动线程执行。任务ScheduledFutureTask任务也是ScheduledThreadPoolExecutor的重头戏!ScheduledFutureTask继承自FutureTask,实现了RunnableScheduledFuture接口,所以比较复杂:可以异步执行有返回值的任务(Future接口),也可以执行周期性任务(RunnableScheduledFuture接口)。首先,认识几个重要的属性:time:任务应该执行的时间(纳秒)period:周期性执行任务的间隔时间(纳秒),正数代表fix-rate,负数代表fix-delay(fixed-rate和fixed-delay在之前的jdktimer文章中有提到,意思相同)outerTask:指向自己的RunnableScheduledFuture对象compareTo方法:进入DelayedWorkQueue队列时,需要调用CompareTo方法比较大小,这样最近执行的任务就被放入在堆的头部。compareTo方法最终比较时间属性。运行方法:publicvoidrun(){booleanperiodic=isPeriodic();如果(!canRunInCurrentRunState(定期))取消(假);elseif(!periodic)ScheduledFutureTask.super.run();elseif(ScheduledFutureTask.super.runAndReset()){setNextRunTime();reExecutePeriodic(outerTask);如果是一次性任务,直接调用FutureTask的run方法执行任务,否则,如果是周期任务,先调用FutureTask的runAndReset方法,如果调用成功,设置下次执行时间,然后通过调用reExecutePeriodic再次将当前任务添加到队列中。runAndReset方法首先执行当前任务,执行完成后将当前任务的状态重置为NEW,为下一次执行做准备。通过分析runAndReset方法可以知道,周期性任务执行完后,就无法再获取到return了(回想一下FutureTask的代码逻辑,state设置为NEW后,就无法再获取到return了)。delayedExecute启动线程方法的代码很简单:privatevoiddelayedExecute(RunnableScheduledFuturetask){if(isShutdown())reject(task);else{super.getQueue().add(任务);如果(isShutdown()&&!canRunInCurrentRunState(task.isPeriodic())&&remove(task))task.cancel(false);否则ensurePrestart();}}任务加入队列,然后调用ensurePrestart方法启动线程!任务直接进入队列,然后启动线程,任务的执行完全交给ThreadPoolExecutor的任务执行线程Worker去调度:Worker线程调用getTask方法从队列中获取并执行任务!我们现在可以做一个大胆的猜测:周期性任务对执行时间有严格的要求,没有达到执行时间的任务是不能执行的。既然ThreadPoolExecutor的任务执行线程的逻辑是没有执行时间判断的,那么,这个逻辑应该是在getTask方法从队列中获取任务时,通过队列的dequeue方法来实现的。现在轮到阻塞队列DelayedWorkQueue上场了。我们带着这个问题来研究DelayedWorkQueue队列。阻塞队列DelayedWorkQueueDelayedWorkQueue底层是一个数组实现的堆结构。堆结构是一棵完全二叉树,它保证每个结点都大于(或小于)它的叶子结点,这样堆头结点(即数组的第一个元素)一定是最大(或最小)的.DelayedWorkQueue是一个存放ScheduledFutureTask的队列。最近执行的任务需要存放在堆头,每个节点都应该小于它的叶子节点。每有一个任务加入队列,一个节点出队,一个节点删除,都需要根据任务执行时间重新调整队列。初始容量为16,当有新节点加入队列时,如果队列容量不够用,则扩充原容量的50%(新容量=1.5*旧容量)。当有新节点进入队列时,调用siftUp方法重新调整队列,使新节点加入到堆的合适位置。privatevoidsiftUp(intk,RunnableScheduledFuturekey){while(k>0){intparent=(k-1)>>>1;RunnableScheduledFuturee=queue[parent];如果(key.compareTo(e)>=0)中断;队列[k]=e;设置索引(e,k);k=父母;}队列[k]=键;设置索引(键,k);特点:完全二叉树的第k个节点的父节点在数组中的位置为:(k-1)/2四舍五入。当有节点加入队列时,默认加入尾部k(当前数组的大小),获取k的父节点,比较当前节点和父节点。如果当前节点大于父节点(调用了ScheduledFutureTask的compareTo方法,比较时间),说明当前节点找到了正确的位置,否则当前节点与父节点交换位置,继续找到父节点进行比较,直到找到根节点。这样,新加入的节点在通过siftUp操作后,会根据任务触发时间time进入队列中合适的位置。当一个节点需要从队列中移除时,它也需要执行类似的操作(调用siftUp或siftDown方法)来保证堆的正确顺序。这样DelayedWorkQueue队列就可以一直保证堆头(也就是数组的第一个元素)是最近需要执行的任务。任务执行线程在获取到最近要执行的任务时,不需要遍历整个队列,只需要获取到堆头的第一个节点即可执行。堆结构很适合周期任务或者定时任务的应用场景:节点加入任务的时效性要求不高(因为是需要延迟的任务,时效性要求肯定不高),而获取数据的时效性要求高(到了执行任务的时间,最好立即获取并立即执行),可以有效提高任务执行的效率。在了解了堆结构的特点以及进入和退出堆结构的排序逻辑之后,我们需要验证一下我们的猜测:出队逻辑会判断节点的执行时间是否到了。因为ThreadPoolExecutor的getTask()方法是调用队列的take方法获取任务的,直接看DelayedWorkQueue的take()方法即可。publicRunnableScheduledFuturetake()throwsInterruptedException{finalReentrantLocklock=this.lock;锁。锁定中断();try{for(;;){RunnableScheduledFuturefirst=queue[0];如果(first==null)available.await();else{长延迟=first.getDelay(NANOSECONDS);if(delay<=0)returnfinishPoll(first);第一个=空;//等待时不要保留refif(leader!=null)available.await();else{ThreadthisThread=Thread.currentThread();领导者=这个线程;尝试{available.awaitNanos(delay);}finally{if(leader==thisThread)leader=null;}}}}}finally{if(leader==null&&queue[0]!=null)available.signal();锁定。解锁();}}逻辑很清晰:加锁队列获取堆头任务,如果为空(空队列,无任务),等待(注意等待会释放锁资源,等待唤醒然后重新获取Lock资源,ReentrantLock我们后面会详细分析)否则判断堆头任务执行时间到了,将堆头任务出队,返回堆头任务;否则,堆头任务的执行时间还没有到,采用Leader-Follower模式等待上面的第3点来验证我们的猜测!Leader-Follower模式是指当线程池中有多个线程等待执行任务时,线程之间会竞争Leader。只有一个线程会赢得竞争成为Leader,其他线程成为Followers。Leader被授权只等待指定时间(当前与下一次任务执行的时间差),Follower线程需要无限期等待(被其他线程取消或唤醒)。在等待过程中,如果有新的节点加入队列成为堆头(新加入的任务成为最近要执行的任务),需要设置leader为空,唤醒等待线程重新争夺领导者。Leader-Follower模式可以有效防止所有等待线程无限期进入等待模式,被动等待其他线程唤醒,等到等待时间后主动唤醒执行任务。总结周期任务线程池ScheduledThreadPoolExecutor就完成了。简单总结:周期性线程池在创建时可以处理即时任务、延迟一次性任务和延迟周期性任务(FixedRate和FixedDelay模式)。核心线程数。线程池最终启动的线程数不会超过核心线程数。每提交一个任务,同时启动一个线程,直到线程池数量达到核心线程数量。无论是即时任务还是延迟任务,任务提交后,直接加入阻塞队列,等待线程从队列中取出并执行。阻塞队列采用DelayedWorkQueue这种堆结构,最近执行的任务总是放在堆头的线程池中。线程获得了堆头任务的优先执行权谢谢!上一篇线程池——ThreadPoolExecutor源码分析

猜你喜欢