当前位置: 首页 > 后端技术 > Java

[Netty]八、基于时间轮的定时器HashedWheelTimer

时间:2023-04-02 10:37:09 Java

一、前言最近在看Redisson的源码时,看到内部使用了netty提供的这个组件。我想看看这个定时器是如何实现的?先介绍一下HashedWheelTimer。它是一个基于时间轮的计时器。它的优点是实现起来比较简单。缺点是不能准确、准时地执行定时任务,只能近似执行。因为时间轮中每个刻度的大小可能是100ms,也可能是1ms,执行任务的时候时间上会有一点误差。在大多数网络应用中,IO任务的执行时间往往不需要那么精确,所以默认的scalesize是100ms,但是你可以自己调整scalesize,最小为1ms。简单介绍完HahsedWheelTimer,我们再来看看时间轮的结构。2.时间轮的结构。时间轮类似于时钟。它有像时钟一样的刻度。每个刻度可以是100毫秒或1毫秒。上图中时间轮有6个刻度,每个刻度为100ms,即每100ms顺时针移动一个刻度,转一圈需要600ms(下面要介绍的HashedWheelTimer默认刻度数为512.刻度大小默认为100ms)工作原理:每次向时间轮提交延时任务,都会判断该任务的执行时间应该放在哪个刻度上。比如在时间轮启动后的第100毫秒,延迟400ms提交,如果执行任务,则任务放置在刻度5上,如果提交延迟700ms的任务,则放置任务在刻度2上,会记录到任务需要绕一圈时间轮才能执行。时间轮每移动一个刻度,就会执行当前刻度上的任务,一个刻度上可能有多个任务。因为HashedWheelTimer是基于时间轮的定时器,我们来看看HashedWheelTimer是如何实现的?3、HashedWheelTimer的相关组件这里我们可以先看一下HashedWheelTimer的UML图,对相关组件有一个整体的了解,如下:Timer:定时器接口,提供提交延迟任务newTimeout、停止定时器等方法。HashedWheelTimer:实现Timer接口,包含工作线程Worker、时间轮轮、延时任务队列超时、线程池taskExecutor等。里面是一个HashedWheelTimeout双向链表,如下图TimerTask:延时任务接口,内部只提供了一个run方法来执行Timeout:Timer和TimerTask的封装HashedWheelTimeout:包含了任务的执行时间deline,所需的轮数remainingRounds,以及双向链表上一个和下一个HashedWheelTimeout,HashedWheelBucket等4.HashedWheelTimer的大致工作流程如下:从上图可以看出,主要分为4个步骤,但准确来说应该有5个步骤:提交延迟任务对于HashedWheelTimer来说,延迟任务会先放入任务队列timeouts中。工作线程Worker会从任务队列timeouts中获取任务,并将获取到的HashedWheelTimeout任务放入指定的HashedWheelBucket中。取出当前scale对应的HashedWheelBucket的所有HashedWheelTimeouts执行scale。勾选加1,然后回到第二步,以此类推。5.源码解读5.1HashedWheelTimer的关键属性关键属性如下:Workerworker:工作线程WorkerintworkerState:工作线程状态longtickDuration:刻度大小,默认为100msHashedWheelBucket[]wheel:时间轮子的每一个tick对应一个HashedWheelBucketQueuetimeouts:任务队列QueuecanceledTimeouts:取消任务队列AtomicLongpendingTimeouts:正在处理的任务数ExecutortaskExecutor:执行任务的线程池longstartTime:定时器5.2提交延迟任务通过newTimeout方法向HahedWheelTimer提交延迟任务。newTimeout方法的步骤如下:启动工作线程Worker,如果是第一次启动,则设置启动时间startTime,如果已经启动,则跳过延迟任务的计算deadline(当前时间+延迟时间-开始时间startTime)用于确定将哪个HashedWheelBucket放入时间轮。延迟的任务将被封装为HashedWheelTimeout并添加到任务队列超时。结合源码:publicTimeoutnewTimeout(TimerTasktask,longdelay,TimeUnitunit){//部分代码省略//启动工作线程Workerstart();//计算截止时间longdeadline=System.nanoTime()+unit.toNanos(delay)-startTime;if(delay>0&&deadline<0){deadline=Long.MAX_VALUE;}//封装为HashedWheelTimeout加入到任务队列超时HashedWheelTimeouttimeout=newHashedWheelTimeout(this,task,deadline);超时。添加(超时);returntimeout;}5.3工作线程Worker运行具体步骤Worker类有一个关键属性tick,代表当前刻度相对于定时器开始时间,tick只会向上增加,初始值为0。具体步骤如下:等待下一个刻度到来,即当前时间>当前刻度刻度结束时间a)计算当前刻度刻度结束时间,例如Worker刚刚启动,当前刻度刻度为0,则刻度刻度结束时间=tickDuration*(tick+1),即100msb)判断当前时间(相对于当前刻度的结束时间,如果大于当前刻度的结束时间,说明当前时间已经过了当前刻度的结束时间,则准备处理当前scale的所有任务,如果小于,说明当前时间还没有到当前scale的结束时间,主动休眠一段时间后继续判断,直到当前time大于当前scale的结束时间,从任务队列timeouts中获取任务,将延迟任务的deadline除以tickDuration,计算总tick数和任务需要的圈数,计算把总刻度数&(wheel.length-1)放入HashedWheelBucket中(比如计算任务A的总刻度数=1026,当前刻度=25,时间轮的刻度为512,然后计算需要的圈数为1【如果当前scale=1,则需要的圈数为2】,放入下标为HashedWheelBucket2)得到当前对应的HashedWheelBucketscale,从头开始逐一遍历任务列表,如果延迟任务所需轮数为0,则开始执行任务,否则所需轮数减1。scaletick加1,回到第一步,这样看源码publicvoidrun(){//初始化定时器的开始时间startTimestartTime=System.nanoTime();startTimeInitialized.countDown();do{//1.等待下一个tick到来,即当前时间>当前tick结束时间finallongdeadline=waitForNextTick();if(deadline>0){//获取当前tick对应的HashedWheelBucketintidx=(int)(tick&mask);processCancelledTasks();HashedWheelBucketbucket=wheel[idx];//2.从任务队列timeouts中获取任务,将任务放入对应的HashedWheelBuckettransferTimeoutsToBuckets();//3.执行当前scaletick对应的HashedWheelBucketbucket中的所有任务.expireTimeouts(deadline);//4.将1tick++添加到当前刻度刻度;}}while(WORKER_STATE_UPDATER.get(HashedWheelTimer.this)==WORKER_STATE_STARTED);//这里省略部分代码}先来看第一步waitForNextTick方法的具体实现privatelongwaitForNextTick(){longdeadline=tickDuration*(tick+1);为了(;;){//当前时间相对于startTimefinallongcurrentTime=System.nanoTime()-startTime;longsleepTimeMs=(deadline-currentTime+999999)/1000000;//如果当前tick的结束时间<当前时间,则表示当前时间已经过期当前刻度结束时间后,直接返回当前时间if(sleepTimeMs<=0){if(currentTime==Long.MIN_VALUE){返回-Long.MAX_VALUE;}else{返回当前时间;}}//否则主动休眠一段时间,以上条件成立try{Thread.sleep(sleepTimeMs);}catch(InterruptedExceptionignored){if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this)==WORKER_STATE_SHUTDOWN){returnLong.MIN_VALUE;}}}}再看下两步transferTimeoutsToBuckets方法如下:privatevoidtransferTimeoutsToBuckets(){//这里一次最多可以从队列中获取100000个任务for(inti=0;i<100000;i++){HashedWheelTimeout超时=timeouts.poll();如果(超时==空){//表示队列中没有任务,直接返回break;}if(timeout.state()==HashedWheelTimeout.ST_CANCELLED){继续;}//计算scale总数=延迟任务的deadline/scalesizelongcalculated=timeout.deadline/tickDuration;//还需要计算的圈数=总滴答数-当前滴答/时间轮滴答数timeout.remainingRounds=(calculated-tick)/wheel.length;最后的长滴答声=Math.max(calculated,tick);//计算放哪个下标intstopIndex=(int)(ticks&mask);HashedWheelBucketbucket=wheel[stopIndex];//将任务放入对应的HashedWheelBucketbucket.addTimeout(timeout);}}最后看第三步bucket.expireTimeouts,源码如下:publicvoidexpireTimeouts(longdeadline){HashedWheelTimeouttimeout=head;//处理HashedWheelBucket的所有任务while(timeout!=null){HashedWheelTimeoutnext=timeout.next;if(timeout.remainingRounds<=0){//从双向链表中移除任务next=remove(timeout);if(timeout.deadline<=deadline){//执行任务timeout.expire();}else{//超时被放置在错误的槽中。这永远不应该发生。thrownewIllegalStateException(String.format("timeout.deadline(%d)>deadline(%d)",timeout.deadline,deadline));}}elseif(timeout.isCancelled()){next=remove(timeout);}else{//如果需要的圈数>0,则将其减少1timeout.remainingRounds--;}超时=下一步;}}至此,工作线程Worker运行的具体步骤和部分源码的解读就完成了6.总结HashedWheelTimer只是定时器的简单实现,和java中常见的定时器一样,有Timer、ScheduledThreadPoolExecutor等.从上面对其实现原理的分析可以看出,它不能适用于需要精确执行的场景,但是在网络应用中,IO任务的执行时间往往不需要精确。所以可以用在任务比较多,但任务不需要精确执行的场景