前言我们经常会遇到延时任务和定时任务。在网络连接场景中,经常会有一些超时控制。随着连接数的增加,这些超时任务的数量往往是巨大的。对大量任务实施超时管理并不是一件容易的事。几种定时任务的实现java.util.TimerJDK在1.3中引入了Timer数据结构来实现定时任务。Timer的实现思路比较简单,里面主要有两个属性:TaskQueue:定时任务的抽象类TimeTask的列表。TimerThread:用于执行定时任务的线程。privatefinalTaskQueuequeue=newTaskQueue();privatefinalTimerThreadthread=newTimerThread(queue);Timer结构体还定义了一个抽象类TimerTask,继承了Runnable接口。业务系统实现这个抽象类的run方法,提供具体的延时任务逻辑。TaskQueue内部采用大顶堆的方式,按照任务的触发时间排序。TimerThread无限循环从TaskQueue中获取队头,等待队头任务超时触发任务,将任务从队列中移除。Timer的数据结构和算法很容易理解。所有超时任务都先进入延迟队列。后台超时线程不断从延迟队列中取任务,超时后执行任务。延迟队列按大顶堆排序。延迟任务场景下有3个操作,分别是:添加任务、提取队头任务、查看队头任务。查看队列头部的事件复杂度为O(1)。添加任务和提取队头任务的时间复杂度为O(Logn)。当任务数量很多的时候,增删改的开销也比较大。另外,由于Timer内部只有一个处理线程,如果延迟任务的处理耗时较多,则后续任务的处理也会相应延迟。代码如下:publicstaticvoidmain(String[]args){Timertimer=newTimer();//延时1秒执行任务timer.schedule(newjava.util.TimerTask(){@Overridepublicvoidrun(){System.out.println("延时1秒执行任务"+System.currentTimeMillis());}},1000);timer.schedule(newjava.util.TimerTask(){@Overridepublicvoidrun(){System.out.println("任务执行延迟2秒"+System.currentTimeMillis());}},2000);尝试{Thread.sleep(5000);}catch(InterruptedExceptione){e.printStackTrace();}计时器。取消();}ScheduledThreadPoolExecutor由于Timer只有一个线程来处理延迟任务,当任务较多时显然不够用。JDK1.5引入线程池接口ExecutorService后,也提供了相应的ScheduledExecutorService子类接口,用于处理延迟任务。该接口内部还使用了一个按小顶堆排序的延迟队列来存储任务。线程池中的线程将在此队列上等待,直到可以拾取任务。总的来说,ScheduledExecutorService和Timer的区别在于,前者依赖线程池来执行任务,任务本身会判断是什么类型的任务,需要重复执行的会加入到任务队列中任务执行完成后。对于后者,它只是依靠一个线程不断获取队头任务并尝试执行,在效率和安全性上都不如前者。ScheduledExecutorService的实现有一些特殊的功能。只有一个线程可以提取延迟队列头部的任务,并根据任务的超时时间等待。在此等待期间,其他线程无法获取任务。这种实现是为了防止多个线程同时获取任务,导致任务在超时前触发,或者在等待任务超时时添加新任务无法响应。由于ScheduledExecutorService可以使用多线程,这也缓解了单个任务执行时间过长导致的后续任务阻塞。但是延迟队列也是采用小顶堆排序的方式,所以添加任务和删除任务的时间复杂度都是O(Logn)。在大量任务的情况下,性能比较差。代码如下:publicclassScheduledThreadPoolServiceTest{//参数表示可以同时执行的计划任务数privateScheduledExecutorServiceservice=Executors.newScheduledThreadPool(3);/***schedule:延迟2秒执行任务*/publicvoidtask0(){service.schedule(()->{System.out.println("task0-start");sleep(2);System.out.println("task0-end");},2,TimeUnit.SECONDS);}/***scheduleAtFixedRate:2秒后,每4秒执行一次任务*注意,如果任务执行时间(比如6秒)大于间隔时间,就会等待任务结束执行并直接开始下一个任务*/publicvoidtask1(){service.scheduleAtFixedRate(()->{System.out.println("task1-start");sleep(2);System.out.println("task1-结束");},2,4,TimeUnit.SECONDS);}/***scheduleWithFixedDelay:2秒后,每次延迟4秒执行一个任务*注意,这里是等待上一个任务执行结束,固定时间延迟后再开始下一个任务*/publicvoidtask2(){service.scheduleWithFixedDelay(()->{System.out.println("task2-start");睡觉(2);System.out.println("任务2-结束");},2,4,TimeUnit.SECONDS);}privatevoidsleep(longtime){try{TimeUnit.SECONDS.sleep(time);}catch(InterruptedExceptione){e.printStackTrace();}}publicstaticvoidmain(String[]args){ScheduledThreadPoolServiceTesttest=newScheduledThreadPoolServiceTest();System.out.println("主要开始");测试任务0();//测试任务1();//测试任务2();测试.睡眠(10);System.out.println("主端");}}DelayQueueJava中还有一个延迟队列DelayQueue,加入延迟队列的所有元素都必须实现Delayed接口。内部延迟队列是使用PriorityQueue实现的,所以还是使用了优先队列!Delayed接口继承了Comparable所以优先级队列是按延迟排序的。Redis排序setRedis的数据结构Zset也可以达到延迟队列的效果,主要是利用了它的score属性,redis通过score对set中的成员从小到大进行排序。在内部,zset是使用跳表实现的。跳表数据结构示意图:总体来说,跳表删除操作的时间复杂度为O(logN)。有没有更高效的数据结构?Timer、ScheduledThreadPool、DelayQueue,综上所述,它们都是利用优先级队列获取最早需要执行的任务,所以插入和删除任务的时间复杂度为O(logn),Timer和ScheduledThreadPool的周期任务为passed是通过重置任务的下一次执行时间来完成的。但是由于添加新任务和抽取任务的时间复杂度都是O(Logn),当任务数量很大,比如几万、几十万的时候,性能开销就变得巨大了。问题出在时间复杂度上,插入和删除的时间复杂度是O(logn),那么假设频繁插入和删除的次数是m,总的时间复杂度是O(mlogn),那么,有没有比值新任务和提取任务?O(Log2n)复杂度较低的数据结构呢?答案就在那里。在论文《Hashed and Hierarchical Timing Wheels》中,设计了一种叫做TimingWheels的数据结构,在处理延迟任务时,将增删任务的时间复杂度降低到O(1)。时间轮算法的基本原理顾名思义,时间轮的数据结构和我们时钟上的数据指针非常相似。时间轮是用一个圆形数组实现的,数组的每个元素都可以称为一个槽,这一点和HashMap是一样的。slot内部使用双向链表存储要执行的任务。链表操作的增删改查时间复杂度是O(1),槽本身也是指时间精度。最高分辨率为1秒。也就是说,延迟1.2秒的任务和延迟1.5秒的任务会被加到同一个槽中,然后遍历这个槽中的链表执行1秒的任务。任务插入当有延迟任务要插入时间轮时,先计算其延迟时间和单位时间的残值,将残值个数从指针指向的当前槽移到所在槽延迟任务需要放在slot中。例如,时间轮有8个槽,编号为0~7。指针当前指向插槽2。添加一个延时任务,延时时间为4秒,4%8=4,所以这个任务会被插入到slot6的延时任务队列中,有4+2=6。timeslots的实现时间轮的slots可以通过循环数组的方式实现,即指针越过数组边界后返回起始下标。简而言之,时间轮的算法可以描述为:用一个队列来存放延迟的任务,同一个队列中的任务有相同的延迟时间。使用循环数组存储元素,数组中的每个元素都指向一个延迟任务队列。有一个当前指针指向数组中的一个槽位,每单位时间指针移动到下一个槽位。指针指向的slot的延迟队列,里面所有的延迟任务都会被触发。在时间轮上添加一个延时任务,将其延时时间除以单位时间得到的残值,从当前指针开始,移动残值个数对应的槽位,即为放置延时任务的槽位.基于这种数据结构,插入延迟任务的时间复杂度下降到O(1)。而当指针指向某个槽时,则触发与该槽相连的延迟任务队列中的所有延迟任务。延迟任务的触发和执行不应影响指针向后移动的时间精度。因此,一般情况下,用于移动指针的线程只负责任务的触发,任务的执行由其他线程完成。例如,可以将slot上的延迟任务队列放到一个额外的线程池中执行,然后在slot上新建一个空白的延迟任务队列,用于添加后续任务。关于扩展,如果要添加一个50秒后执行的任务怎么办?这个插槽好像不够?您要添加插槽吗?和HashMap一样的扩展?假设要求精度为1秒,要能够支持延迟时间为1天的延迟任务,时间轮的槽数需要为60×60×24=86400个,这就需要更多的内存消耗。显然,简单地增加插槽数量并不是一个好的解决方案。有两种常见的方法:通过增加轮数。50%8+1=3,即应该放在slot3,下标2。那么(50-1)/8=6,即轮数记为6。也就是说,循环6轮后,当扫描到下标为2的slot时,就会触发该任务。Netty中的HashedWheelTimer就使用了这种方法。通过多个级别。这个比较像我们的手表,像我们的秒针转一圈,分针转一格,分针转一圈,时针转一格。多级时间轮就是这样实现的。假设上图是一楼,那么一楼走一圈,二楼走一格。可知二楼一格为8秒。假设二楼也有8个槽位,那么绕着二楼走一圈,三楼一格,可以知道三楼一格是64秒。然后一格三层,每层8个槽位,共24个槽位时间轮可以处理最大延迟512秒的任务。而多级时间轮也会有降级操作。假设一个任务延迟了500秒,那么一开始添加的时候一定要放在第三层。当时间经过436秒时,需要64秒才能完成。触发任务的执行,此时是一个延迟64秒的任务,所以会降下来放在第二层,第一层容纳不下。又过了56秒,相对来说是一个延时8秒后才执行的任务,所以会降级到第一层等待执行。降级是为了保证时间精度的一致性。Kafka内部使用了多级时间轮算法。降级过程:下篇文章会讲到时间轮在Netty和Kafka中的具体实现。
