面试官也不得不一口气接受延迟队列的6种实现方式。原计划是5月1日写两篇文章,看一本技术书。各种试探,连电脑都没开,完美计划落空。所以在这里我看到了和老板的差距,比你差的人比你努力,很难跟上,真的让我很惭愧。知耻而后勇。这并不强迫我重新学习。就个人而言,我喜欢一些实用的东西。我可以学习知识,使技术落到实处。最好做个demo。我不知道要分享什么主题。幸好项目最近在紧急招人,有幸担任面试官,整理出一道面试题分享给大家:“延迟队列如何实现?”。下面将介绍延迟队列的多种实现思路,文末提供几种实现方法的github地址。其实这两种方法都没有绝对的好坏,只是看在什么业务场景下使用。没有最好的技术,只有最适合的技术。一、延迟队列的应用什么是延迟队列?顾名思义:首先要有队列的特性,然后给它加上延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费队列将被消耗。延迟队列在项目中被广泛使用,尤其是电商平台:1、下单成功后,如果30分钟内没有付款,订单将自动取消。用户发送文本消息。3.如果订单一直处于某种未完成状态,将及时关闭订单并退回库存。4、新淘宝商户一个月内未上传商品信息,店铺将被冻结。...以上所有场景都可以通过应用延迟队列来解决。2.延迟队列的实现我个人一直坚持的观点:工作中可以用JDK自带的API实现的功能,所以不要轻易自己重新造轮子,或者引入第三方中间件.一方面,自己封装容易出问题(老板除外),调试和验证会产生很多不必要的工作量;另一方面,一旦接入第三方中间件,系统复杂度将成倍增加,维护成本也会增加。也大大增加。1、DelayQueue延迟队列JDK提供了一套实现延迟队列的API,位于java.util.concurrent包下的DelayQueue中。DelayQueue是一个BlockingQueue(无界阻塞)队列,本质上封装了一个PriorityQueue(优先级队列)。PriorityQueue内部使用了一个完整的二叉堆(不知道的自行了解)对队列元素进行排序。我们添加到DelayQueue队列中选择元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素先放在队列的头部。队列中的元素只有在达到Delay时间后才允许被取出队列。基本数据类型或自定义实体类都可以放在队列中。存储基本数据类型时,优先队列中的元素默认按升序排列。对于自定义的实体类,我们需要根据类的属性值进行比较计算。我们先简单实现一下看看效果,在DelayQueue中添加三个订单,分别设置订单在当前时间5秒、10秒、15秒后取消。要实现DelayQueue延迟队列,队列中的元素必须实现Delayed接口。该接口只有一个getDelay方法,用于设置延迟时间。Order类中的compareTo方法负责对队列中的元素进行排序。publicclassOrderimplementsDelayed{/***延迟时间*/@JsonFormat(locale="zh",timezone="GMT+8",pattern="yyyy-MM-ddHH:mm:ss")privatelongtime;Stringname;publicOrder(Stringname,longtime,TimeUnitunit){this.name=name;this.time=System.currentTimeMillis()+(time>0?unit.toMillis(time):0);}@OverridepubliclonggetDelay(TimeUnitunit){returntime-System.currentTimeMillis();}@OverridepublicintcompareTo(Delayedo){OrderOrder=(Order)o;longdiff=this.time-Order.time;if(diff<=0){return-1;}else{return1;}}}DelayQueue的put方法为一个线程安全的,因为put方法内部使用了ReentrantLock锁来进行线程同步。DelayQueue还提供了poll()和take()两种出队方法,poll()是非阻塞获取,没有过期元素直接返回null;take()是阻塞方式,没有过期的元素线程会等待。publicclassDelayQueueDemo{publicstaticvoidmain(String[]args)throwsInterruptedException{OrderOrder1=newOrder("Order1",5,TimeUnit.SECONDS);OrderOrder2=newOrder("Order2",10,TimeUnit.SECONDS);OrderOrder3=newOrder("Order3",15,TimeUnit.SECONDS);DelayQueuedelayQueue=newDelayQueue<>();delayQueue.put(Order1);delayQueue.put(Order2);delayQueue.put(Order3);System.out.println("订单延迟队列开始时间:"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss")));while(delayQueue.size()!=0){/***获取队列头第一个元素是否过期*/Ordertask=delayQueue.poll();if(task!=null){System.out.format("订单:{%s}被取消,取消时间:{%s}\n",任务.name,LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss")));}Thread.sleep(1000);}}}以上只是一个简单的入队和出队的实现,对于入队的操作,实际开发中会有专门的线程,负责入队和入队消息的消费。执行后结果如下。Order1、Order2、Order3分别在5秒、10秒、15秒后执行。至此,延迟队列已经用DelayQueue实现了。订单延迟排队开始时间:2020-05-0614:59:09订单:{Order1}被取消,取消时间:{2020-05-0614:59:14}订单:{Order2}被取消,取消时间:{2020-05-0614:59:19}订单:{Order3}被取消,取消时间:{2020-05-0614:59:24}2.Quartz定时任务Quartz是一个非常经典的任务调度框架,也是在Redis和RabbitMQ应用不广泛的时候,超时后取消订单不付款的功能是通过定时任务来实现的。定时任务具有一定的周期性。很多订单可能已经超时但还没有到达触发执行时间点,这会导致订单处理不够及时。引入quartz框架依赖包org.springframework.bootspring-boot-starter-quartz在启动类中使用@EnableScheduling注解启用定时任务功能。@EnableScheduling@SpringBootApplicationpublicclassDelayqueueApplication{publicstaticvoidmain(String[]args){SpringApplication.run(DelayqueueApplication.class,args);}}写一个定时任务,每5秒执行一次。@ComponentpublicclassQuartzDemo{//每五秒一次@Scheduled(cron="0/5****?")publicvoidprocess(){System.out.println("我是定时任务!");}}Redis数据结构zset也可以实现延迟队列的效果,主要是利用它的score属性,redis利用score对set中的成员从小到大进行排序。通过zadd命令向队列delayqueue中添加元素,并设置score值表示元素的过期时间;在delayqueue中添加三个order1、order2、order3,分别在10秒、20秒、30秒后过期。zadddelayqueue3order3消费者轮询队列delayqueue,对元素进行排序,比较最小时间和当前时间。如果小于当前时间,则表示密钥已过期并被移除。/***消费消息*/publicvoidpollOrderQueue(){while(true){Setset=jedis.zrangeWithScores(DELAY_QUEUE,0,0);Stringvalue=((Tuple)set.toArray()[0]).getElement();intscore=(int)((Tuple)set.toArray()[0]).getScore();Calendarcal=Calendar.getInstance();intnowSecond=(int)(cal.getTimeInMillis()/1000);if(nowSecond>=score){jedis.zrem(DELAY_QUEUE,value);System.out.println(sdf.format(newDate())+"removedkey:"+value);}if(jedis.zcard(DELAY_QUEUE)<=0){System.out.println(sdf.format(newDate())+"zsetempty");return;}Thread.sleep(1000);}}我们看到执行结果符合预期2020-05-0713:24:09addfinished.2020-05-0713:24:19removedkey:order12020-05-0713:24:29removedkey:order22020-05-0713:24:39removedkey:order32020-05-0713:24:39zsetempty4,Redis过期回调Rediskey回调事件也可以达到延迟排队的效果。简单的说,我们开启监听key是否过期的事件。一旦密钥过期,将触发回调事件。修改redis.conf文件开启notify-keyspace-eventsExnotify-keyspace-eventsExRedis监听配置,注入BeanRedisMessageListenerContainer@ConfigurationpublicclassRedisListenerConfig{@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);returncontainer;}}编写Redis过期回调监听方法必须继承KeyExpirationEventMessageListener,有点类似于MQ消息监听。@ComponentpublicclassRedisKeyExpirationListenerextendsKeyExpirationEventMessageListener{publicRedisKeyExpirationListener(RedisMessageListenerContainerlistenerContainer){super(listenerContainer);}@OverridepublicvoidonMessage(Messagemessage,byte[]pattern){StringexpiredKey=message+expiredl.toString();System.out");}}此时代码写好了,很简单,接下来测试一下效果,在redis-cli客户端添加key,设置过期时间3s,setxiaofu123ex3在控制台成功监听到过期key。监控到的过期key为:xiaofu5,RabbitMQ延迟队列将RabbitMQ作为延迟队列是常见的做法,但实际上RabbitMQ本身并不直接支持延迟队列功能,而是通过RabbitMQ消息队列的TTL和这两个DXL的属性是间接实现的。我们先来认识一下TTL和DXL这两个概念:TimeToLive(TTL):TTL,顾名思义:指的是消息的存活时间。RabbitMQ可以通过x-message-tt参数设置指定的Queue(队列)和Message(消息)上面消息的存活时间,它的值是一个非负整数,单位是微秒。RabbitMQ可以从队列和消息本身两个维度设置消息过期时间。如果设置了队列过期时间,则队列中的所有消息都具有相同的过期时间。设置消息过期时间,对队列中某条消息设置过期时间。每条消息的TTL可以不同。如果同时设置队列和队列中消息的TTL,则TTL值为两者中较小的值。队列中的消息在队列中保存一段时间,一旦超过TTL过期时间,就变成DeadLetter(死信)。DeadLetterExchanges(DLX)DLX是死信交换,死信队列绑定死信交换。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange和x-dead-letter-routing-key(可选)。一旦队列中出现了DeadLetter(死信),按照这两个参数就可以将消息重新路由到另一个Exchange(交换),从而可以再次消费该消息。x-dead-letter-exchange:队列中出现DeadLetter后,重新路由转发DeadLetter到指定的exchange(交换)。x-dead-letter-routing-key:指定发送的routing-key,一般是要转发的队列。队列中出现DeadLetter的情况包括:消息或队列的TTL已过期,队列已达到最大长度。消息被消费者拒绝(basic.reject或basic.nack)。下面结合一张图,看看如何实现关闭超过30分钟未支付订单的功能。我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟。当订单消息A0001在30分钟后变为DeadLetter(死信)时,延迟队列检测到是否存在死信,通过配置x-dead-letter-exchange,将死信重新转发给订单队列即可正常消费,直接监听订单队列处理订单逻辑。发送消息时指定消息延迟时间publicvoidsend(StringdelayTimes){amqpTemplate.convertAndSend("order.pay.exchange","order.pay.queue","大家好,我是延迟数据",message->{//设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));returnmessage;});}}设置延迟队列有死信后的转发规则/***延迟队列*/@Bean(name="="order.delay.queue")publicQueuegetMessageQueue(){returnQueueBuilder.durable(RabbitConstant.DEAD_LETTER_QUEUE)//配置过期后转发的exchange.withArgument("x-dead-letter-exchange","order.close.exchange")//配置过期后转发的routingkey.withArgument("x-dead-letter-routing-key","order.close.queue").build();}6.前面几个延迟队列的实现时间轮的方法比较简单易懂,时间轮算法有点抽象。kafka和netty都是基于时间轮算法实现延迟队列的。以下部分主要使用Netty的延迟队列来讲解时间轮的原理。我们先来看一张时间轮的示意图,解释一下时间轮轮子的一些基本概念:时间轮,图中的圆盘可以看作是时钟的刻度。例如,如果一个回合的长度是24秒,刻度数是8,那么每个刻度代表3秒。那么时间精度为3秒。duration/scale值越大,精度越高。添加定时延时任务A时,如果延时25秒后执行,但时间轮一轮的时长只有24秒,则得到轮数和对应的轮数根据时间轮的长度和刻度。指针位置index表示任务A会绕一圈,指向grid0,此时时间轮会记录任务的round和index信息。当round=0,index=0时,指针指向grid0。任务A不会执行,因为round=0不符合要求。所以每个格子代表一些时间,比如1秒和25秒会指向0格子,任务放在每个格子对应的链表中,有点类似HashMap的数据。Netty主要使用HashedWheelTimer来构建延迟队列。HashedWheelTimer的底层数据结构仍然使用DelayedQueue,但是它是使用时间轮算法实现的。接下来,我们使用Netty来简单实现延迟队列。HashedWheelTimer的构造函数有很多,并解释每个参数的含义。ThreadFactory:表示用来生成工作线程,一般使用线程池;tickDuration和unit:每个格子的时间间隔,默认100ms;ticksPerWheel:一个圆有多少个格子,默认为512,如果输入值不是2平方的N倍,则调整为大于等于该参数的2的N次方的值,有利于优化哈希值的计算。publicHashedWheelTimer(ThreadFactorythreadFactory,longtickDuration,TimeUnitunit,intticksPerWheel){this(threadFactory,tickDuration,unit,ticksPerWheel,true);}TimerTask:定时任务的实现接口,其中run方法包装了定时任务的逻辑。Timeout:定时任务提交给Timer后返回的句柄。通过这个句柄可以在外部取消定时任务,对定时任务的状态进行一些基本的判断。Timer:是HashedWheelTimer实现的父接口,只定义了如何提交计时任务,如何停止整个计时机制。publicclassNettyDelayQueue{publicstaticvoidmain(String[]args){finalTimertimer=newHashedWheelTimer(Executors.defaultThreadFactory(),5,TimeUnit.SECONDS,2);//定时任务TimerTasktask1=newTimerTask(){publicvoidrun(Timeouttimeout)throwsException{System.out.println("order15s后执行");timer.newTimeout(this,5,TimeUnit.SECONDS);//最后再注册}};timer.newTimeout(task1,5,TimeUnit.SECONDS);TimerTasktask2=newTimerTask(){publicvoidrun(Timeouttimeout)throwsException{System.out.println("order210s后执行");timer.newTimeout(this,10,TimeUnit.SECONDS);//最后重新注册}};timer.newTimeout(task2,10,TimeUnit.SECONDS);//延时任务timer.newTimeout(newTimerTask(){publicvoidrun(Timeouttimeout)throwsException{System.out.println("order315s后执行一次");}},15,TimeUnit.SECONDS);}}从执行结果看,order3和order3的延迟任务只执行一次,而order2和order1是定时任务,根据不同的cy重复执行克莱斯。order15s之后,执行order210s,再执行order315s,再执行order15s,再执行order210s,再执行summary为了方便大家理解,上面的代码写得比较简单粗暴。几种实现方式的demo已经提交到github地址:https://github.com/chengxy-nds/delayqueue,感兴趣的朋友可以下载运行。好久没写这篇文章了,写作不比上班轻松。查了资料反复验证了demo的可行性,搭建了各种RabbitMQ和Redis环境。我只想说,我太难了!可能写得还不够完善如有错误或不明之处,欢迎指正!!!