当前位置: 首页 > 后端技术 > Java

SpringBoot定时任务 - 经典定时任务结构设计:时间轮(Timing Wheel)案例和实现原理

时间:2023-04-01 17:28:01 Java

SpringBoot定时任务——经典定时任务结构设计:时间轮(TimeWheel)案例及实现原理Wheel),Netty内部实现了一个基于时间轮的HashedWheelTimer,优化百万级I/O超时检测。是一种高性能、低消耗的数据结构,适用于非准实时、延时短时快的任务,比如心跳检测。本文主要介绍TimingWheel及其使用。@pdaiSpringBoot定时任务-NettyHashedWheelTimer方法知识准备什么是定时轮(TimingWheel)Netty的HashedWheelTimer解决什么问题?HashedWheelTimer使用实现案例pom靠2个简单例子进一步理解HashedWheelTimer是如何实现的?什么是多级时轮?实例源码知识准备需要初步了解TimingWheel,Netty的HashedWheelTimer要解决什么问题。什么是TimingWheelTimingWheel是由GeorgeVarghese和TonyLauck在1996年的论文'HashedandHierarchicalTimingWheels:datastructurestoefficientlyimplementatimerfacility'中实现的,在Linux内核中被广泛使用,它是实现之一Linux内核定时器的方法和基础。计时轮(TimingWheel)是一个循环的数据结构,就像一个时钟可以分成很多格子(Tick),每个格子代表时间间隔,它指向一个存储具体任务的链表(timerTask)。以论文中的上图为例,这里一个轮子包含8个格子(Tick),每个tick为一秒;任务添加:如果一个任务要在17秒后执行,那么需要转2轮,最后在Tick=1的位置添加到链表中。任务的执行:当时钟转2圈到Tick=1的位置时,开始执行该位置指向的链表中的任务。(这里的#表示在执行这个任务之前还需要完成剩余的轮数)Netty的HashedWheelTimer解决了什么问题?HashedWheelTimer是Netty基于TimingWheel开发的一个工具类。它解决了什么问题?这里主要有两点:任务延迟+时效性低。@pdai在Netty中一个典型的应用场景就是判断一个连接是否空闲。如果空闲(比如由于网络原因导致客户端到服务器的心跳无法传递),服务器会主动断开连接并释放资源。判断一个连接是否空闲是通过定时任务来完成的,但是Netty可能会维护上百万个长连接,为每个连接都定义一个定时任务是不可行的,那么如何提高I/O超时调度的效率呢?Netty基于TimingWheel开发了HashedWheelTimer工具类,用于优化I/O超时调度(本质上是延时任务);使用TimingWheel结构的另一个重要原因是I/OTimeouts这类任务不需要非常精确的计时。如何使用HashedWheelTimer在了解了计时轮(TimingWheel)和Netty的HashedWheelTimer要解决的问题之后,我们再来看看如何使用HashedWheelTimer,通过构造函数看主要参数publicHashedWheelTimer(ThreadFactorythreadFactory,longtickDuration,TimeUnitunit,intticksPerWheel,booleanleakDetection,longmaxPendingTimeouts,ExecutortaskExecutor){}具体参数说明如下:threadFactory:线程工厂,用于创建工作线程,默认为Executors.defaultThreadFactory()tickDuration:tick周期,即如何oftentotickunit:滴答周期单位ticksPerWheel:时间轮的长度,一圈有多少个格子leakDetection:是否开启内存泄漏检测,默认为truemaxPendingTimeouts:最大执行任务数,默认为-1,即无限制。只有在高并发的情况下才会设置这个参数。实现案例下面是一个HashedWheelTimer的基本用例。@pdaiPom依赖导入pom依赖io.nettynetty-all4.1.77.Final两个简单的例子Example1:执行TimerTask@SneakyThrowspublicstaticvoidsimpleHashedWheelTimer(){log.info("inittask1...");HashedWheelTimer计时器=newHashedWheelTimer(1,TimeUnit.SECONDS,8);//添加新的超时时间timer.newTimeout(timeout->{log.info("runningtask1...");},5,TimeUnit.SECONDS);}执行结果如下:23:32:21.364[主要]INFOtech.pdai。springboot.schedule.timer.netty.HashedWheelTimerTester-初始化任务1......23:32:27.454[pool-1-thread-1]INFOtech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester-运行任务1...例2:任务过期后取消,3秒后再次执行。@SneakyThrowspublicstaticvoidreScheduleHashedWheelTimer(){log.info("inittask2...");HashedWheelTimer计时器=newHashedWheelTimer(1,TimeUnit.SECONDS,8);线程.睡眠(5000);//添加一个新的超时Timeouttm=timer.newTimeout(timeout->{log.info("runningtask2...");},5,TimeUnit.SECONDS);//取消if(!tm.isExpired()){log.info("取消任务2...");tm.取消();}//重新安排timer.newTimeout(tm.task(),3,TimeUnit.SECONDS);}23:28:36.408[main]INFOtech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester-初始化任务2...23:28:41.412[main]INFOtech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester-取消任务2...23:28:45.414[pool-2-thread-1]INFOtech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester-runningtask2...一步一步理解我们通过如下问题一步一步理解HashedWheelTimer。@pdaiHashedWheelTimer是如何实现的?简单看一下HashedWheelTimer是如何实现的。Worker:Worker工作线程主要负责任务的调度和触发,在单线程中运行。HashedWheelBucket:时间轮上面的格子,里面存放着HashedWheelTimeout组成的链表结构的头尾节点。由多个格子组成的时间轮,形成了一道道任务环。HashedWheelTimeout:提交到时间轮的任务会被封装到HashedWheelTimeout构造函数中检查,NotNull(unit)"tickDuration");checkPositive(ticksPerWheel,“ticksPerWheel”);this.taskExecutor=checkNotNull(taskExecutor,"taskExecutor");//将ticksPerWheel标准化为2的幂并初始化轮子。wheel=createWheel(ticksPerWheel);掩码=轮子.length-1;//将tickDuration转换为纳米。长持续时间=unit.toNanos(tickDuration);//防止溢出。if(duration>=Long.MAX_VALUE/wheel.length){thrownewIllegalArgumentException(String.format("tickDuration:%d(expected:0INSTANCE_COUNT_LIMIT&&WARNED_TOO_MANY_INSTANCES.compareAndSet(false,true)){reportTooManyInstances();}}创建wheelprivatestaticHashedWheelBucket[]createWheel(intticksPerWheel){//ticksPerWheel不能大于than2^30checkInRange(ticksPerWheel,1,1073741824,"ticksPerWheel");ticksPerWheel=normalizeTicksPerWheel(ticksPerWheel);HashedWheelBucket[]wheel=newHashedWheelBucket[ticksPerWheel];for(inti=0;i0&&pendingTimeoutsCount>maxPendingTimeouts){pendingTimeouts.decrementAndGet();抛出新的RejectedExecutionException("挂起超时数("+pendingTimeoutsCount+")大于或等于最大允许挂起"+"超时数("+maxPendingTimeouts+")");}开始();//将超时添加到将在下一个滴答时处理的超时队列。//在处理过程中,所有排队的HashedWheelTimeouts将被添加到正确的HashedWheelBucket。长期限=System.nanoTime()+unit.toNanos(delay)-startTime;//防止溢出。if(delay>0&&deadline<0){deadline=Long.MAX_VALUE;}HashedWheelTimeouttimeout=newHashedWheelTimeout(this,task,deadline);超时。添加(超时);returntimeout;}执行方法/***显式启动后台线程。即使您没有调用此方法,后台线程也会按需自动启动。**@throwsIllegalStateException如果这个timer已经*{@linkplain#stop()停止}已经*/publicvoidstart(){switch(WORKER_STATE_UPDATER.get(this)){caseWORKER_STATE_INIT:if(WORKER_STATE_UPDATER.compareAndSet(this,WORKER_STATE_INIT,WORKER_STATE_STARTED)){workerThread。开始();}休息;案例WORKER_STATE_STARTED:休息;caseWORKER_STATE_SHUTDOWN:thrownewIllegalStateException("一旦停止就无法启动");默认值:抛出新错误(“无效的WorkerState”);}//等到startTime被worker初始化。while(startTime==0){try{startTimeInitialized.await();}catch(InterruptedExceptionignore){//忽略-它很快就会准备好。}}}停止方法@OverridepublicSetstop(){if(Thread.currentThread()==workerThread){thrownewIllegalStateException(HashedWheelTimer.class.getSimpleName()+".stop()不能从"+TimerTask.class.getSimpleName()中调用);}if(!WORKER_STATE_UPDATER.compareAndSet(this,WORKER_STATE_STARTED,WORKER_STATE_SHUTDOWN)){//workerState此时可以为0或2-让它始终为2。.decrementAndGet();if(leak!=null){booleanclosed=leak.close(this);断言关闭;}}返回Collections.emptySet();}try{booleaninterrupted=false;while(workerThread.isAlive()){workerThread.interrupt();尝试{workerThread.join(100);}catch(忽略中断异常){中断=真;}}如果(中断){Thread.currentThread().interrupt();}}最后{INSTANCE_COUNTER.decrementAndGet();if(leak!=null){booleanclosed=leak.close(this);断言关闭;}}returnworker.unprocessedTimeouts();}什么是多级计时轮?多级时间轮更容易理解。时钟有时、分、秒,秒转一格(Round),分转一格(Tick),每分钟转一圈(Round),每小时转一圈(Tick)PS:很明显HashedWheelTimer是一层时间轮。示例源码https://github.com/realpdai/t...更多内容告别碎片化学习,没有套路一站式系统化学习后端开发:Java全栈知识体系https://pdai.tech