当前位置: 首页 > 科技观察

采访集-DelayQueue篇

时间:2023-03-21 21:47:34 科技观察

采访者:好久不见,我们上次讲完了PriorityBlockingQueue,今天我们来谈谈与之相关的DelayQueue,好吗?Hydra:我知道你之前肯定给我挖坑了,DelayQueue也是一个无界阻塞队列,但是和我们之前讲过的其他队列不同的是,并不是所有类型的元素都可以放进去,只有实现了Delayed接口的对象才可以被放入队列。延迟的对象有一个过期时间,只有到了这个过期时间才能从队列中取出。面试官:有意思,那么它的使用场景是什么?Hydra:不得不说,由于DelayQueue设计的精巧,使用场景还是挺多的。例如,在电子商务系统中,如果订单在下单后30分钟内未付款,则需要自动取消订单。还有,如果我们缓存了一些数据,希望这些缓存在一定时间后过期,我们也可以使用延迟队列将其从缓存中删除。以电商系统为例,大家可以简单看一下这个过程:面试官:看起来和任务调度差不多。它们之间有什么区别吗?Hydra:任务调度更倾向于定时特性,即在指定的特定时间点或间隔执行特定的任务,而延迟队列则更倾向于在指定的延迟时间后执行任务。与任务调度相比,上述例子中的延迟队列场景都具有高频的特点,用定时任务来实现会显得有些过于繁琐。面试官:好的,白话讲了半天,能不吵就别吵了,我先写个例子。Hydra:好的,前面说了队列中存放的元素必须实现Delayed接口,所以我们先定义这样一个类:.delay=delay;this.expire=System.currentTimeMillis()+delay;}@OverridepubliclonggetDelay(TimeUnitunit){returnunit.convert(this.expire-System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@OverridepublicintcompareTo(Delayedo){return(int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));}}实现Delayed接口的类必须实现以下两个方法:getDelay方法用于计算剩余时间对象的延迟时间,判断对象是否过期,计算方式一般采用过期时间减去当前时间。如果为0或负数,则表示延迟时间已用完,否则表示尚未到期。定义后,向队列中加入5个不同延迟时间的对象,等待从队列中取出:",5000));queue.offer(newTask("task2",1000));queue.offer(newTask("task3",6000));queue.offer(newTask("task4",100));队列。提供(新任务(“任务5”,3000));while(true){Tasktask=queue.take();System.out.println(task);}}运行结果如下,可以看到延迟时间的顺序是从短到长,依次移除元素从队列中。任务{name='task4',delay=100}任务{name='task2',delay=1000}任务{name='task5',delay=3000}任务{name='task1',delay=5000}任务{name='task3',delay=6000}面试官:貌似申请挺简单的,不过今天也不能这么马虎,说说原理吧。Hydra:一开始不是你自己说的吗?今天讲的DelayQueue和前几天讲的PriorityBlockingQueue有关系。DelayQueue的底层是PriorityQueue,和PriorityBlockingQueue没有太大区别,只是在PriorityQueue的基础上增加了锁和条件等待,入队和出队采用二叉堆的逻辑。底层使用这些:privatefinaltransientReentrantLocklock=newReentrantLock();privatefinalPriorityQueueq=newPriorityQueue();privateThreadleader=null;privatefinalConditionavailable=lock.newCondition();面试官:你有点太忽悠我了,这让我敷衍?九头蛇:还没完呢,先看看入队的offer方式吧。其源码如下:available.signal();}returntrue;}finally{lock.unlock();}}DelayQueue每次加入到优先级队列PriorityQueue中,该元素剩余的延迟时间将作为排序因子实现第一个过期元素放在队头,这样后面从队列中取出的元素会最先取出第一个过期元素。二叉堆的构建过程我们上次讲过,不再赘述。向队列中加入5个元素后,二叉堆和队列的结构是这样的:当每一个元素按照二叉堆的顺序插入队列时,会检查堆顶元素是否为该元素刚插入,如果是的话,设置leader线程为空,唤醒阻塞在available上的线程。这里简单介绍一下leaderthread的作用。领导者是等待获取元素的线程。它的作用主要是减少不必要的等待。具体的使用我们后面介绍take方法的时候再说。面试官:别等了,趁热打铁,直接说退队的方法。九头蛇:真的不用担心。在看阻塞方法take之前,得先看看非阻塞poll方法是如何实现的:publicEpoll(){finalReentrantLocklock=this.lock;lock.lock();try{Efirst=q.peek();if(first==null||first.getDelay(NANOSECONDS)>0)returnnull;elsereturnq.poll();}finally{lock.unlock();}}代码很短易懂,加锁后,先检查堆顶元素,如果堆顶元素为空或者没有过期,则直接返回empty,否则返回堆顶元素,然后解锁。面试官:嗯,铺路结束。说说阻塞方法的过程?Hydra:blockingtake方法比上面的要难理解一点。我们只看它的源码:publicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{for(;;){Efirst=q.peek();if(first==null)available。await();else{longdelay=first.getDelay(NANOSECONDS);if(delay<=0)returnq.poll();first=null;//不要保留refwhilewaitingif(leader!=null)available.await();else{ThreadthisThread=Thread.currentThread();leader=thisThread;try{available.awaitNanos(delay);}finally{if(leader==thisThread)leader=null;}}}}}finally{if(leader==null&&q.peek()!=null)available.signal();lock.unlock();}}阻塞过程中的分支条件比较复杂,我们一一来看:首先获取堆顶元素,如果为空,则队列中没有元素,让当前线程阻塞等待available如果为空,则查看其过期时间。如果已经过期,则直接弹出堆顶元素。如果堆顶元素还没有过期,那么检查leader线程是否为空。如果leader线程不为空??,说明还有其他线程在等待获取队列的元素,直接阻塞当前线程。如果leader为空,则将当前线程分配给它,阻塞延迟时间后调用awaitNanos方法自动唤醒。唤醒后如果leader还是当前线程,则置空,重新进入循环,再次判断堆顶元素是否过期。当队列中的元素出队时,如果领导线程为空且堆中有元素,则唤醒阻塞在可用上的其他线程并释放持有的锁。面试官:我注意到一个问题。上面代码中,为什么要设置first=null呢?Hydra:假设有多个线程在执行take方法,当第一个线程进入时,堆顶元素还没有过期,那么leader会指向自己,然后阻塞自己一段时间。如果在此期间有其他线程到达,则会因为leader不为空而阻塞自己。当第一个线程阻塞时,如果成功弹出堆顶元素,first指向的元素应该被gc回收。但是如果还被其他线程持有,就不会被回收,所以设置first为empty可以帮助完成垃圾回收。面试官:我突然有一个不同的问题。调度线程池ScheduledThreadPoolExecutor底层也是用DelayQueue吗?Hydra:这个问题很好,可惜不是。ScheduledThreadPoolExecutor在类中定义了一个DelayedWorkQueue内部类,不直接使用DelayQueue。但是,如果你查看源代码,你会发现它们实现的逻辑基本相同。也是基于二叉堆的浮动、下沉、扩展,也是基于leader、lock、conditionalwaiting等操作。它完成了一次。说白了,看两个班的作者,都是DougLea大师,根本没有太大区别。面试官:好的,今天先到这里,最后可以总结一下吗?Hydra:从整体上理解DelayQueue没有难度。讲到优先队列的时候,难点已经基本搞清楚了。New增加的东西是对leaderthread的操作,利用leaderthread减少不必要的线程等待时间。面试官:今天的面试有点短,总感觉有点意犹未尽。看来下次还得给大家补充一下了。九头蛇:...