本文转载自微信公众号“源码兴趣圈”,作者malt。转载本文请联系源码兴趣圈公众号。什么是队列队列是一种特殊的先进先出线性表,简称FIFO。特殊之处在于只允许在一端插入,另一端插入操作被删除的一端称为队尾,进行删除操作的一端称为队头团队。当队列中没有元素时,称为空队列。队列在编程中用的比较多,包括一些中间件的底层数据结构就是一个队列(基本的内容就不过多解释了)。什么是阻塞队列就是队列,阻塞队列就是队列。阻塞队列是什么鬼,就是在队列的基础上增加了两个额外操作的队列,分别是:支持阻塞插入方式:当队列容量满时,插入元素的线程会被阻塞,直到队列容量过剩,支持阻塞移除方法:当队列中没有元素时,移除元素的线程会被阻塞,直到队列中有可以移除的元素为止因为LBQ是一篇源码分析文章,建议小伙伴们在上面观看PC端。当然,如果屏幕足够大,我没说~阻塞队列继承关系。阻塞队列是一个抽象的名字。阻塞队列的底层数据结构可以是数组,也可以是单向链表,也可以是双向链表……LBQ是一个由单向链表组成的队列。下图是LBQ的上下层继承关系图。从图中可以看出,LBQ实现了BlockingQueue接口,BlockingQueue实现了Queue接口。Queue接口分析我们先以自上而下的方式分析一波Queue,接口中定义了哪些方法//如果队列容量允许,立即将元素插入队列,成功后返回//🌟如果是队列容量已满,抛出异常booleanadd(Ee);//如果队列容量允许,立即将元素插入队列,成功后返回//🌟如果队列容量已满,则返回false//使用有界队列时,当offer大于add方法时booleanoffer(Ee);//检索并删除队列的头节点,返回值为被删除的队列头节点//🌟如果队列为空,则抛出异常eremove();//检索并删除队列的头节点,返回值为删除的队列头节点//🌟如果队列为空,则返回nullEpoll();//检查但不删除队列头节点//🌟如果队列为空,则抛出异常Eelement();//检查但不删除队列头节点//🌟如果队列为空,则返回nullEpeek();总结一下Queue接口的方法,分为三类:向队列容器中添加元素:add、offer从队列容器中移除元素:remove、poll查询队列头节点是否为空:element、peek来自程序接口API的健壮性,可以分为两类:健壮API:offer,poll,peek非健壮API:add,remove,element接口API完全没有健壮性。这里所说的健壮边界指的是使用非健壮的API接口,程序出错的概率较高。因此,我们更应该关注如何捕获可能的异常以及相应的异常。处理BlockingQueue接口分析BlockingQueue接口继承于Queue接口,所以一些相同语义的API接口不做解释//将指定的元素插入队列,如果队列已满,等待直到有可用空间;通过抛出异常,你可以Interruptwhilewaiting//🌟相对于Queue接口,是一个新的方法voidput(Ee)throwsInterruptedException;//将指定的元素插入到队列中,如果队列已满,则等待指定的时间空出Out空间;通过抛出异常,可以在等待时中断//🌟相当于offer(Ee)的扩展方法booleanoffer(Ee,longtimeout,TimeUnitunit)throwsInterruptedException;//检索并移除这个队列的头节点,如果需要,等待元素可用;通过throws异常,可以在等待时中断Etake()throwsInterruptedException;//获取并删除这个队列的头部,如果需要让元素可用,等待指定的等待时间;根据throws异常,可以等待时中断//🌟相当于poll()的扩展方法epoll(longtimeout,TimeUnitunit)throwsInterruptedException;//返回队列的剩余容量,如果是无界队列则返回Integer。MAX_VALUEintremainingCapacity();//如果这个队列包含指定的元素,则返回true从此队列中删除最多给定数量的可用元素,并将它们添加到给定的集合中intdrainTo(Collectionc,intmaxElements);可以看到BlockingQueue接口中的个性化方法相当多本文中的猪脚LBQ是从BlockingQueue接口实现的源码分析变量分析LBQ为了保证并发的增删等操作,JUC包下的ReentrantLock和Condition控件用于控制需要锁privatefinalReentrantLocktakeLockbeheldby//take,poll等移除操作=newReentrantLock();//当队列没有数据时,删除元素线程被挂起privatefinalConditionnotEmpty=takeLock.newCondition();//put,offer等新操作需要持有锁privatefinalReentrantLockputLock=newReentrantLock();//当队列为空时,挂起添加元素线程。privatefinalConditionnotFull=putLock.newCondition();ArrayBlockingQueue(ABQ)为什么内部元素计数字段使用int类型计数变量?你不担心并发吗?因为ABQ内部使用了一个锁来控制进出队列的操作。同时,只有一个线程会执行count变量来修改LBQ使用的两个锁,所以两个线程会同时修改count值。如果像ABQ一样使用int类型,两个进程同时执行修改count的个数会导致数据不准确,所以需要使用concurrentatomic类修改。如果不明白为什么要用原子类来统计,戳这里,从结构体入手,了解它是由哪些元素组成的。每个元素是做什么用的。如果数据结构不错,你应该能猜出绑定的容量。如果是unbounded,则为Integer.MAX_VALUEprivatefinalintcapacity;//当前队列的元素个数privatefinalAtomicIntegercount=newAtomicInteger();//当前队列的头节点transientNodehead;//当前队列的尾节点队列privatetransientNode最后;看到head和last元素,是不是对LBQ有了一个大概的原型?这时候还有一个结构NodestaticclassNode{//节点存储的元素Eitem;//当前节点的后继节点LinkedBlockingQueue.Nodenext;Node(Ex){item=x;}}构造函数分析这里画个图就明白LBQ默认的构造方法是如何初始化队列的publicLinkedBlockingQueue(){this(Integer.MAX_VALUE);}publicLinkedBlockingQueue(intcapacity){if(capacity<=0)thrownewIllegalArgumentException();this.capacity=容量;last=head=newNode(null);}可以看到默认的构造方法会把capacity设置为Integer.MAX_VALUE,也就是无界队列里面常说的其实是调用带参数的重载构造,方法里面设置了capacity,item是初始化为空对于Node节点,头最后一个节点与一个初始化的队列相关联。head最后一个节点指向的Node中的item和next都是空的。如果此时加入一条记录,队列会发生什么变化?队列会发生什么?封装为Node加入队列,putenqueue方法的语义,如果队列元素已满,阻塞当前插入线程,直到队列有空位被唤醒publicvoidput(Ee)throwsInterruptedException{if(e==null)thrownewNullPointerException();intc=-1;Nodenode=newNode(e);//将要添加的数据封装为NodefinalReentrantLockputLock=this.putLock;//获取添加操作的锁finalAtomicIntegercount=this.count;//获取队列实际元素个数putLock.lockInterruptibly();//操作可以中断并锁定APItry{while(count.get()==capacity){//如果队列元素个数==队列的最大值,将线程放入条件队列阻塞notFull.await();}enqueue(node);//执行入队过程c=count.getAndIncrement();//获取值并增加,例如:count=0,执行后结果值count+1=2,return0if(c+1node){last=last.next=node;}代码比较简单。首先将node赋值给当前最后一个节点的next属性,然后将最后一个节点指向node,完成节点入队操作。假设LBQ的范式是String,先插入元素a,队列如下图所示:What?一个数据不满足?没有什么是别人解决不了的。元素b入队如下:Queue入队如上图,iteminhead永远为空,nextinlast永远为空LBQ#offer也是一种入队方式,区别在于:如果队列元素已满,则直接返回false,不会阻塞线程节点出队。LBQ#take是出列方法。如果队列中的元素为空,则当前出队线程会被阻塞,直到有publicEtake()throwsInterruptedException{Ex;intc=-1;finalAtomicIntegercount=this.count;//获取当前队列中元素的实际个数finalReentrantLocktakeLock=this.takeLtakeLocock;//获取takeLock锁实例takeLock.lockInterruptibly();//获取takeLock锁,如果获取不到锁可以中断try{while(count.get()==0){//如果当前队列元素==0,当前获取节点线程加入等待队列notEmpty.await();}x=dequeue();//当前队列元素>0,执行头节点出队操作c=count.getAndDecrement();//获取当前队列元素个数,设置个数-1if(c>1)//当队列中还有元素时,唤醒下一个消费者线程消费notEmpty.signal();}finally{takeLock.unlock();//释放锁}if(c==capacity)//移除之前element队列已满,唤醒生产者线程添加元素signalNotFull();returnx;//返回头节点}dequeue操作的整体流程清晰明了,类似于enqueue操作的执行过程当队列满时,当前dequeue线程会阻塞在队列中如果有元素可以消费的队列,执行节点出队操作。如果节点出队后队列中有出队元素,则唤醒等待队列中的出队线程。如果在删除元素之前队列已满,则唤醒生产者线程。添加元素LBQ#dequeue出队操作比入队操作稍微复杂privateEdequeue(){Nodeh=head;//获取队列头节点Nodefirst=h.next;//获取头部node后继节点h.next=h;//helpGChead=first;//相当于把头节点的后继节点设置为新的头节点Ex=first.item;//获取新的头节点itemfirst。item=null;//因为头节点item为空,所以item被赋值为nullreturnx;}在dequeue过程中,原头节点会指向自己。这样做是为了帮助GC回收当前节点,然后将原来的head的下一个节点设置为新的head。下图是一个完整的Dequeue流程dequeue流程图如上,流程中没有特别注意的地方另一种LBQ#polldequeue方法,如果队列中元素为空,则返回null,不会像take一样阻塞节点查询,因为元素查找方法是在父类AbstractQueue中实现的,而LBQ中只实现了peek方法,node查询使用peek表示peek和element都获取队列头节点的数据。两者的区别是前者队列为空则返回null,后者抛出相关异常publicEpeek(){if(count.get()==0)//队列为空returnnullreturnnull;finalReentrantLocktakeLock=this.takeLock;takeLock.lock();//获取锁try{LinkedBlockingQueue.Nodefirst=head.next;//获取头节点的下一个后继节点if(first==null)//如果后继节点为空,返回null,否则返回后继节点的itemreturnnull;elsereturnfirst.item;}finally{takeLock.unlock();//unlock}}看到这里,可以得出结论,虽然头节点item始终为null,但是peek方法获取的是head.nextnodeitem节点删除操作需要获取两把锁,所以获取节点、出队节点、入队节点等操作会被阻塞publicbooleanremove(Objecto){if(o==空)返回假;fullyLock();//获取两个锁try{//从头节点开始,循环遍历队列for(Nodetrail=head,p=trail.next;p!=null;trail=p,p=p.next){if(o.equals(p.item)){//item==o执行删除操作unlink(p,trail);//删除操作returntrue;}}returnfalse;}finally{fullyUnlock();//释放两把锁}}链表删除操作,一般来说,在循环中一个一个遍历,而这种遍历时间复杂度是O(n),最坏的情况是遍历链表中的所有节点lookLBQ#remove中的unlink如何取消与节点关联的voidunlink(Nodep,Nodetrail){p.item=null;//第一次遍历,trail为头节点,p为头节点的后继节点trail.next=p。next;//设置头节点的后继指针为p节点的后继指针if(last==p)//如果p==lastsetlast==traillast=trail;//如果之前队列满删除元素,删除后有空闲空间,唤醒生产线程if(count.getAndDecrement()==capacity)notFull.signal();}remove方法和take方法类似,如果元素remove方法的是头节点,效果和Take一致,头节点元素出队。为了更好的理解,我们删掉中间的元素,画两张图来理解原因。代码如下:publicstaticvoidmain(String[]args){BlockingQueueblockingQueue=newLinkedBlockingQueue();blockingQueue。offer("a");blockingQueue.offer("b");blockingQueue.offer("c");//删除队列中间元素blockingQueue.remove("b");}执行三个offer操作上述代码,队列结构图如下:删除元素b的操作执行后,队列结构如下:如果p节点是最后一个尾节点,则将p的前驱节点设置为新的尾节点。删除操作大致是这样的。上面提到的应用场景是阻塞队列在大量的业务场景中被使用。这里有两个实际的例子,可以帮助你理解生产者消费者模型。生产者消费者模型是典型的多线程并发写入模式,生产者和消费者之间需要一个容器来解决强耦合关系。生产者将数据放入容器中,消费者消费容器中的数据。生产者消费者在Object类中实现wait、notify、notifyAllLock的方式有很多种Condition中的await、signal、signalAllBlockingQueue阻塞队列实现生产者消费者模型代码如下:@Slf4jpublicclassBlockingQueueTest{privatestaticfinalintMAX_NUM=10;privatestaticfinalBlockingQueueQ??UEUE=newLinkedBlockingQueue<>(MAX_NUM);publicvoidproduce(Stringstr){try{QUEUE.(str);log.info("🔥🔥🔥将元素放入队列::{},队列元素数量::{}",str,QUEUE.size());}catch(InterruptedExceptionie){//ignore}}publicStringconsume(){Stringstr=null;try{str=QUEUE.take();log.info("🔥🔥🔥队列移除元素::{},队列元素数量::{}",str,QUEUE.size());}catch(InterruptedExceptionie){//ignore}returnstr;}publicstaticvoidmain(String[]args){BlockingQueueTestqueueTest=newBlockingQueueTest();for(inti=0;i<5;i++){intfinalI=i;newThread(()->{Stringstr="element-";while(true){queueTest.produce(str+finalI);}}).start();}for(inti=0;i<5;i++){newThread(()->{while(true){queueTest.consume();}}).start();}}}线程池应用线程池中阻塞队列的具体应用属于生产者-消费者的实际场景。线程池在Java应用中的重要性不言而喻,这里简单介绍一下线程池的运行原理。线程池中的线程数小于核心线程数。当线程池中的线程数大于等于核心线程数时,将任务存入阻塞队列。线程池线程数大于等于核心线程数,阻塞。队列已满,线程池创建非核心线程。重点在第二点。当线程池的核心线程在运行任务时,会将任务存放在阻塞队列中。线程池源码如下:if(isRunning(c)&&workQueue.offer(command)){}看到使用的offer方法,通过上面的描述,如果阻塞队列满了,返回false。然后什么时候消费队列中的元素。涉及到线程池中线程执行过程的原理,这里简单介绍一下线程池中线程执行任务的两种方式,一种是创建核心线程时自带的任务,另一种是从阻塞队列中获取。核心线程在执行任务时,其实和非核心线程没有区别。线程池使用两个API获取阻塞队列任务,分别是poll和takeworkQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();问:为什么要使用两个API?应用程序接口?不好吗?A:Take是维护线程池中核心线程的重要手段。如果获取不到任务,线程就会被挂起,等待下一个任务的加入。至于带时间的pool,就是回收非核心线程准备好的结论到这里LBQ阻塞队列已经讲解完毕,总结一下文中描述的LBQ的基本特点。LBQ是一个基于链表的阻塞队列,可以并发读写。LBQ队列容量可以自己设置。如果不设置默认的Integer最大值,又称为无界队列,结合源码,详细解释了LBQ的入队、出队、查询、删除等操作。LBQ只是一个介绍。希望大家能通过文章掌握阻塞队列的核心思想,然后再去看看。巩固其他实现代码的知识。现在小伙伴们都知道LBQ是通过锁机制实现并发安全控制的。想一想不使用锁是否可以实现,如何实现?