当前位置: 首页 > 后端技术 > Java

延迟任务这么详尽的详解你都看过了

时间:2023-04-01 17:02:31 Java

概述延迟任务相信大家都不陌生,在实际业务中的应用场景可以说比比皆是。例如,下单后15分钟内未付款将取消订单,超过时限外卖将自动赔付。在这些情况下,我们应该如何设计服务的实现?笨办法自然是调度任务去轮询数据库。但是当业务量很大,事件处理比较耗时的时候,我们的系统和数据库往往会面临巨大的压力。如果采用这种方式,可能会导致数据库和系统崩溃。那么有什么好的方法吗?今天给大家介绍几种延迟任务的实现方式。JAVADelayQueue你没看错,java内部有一个内置的延迟队列,它位于java并发包中。DelayQueue是jdk自带的延迟队列实现。它的实现依赖于可重入锁ReentrantLock、条件锁Condition和优先队列PriorityQueue。而且本质上他也是一个阻塞队列。那么他是如何实现延时效果的呢?DelayQueue的实现原理首先,DelayQueue队列中的元素必须继承一个接口Delayed。我们发现这个类publicinterfaceDelayedextendsComparable{longgetDelay(TimeUnitunit);}发现这个类内部定义了一个返回值为long的方法getDelay,这个方法用来定义队列中元素的过期时间,所有需要放入队列的元素都必须实现这个方法。那么我们来看看延迟队列的队列是如何运行的。让我们以最典型的报价为例:publicbooleanoffer(Ee){finalReentrantLocklock=this.lock;锁.锁();尝试{q.offer(e);如果(q.peek()==e){leader=null;可用的信号();}返回真;}最后{lock.unlock();甚至直接调用优先级队列的offer按照延迟对队列进行排序,只是加了个锁,做了一些数据调整,没有深入,但是take的实现好像很复杂。(注意Dalayed继承了Comparable方法,所以可以直接用优先级队列排序,只要自己实现compareTo方法即可)我试着加了一些注释,方便大家看:publicEtake()抛出InterruptedException{finalReentrantLocklock=this.lock;锁.lockInterruptibly();try{//可选操作for(;;){//获取队列的第一个元素,如果队列为空//阻塞直到有新元素加入队列,offer等方法调用signal唤醒threadEfirst=q.peek();如果(first==null)available.await();else{//如果队列中有元素longdelay=first.getDelay(NANOSECONDS);//判断延时时间,如果时间到了,直接取出数据返回if(delay<=0)returnq.poll();第一个=空;//如果leader为空,如果(leader!=null)可用则阻塞。等待();else{//获取当前线程ThreadthisThread=Thread.当前线程();//设置leader为当前线程leader=thisThread;try{//阻塞延迟时间available.awaitNanos(delay);}finally{if(leader==thisThread)leader=null;}}}}}finally{if(leader==null&&q.peek()!=null)available.signal();锁定。解锁();我们可以看到,take的实现依赖于无限自旋,直到第一个队列元素超时才会返回,否则只会阻塞DelayQueue实现延迟队列的优缺点看了源码后,大家应该对DelayQueue的实现有一个大概的了解,以及对它的优缺点有一定的了解。它的优点很明显:java原生支持,无需引入第三方工具,线程安全,即插即用,简单易用,但缺点也很明显:不支持分发,数据放在内存中不用persistence支持,当服务宕机时,数据会丢失。插入时,使用优先级队列的排序。时间复杂度高,无法很好地管理队列中的任务。那么延迟队列有没有更好的实现呢?我们继续往下看~TimeWheelAlgorithmTimeWheelAlgorithm是一种专门用来处理延迟任务的算法。实际应用可以在kafka和netty等项目中找到类似的实现。时间轮的具体实现所谓时间轮,顾名思义,就是一个类似于时钟的结构,即它的主要结构是一个环形数组,如图:一个一个地列出来,链表存储需要执行的项目。对于任务,我们在数组中设置执行间隔,假设我们循环数组的长度为60,每个数组的执行间隔为1s,那么每隔1s执行一次数组下一个元素中的链表元素。如果是这样的话,那么我们将无法处理超过60秒的延迟任务,这显然是不合适的,所以我们会在每个任务中添加一个参数numberoflaps,表示该任务会在几圈后执行。如果我们有一个任务要在150s之后执行,那么他应该在30s的位置,同时圈数应该是2。每次执行链表中的任务,我们都会取出并执行当前圈中需要执行的任务,然后从链表中删除。如果当前圈内没有执行任务,则修改其圈数,将圈数减1。于是一个简单的时间轮就出来了。那么这样的时间轮有什么优缺点呢?先说优点:时间轮的插入相比DelayQueue效率更高,时间复杂度O(1),实现简单明了,任务调度更方便合理。支持分布式,数据放在内存中,不需要持久化支持。服务停机会丢失数据。阵列之间的间隔设置会影响任务的准确性。因为圈数不同的任务会在同一个链表中,所以每个数组都会被执行。需要为元素遍历所有链表数据,效率会很低。时间轮算法进阶优化版刚才提到了时间轮算法的一些缺点,那么有没有办法优化呢?在这里我将介绍时间轮的优化版本。前面我们提到,不同圈数的任务会在同一个链表中反复遍历,影响效率。这种情况下,我们可以优化如下:对时间轮进行分层从图中可以看出我们采用了多层次的设计,上图分为三层,每层60个格子,间隔在第一个轮盘是1小时,我们的数据每次都插入到这个轮盘中,这个轮盘每经过一个小时后,到达下一个刻度时,将里面的元素全部取出,放入到第二个轮盘中,象征着分钟根据延迟时间,依此类推。这种实现方式的好处可以说是显而易见的:第一,避免了时间跨度大时空间的浪费,大大节省了操作的时间。时间轮算法的应用以前可能没有听说过,但是在各个地方都有很大的作用。linux定时器的实现中有一个时间轮。同样,如果你是一个喜欢阅读源码的读者,你也可能会在kafka和netty中找到他的实现。Kafka中应用了时间轮算法Kafka,其实现与上面提到的高级版时间轮没有太大区别,除了一点:Kafka内部实现的时间轮应用到了DelayQueue。@nonthreadsafeprivate[timer]classTimingWheel(tickMs:Long,wheelSize:Int,startMs:Long,taskCounter:AtomicInteger,queue:DelayQueue[TimerTaskList]){private[this]valinterval=tickMs*wheelSizeprivate[this]valbuckets=Array.tabulate[TimerTaskList](wheelSize){_=>newTimerTaskList(taskCounter)}private[this]varcurrentTime=startMs-(startMs%tickMs)@volatileprivate[this]varoverflowWheel:TimingWheel=null私人[this]defaddOverflowWheel():Unit={synchronized{if(overflowWheel==null){overflowWheel=newTimingWheel(tickMs=interval,wheelSize=wheelSize,startMs=currentTime,taskCounter=taskCounter,queue)}}}defadd(timerTaskEntry:TimerTaskEntry):Boolean={valexpiration=timerTaskEntry.expirationMsif(timerTaskEntry.cancelled){false}elseif(expiration=currentTime+tickMs){currentTime=timeMs-(timeMs%tickMs)if(overflowWheel!=null)overflowWheel.advanceClock(currentTime)}}}以上是kafka的内部实现(使用的语言是scala),我们可以看到实现非常简洁,使用了DelayQueue。刚才我们已经讨论了DelayQueue的优点和缺点。查看源码后,我们已经可以有一个大概的结论:kafka的时间轮中DelayQueue的作用是负责推进任务,目的是防止时间轮中任务相对稀疏造成的“空推进”。DelayQueue的触发机制可以很好的避免这种情况。同时由于DelayQueue的插入效率低,所以只用于底层推进。任务的插入由时间轮操作。两者的配置可以达到效率和资源的平衡。Nettynetty在HashedWheelTimer内部也有时间轮的实现。HashedWheelTimer的实现要比Kafka的内部实现复杂的多。与Kafka不同的是,它内部的推进并不依赖DelayQueue,而是自己实现了一套。源代码太长。有兴趣的读者可以自己看看。总结时间轮说了这么多,可见它的效率是出类拔萃的,但是仍然存在这样一个问题:不支持分布式。当我们的业务非常复杂,需要分布式时,时间轮似乎无法如愿以偿。那么此时延迟队列选择什么比较好呢?我们可以尝试使用第三方工具redis延迟队列。其实说到延迟,如果我们经常使用redis,就会想起redis是有过期机制的,那么我们是不是可以利用这个机制来实现一个延迟队列呢??Redis有自己的key过期机制,可以设置过期后的回调方式。基于这个特性,我们可以很容易的完成一个延迟队列。当有任务进来时,设置时间并配置过期回调方法。除了使用redis的过期机制,我们还可以使用它自带的zset来实现延迟队列。zset支持高性能排序,所以我们可以在任务进来的时候使用时间戳作为排序的依据,从而有条不紊地安排任务的执行,也可以实现延迟队列。zset实现延迟队列的好处:支持高性能排序,redis本身的高可用和高性能,持久化mq延迟队列rocketmq延迟消息rocketmq天然支持延迟消息,它的延迟消息分为18个级别,每个级别对应不同的延迟时间。那么他的理由是什么?rocketmq的broker收到消息后会把消息写入commitlog,判断消息是否是延迟消息(即delay属性是否大于0),如果是则判断是延迟消息,那么他不会马上写,而是通过转发的方式把消息放到对应的延迟topic中(18个延迟级别对应18个topic)。rocketmq会有一个定时任务进行轮询,如果任务的延迟时间到了,就会发送到指定的topic。这种设计比较简单粗暴,但是缺点也很明显:延迟是固定的,如果想要的延迟超过18级,就没有办法做到精确延迟,而且队列的堆积也会导致执行错误。Rocketmq的精准延时消息Rocketmq本身不支持精准延时,但是其商业版ons支持。不过在rocketmq社区有相应的解决方案。该方案是借助时间轮算法实现的,感兴趣的朋友可以去社区看看。(社区里一些未合并的PR是很好的实现参考。)综上所述,延迟队列的实现有上万种,但是如果要在生产中大规模使用,时间轮算法是绕不开的在多数情况下。改进后的时间轮算法,可以做到精准延时、持久化、高性能、高可用,堪称完美。不过话又说回来,难道其他的延迟手段就没用了吗?其实不是的,所有的方法都需要配合自己的使用场景。如果你正在用非常少量的数据进行轮询,那么周期性地轮询数据库可能是最好的解决方案,而不是无意识地引入复杂的延迟队列。如果是单机任务,那么jdk的延迟队列也是不错的选择。本文介绍的延迟队列只是给大家展示一下它们的原理和优缺点,具体使用需要结合自己的业务场景。