前言线程池在运行时,会不断从任务队列中获取任务,然后执行任务。如果我们要实现延??迟或定时执行任务,重点是任务队列会根据任务的延迟时间进行排序。延迟时间越短,队列在队列前面,会先执行。队列是先进先出的数据结构,即先进入队列的数据先获取。但是有一种特殊的队列叫做优先级队列,它对插入的数据进行优先排序,保证优先级高的数据先被取出,而不管数据的插入顺序如何。有效实现优先级队列的一种常见方法是使用堆。堆的实现请参考《堆和二叉堆的实现和特性》ScheduledThreadPoolExecutor线程池ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以其内部数据结构与ThreadPoolExecutor基本相同,并在其基础上增加了按时间调度执行任务的功能,即分为延时执行任务和周期执行任务。ScheduledThreadPoolExecutor的构造函数只能传3个参数corePoolSize、ThreadFactory、RejectedExecutionHandler,默认的maximumPoolSize为Integer.MAX_VALUE。工作队列是高度自定义的延迟阻塞队列DelayedWorkQueue。它的实现原理与DelayQueue基本相同。核心数据结构是二叉最小堆的优先级队列。当队列满了会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也将不再需要,所以线程池中永远最多有corePoolSize个工作线程在运行。publicScheduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue(),threadFactory,handler);}DelayedWorkQueueDelayedWorkQueue也是一个延迟队列,和DelayQueue一样,只是迁移了定时任务将优先级队列和DelayQueue的实现过程放到自己的方法体中,这样就可以在过程中灵活的添加针对定时任务的方法调用。工作原理DelayedWorkQueue的实现原理比较让人满意。它在内部维护一个由RunnableScheduledFuture类型数组实现的最小二进制堆。初始容量为16。ReentrantLock和Condition用于实现生产者和消费者模式。源码分析定义DelayedWorkQueue的类继承关系如下:其包含的方法定义如下:成员属性//Initial,数组的长度。privatestaticfinalintINITIAL_CAPACITY=16;//使用数组存储队列中的元素。privateRunnableScheduledFuture>[]queue=newRunnableScheduledFuture>[INITIAL_CAPACITY];//使用锁保证多线程并发安全问题。privatefinalReentrantLocklock=newReentrantLock();//队列中存储元素的大小privateintsize=0;//具体指队头任务所在的线程privateThreadleader=null;//当延迟时间为队头任务起来,或者有新任务成为队头,用于唤醒等待线程privatefinalConditionavailable=lock.newCondition();DelayedWorkQueue使用数组来存储队列中的元素。核心数据结构是二叉最小堆的优先级队列。当队列满了会自动扩容。构造函数DelayedWorkQueue是ScheduledThreadPoolExecutor的静态类,默认只有一个无参构造函数。staticclassDelayedWorkQueueextendsAbstractQueueimplementsBlockingQueue{//...}enqueue方法DelayedWorkQueue提供了put/add/offer(withtime)三种插入元素的方法。我们发现,与普通的阻塞队列相比,三种添加方法都调用了offer方法。那是因为它不具备队列满的条件,也就是说,它可以不断地往DelayedWorkQueue中添加元素。当元素个数超过数组长度时,数组将被扩充。publicvoidput(Runnablee){offer(e);}publicbooleanadd(Runnablee){returnoffer(e);}publicbooleanoffer(Runnablee,longtimeout,TimeUnitunit){returnoffer(e);}offerAdd元素ScheduledThreadPoolExecutor在提交任务时调用DelayedWorkQueue.add,以及一些外部提供的添加元素的方法,如add、put,都调用offer。publicbooleanoffer(Runnablex){if(x==null)thrownewNullPointerException();RunnableScheduledFuture>e=(RunnableScheduledFuture>)x;//使用锁保证并发操作安全finalReentrantLocklock=this.lock;lock.lock();try{inti=size;//如果要超出数组的长度,则必须扩充数组if(i>=queue.length)//数组扩充grow();//元素个数加一inthequeuesize=i+1;//如果是第一个元素,那么就不用排序了,直接赋值即可if(i==0){queue[0]=e;setIndex(e,0);}else{//调用siftUp方法对插入的元素进行排序。siftUp(i,e);}//表示新插入的元素为队头,如果队头被替换,则需要唤醒等待获取的线程任务。if(queue[0]==e){leader=null;//唤醒等待获取任务可用的线程.signal();}}finally{lock.unlock();}returntrue;}基本流程如下:作为生产者的入口,首先获取锁。判断队列是否满(size>=queue.length),满了就展开grow()。队列未满,大小+1。判断加入的元素是否是第一个,如果是,则不需要堆。如果加入的元素不是第一个,则需要堆siftUp。如果堆顶元素刚好是此时加入的元素,则唤醒take线程进行消费。最后释放锁。offer的基本流程图如下:expansiongrow()可以看到当队列满的时候,不会阻塞等待,而是继续扩容。新容量newCapacity在旧容量oldCapacity的基础上扩大了50%(oldCapacity>>1相当于oldCapacity/2)。最后是Arrays.copyOf,先根据newCapacity创建一个新的空数组,然后将旧数组的数据复制到新数组中。privatevoidgrow(){intoldCapacity=queue.length;//每次扩展增加原数组一半的数量。//grow50%intnewCapacity=oldCapacity+(oldCapacity>>1);if(newCapacity<0)//overflownewCapacity=Integer.MAX_VALUE;//使用Arrays.copyOf复制一个新数组queue=Arrays.copyOf(queue,newCapacity);}siftUp新加入的元素会先加入到堆底,然后和上面的父节点逐级比较。通过循环,找到堆二叉树节点位置应该插入的元素key,与父节点位置交互。入栈siftUp的详细过程可以参考《堆和二叉堆的实现和特性》privatevoidsiftUp(intk,RunnableScheduledFuture>key){//当k==0时,已经到了堆二叉树的根节点,跳出循环的while(k>0){//父节点位置坐标,相当于(k-1)/2intparent=(k-1)>>>1;//获取父节点位置元素RunnableScheduledFuture>e=queue[parent];//如果key元素大于父节点的position元素满足条件,则跳出循环//因为是从小到大排序的。if(key.compareTo(e)>=0)break;//否则,将父节点元素存入位置kqueue[k]=e;//这个只有当元素是ScheduledFutureTask对象实例时才有用,用于快速取消任务。setIndex(e,k);//重新赋值k,找到元素key应该插入堆二叉树的节点k=parent;}//循环结束,k为元素key应该插入的节点位置被插入queue[k]=key;setIndex(key,k);}出队方法DelayedWorkQueue提供了以下出队方法take(),等待获取队头元素poll(),立即获取队头元素poll(longtimeout,TimeUnitunit),超时等待getQueueheadelementtakeconsumptionelementWorker工作线程启动后会循环消费工作队列中的元素,因为ScheduledThreadPoolExecutor的keepAliveTime=0,所以消费任务只调用DelayedWorkQueue.take。take的基本流程如下:首先获取一个可中断锁,判断堆顶元素是否为空,如果为空则阻塞等待available.await()。如果堆顶元素不为空,则获取其延迟执行时间delay。delay<=0表示执行时间到了,finishPoll会出队。如果delay>0还没到执行时间,判断leader线程是否为空。如果不为空,说明还有其他take线程也在等待,当前take会被无限阻塞。leader线程为空,当前take线程置为leader,阻塞等待延迟时间。如果当前leader线程等待延时自动唤醒或者被其他take线程唤醒,则leader最终置为null。再次循环判断delay<=0出队。跳出循环后判断leader为空,堆顶元素不为空,然后唤醒其他take线程,最后判断是否加锁。publicRunnableScheduledFuture>take()throwsInterruptedException{finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{for(;;){RunnableScheduledFuture>first=queue[0];//如果没有任务,让线程在可用的情况下等待。if(first==null)available.await();else{//获取任务剩余延迟时间longdelay=first.getDelay(NANOSECONDS);//如果延迟时间到了,则返回本任务执行。if(delay<=0)returnfinishPoll(first);//设置first为null,当线程等待时,不持有first的引用first=null;//不要保留refwhilewaiting//如果还是原来的等待队头任务的线程,//说明队头任务的延迟时间还没有到,继续等待。if(leader!=null)available.await();else{//记录当前等待队列头部任务的线程ThreadthisThread=Thread.currentThread();leader=thisThread;try{//当延迟任务时间到了,能够超时自动唤醒。available.awaitNanos(delay);}finally{if(leader==thisThread)leader=null;}}}}}finally{if(leader==null&&queue[0]!=null)//唤醒等待线程任务可用。signal();ock.unlock();}}take的基本流程图如下:take线程阻塞等待。可以看出producertake线程会在两种情况下阻塞等待:堆顶元素为空。堆顶元素的延迟>0,finishPoll将delay<=0的堆顶元素出队,当执行时间到了,出队是一个下堆siftDown的过程。//移除队列头元素privateRunnableScheduledFuture>finishPoll(RunnableScheduledFuture>f){//将队列中的元素个数减一ints=--size;//获取队列尾部的元素xRunnableScheduledFuture>x=queue[s];//将原队列尾部的元素设置为nullqueue[s]=null;if(s!=0)//因为队列的头部元素,所以进行重排序已移除。siftDown(0,x);setIndex(f,-1);returnf;}堆的删除方法主要分为三步:首先,将队列中的元素个数减少一个;将原队列的尾元素设置为队列的头元素,然后将队列的尾元素设置为null;调用setDown(O,x)方法以确保元素根据其优先级排序。向下入栈siftDown因为堆顶元素出队,所以堆的结构被破坏。需要整理整理,把堆的尾元素移动到堆顶,然后向下堆:从堆顶开始,父节点和左右子节点更小的子节点在节点比较中(左孩子不一定比右孩子小)。如果父节点小于等于较小的子节点,则循环结束,不需要交换位置。如果父节点大于较小的子节点,则交换位置。继续向下循环,判断父节点和子节点的关系,直到父节点小于等于较小的子节点,循环结束。见《堆和二叉堆的实现和特性》privatevoidsiftDown(intk,RunnableScheduledFuture>key){//无符号右移,相当于size/2inthalf=size>>>1;//通过循环,父A节点的值不能大于一个子节点。while(kc=queue[child];//右子节点相当于(k*2)+2intright=child+1;//如果左子节点的元素值大于右子节点的元素值,则右子节点为值较小的子节点。//需要重新赋值c和childif(right0)c=queue[child=right];//如果父节点元素的值较小比较小的子节点元素值,则跳出循环if(key.compareTo(c)<=0)break;//否则父节点元素将与子节点queue[k]=c进行交换;setIndex(c,k);k=child;}queue[k]=key;setIndex(key,k);}leaderthreadLeader线程的设计是Leader-Follower模式的一种变体,旨在等待不必要的时间。当一个take线程成为leader线程时,只需要等待下一个延迟时间,leader线程以外的其他take线程需要等待leader线程出队后再唤醒其他take线程。poll()立即获取队头元素。当队头任务为null,或者任务延迟时间还没到,说明任务不能返回,所以直接返回null。否则调用finishPoll方法,移除队头元素并返回。publicRunnableScheduledFuture>poll(){finalReentrantLocklock=this.lock;lock.lock();try{RunnableScheduledFuture>first=queue[0];//队头任务为null,否则任务延迟时间未到,全部返回nullif(first==null||first.getDelay(NANOSECONDS)>0)returnnull;else//移除队列头元素returnfinishPoll(first);}finally{lock.unlock();}}poll(longtimeout,TimeUnitunit)等待超时获取队列的头部元素。与take方法相比,必须考虑设置的超时时间。如果超过超时时间,没有获取到有用的任务,则返回null。其他和take方法中的逻辑一样。publicRunnableScheduledFuture>poll(longtimeout,TimeUnitunit)throwsInterruptedException{longnanos=unit.toNanos(timeout);finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{for(;;){RunnableScheduledFuture>first=队列[0];//如果没有任务。if(first==null){//超时已到,则直接返回nullif(nanos<=0)returnnull;else//否则让线程在可用的情况下等待nanos时间nanos=available。awaitNanos(nanos);}else{//获取任务的剩余延迟时间longdelay=first.getDelay(NANOSECONDS);//如果延迟时间到了,则返回本任务执行。if(delay<=0)returnfinishPoll(first);//如果超时,则直接返回nullif(nanos<=0)returnnull;//设置first为null,当线程等待时,不持有firstReferencefirst=null;//don'tretainrefwhilewaiting//如果超时时间小于任务的剩余延迟时间,则可能获取不到任务。//这里让线程等待超时任务延时时间到了,可以自动超时唤醒。longtimeLeft=available.awaitNanos(delay);//计算剩余超时nanos-=delay-timeLeft;}finally{if(leader==thisThread)leader=null;}}}}}finally{if(leader==null&&queue[0]!=null)//唤醒等待任务的线程available.signal();lock.unlock();}}remove删除指定元素删除指定元素一般用于取消任务,任务是还在阻塞队列中,那么需要移除。当删除的元素不是堆的末尾元素时,需要入堆。publicbooleanremove(Objectx){finalReentrantLocklock=this.lock;lock.lock();try{inti=indexOf(x);if(i<0)returnfalse;//维护heapIndexsetIndex(queue[i],-1);ints=--size;RunnableScheduledFuture>replacement=queue[s];queue[s]=null;if(s!=i){//删除不在堆尾的元素,所以堆处理isrequired//先堆下SiftDown(i,replacement);if(queue[i]==replacement)//如果位置i的元素降堆后还在替换,说明不需要降堆,//需要升堆siftUp(i,replacement);}returntrue;}finally{lock.unlock();}}总结使用优先级队列DelayedWorkQueue保证加入队列的任务会按照任务的延迟时间排序,延迟时间为less的任务会被优先获取。DelayedWorkQueue的数据结构是基于堆实现的;DelayedWorkQueue使用数组实现堆,根节点出队,替换为最后一个叶子节点,然后下推,直到满足堆建立条件;最后一个叶子节点入队,然后向上推,满足堆建立条件;DelayedWorkQueue满后会自动扩容到原来容量的1/2,即永不阻塞,最大扩容可达Integer.MAX_VALUE,所以线程池最多运行corePoolSize个工作线程;当堆顶元素为空且delay>0时,阻塞等待;DelayedWorkQueue是一种生产者消费者模式,生产永不阻塞,消费可以阻塞;DelayedWorkQueue有一个leaderthread变量,它是Leader-Follower模式的一种变体。当一个take线程成为leader线程时,只需要等待下一个延迟时间,leader线程以外的其他take线程需要等待leader线程出队后再唤醒其他take线程。