当前位置: 首页 > 后端技术 > Java

BlockingQueue-LinkedBlockingQueue

时间:2023-04-02 01:51:21 Java

LinkedBlockingQueue是一个阻塞队列,可以是有界的也可以是无界的,队列以FIFO(先进先出)的方式访问。head是队列的头节点,也就是入队时间最长的节点,tail是队列的尾节点,也就是入队时间最短的节点。节点从尾部加入队列并从头部出队。LinkedBlockingQueue#主要属性节点:通过内部类Node定义节点:staticclassNode{Eitem;下一个节点;节点(Ex){item=x;}}很简单,主要有两个属性:next:下一个节点。item:节点中包含的数据。capacity:队列容量,无界队列取值为Integer.MAX_VALUE。count:当前队列中的节点数,类型为AtomicInteger。头:头节点。last:最后一个节点。takeLock:ReentrantLock,出队锁,即从队列中获取数据的锁。notEmpty:takeLock的Condition,辅助排队线程获取数据。putLock:ReentrantLock,enqueuelock,向队列添加数据的锁。notFull:putLock的条件,辅助加入队列的线程排队。构造函数无参数构造函数创建一个无界空队列。publicLinkedBlockingQueue(){这个(Integer.MAX_VALUE);}capacity参数capacity的构造方法创建一个有界空队列publicLinkedBlockingQueue(intcapacity){if(capacity<=0)thrownewIllegalArgumentException();this.capacity=容量;last=head=newNode(null);}集合参数构造函数创建一个无界队列,并将参数集合初始化到队列中:publicLinkedBlockingQueue(Collectionc){this(Integer.MAX_VALUE);finalReentrantLockputLock=this.putLock;putLock.lock();//从未竞争过,但对于可见性是必要的try{intn=0;for(Ee:c){if(e==null)thrownewNullPointerException();if(n==capacity)thrownewIllegalStateException("队列已满");排队(新节点(e));++n;}count.set(n);}最后{putLock.开锁();}}需要注意创建的队列必然会包含一个dummyheadnode,所以在离开queue时,headnode也必须跳过put方法。元素加入队列的put方法,代码比较简单:publicvoidput(Ee)throwsInterruptedException{if(e==null)thrownewNullPointerException();//注意:所有put/take/etc中的约定是预设本地var//保持负计数以指示失败,除非设置。intc=-1;节点节点=新节点(e);finalReentrantLockputLock=this.putLock;最终AtomicInteger计数=this.count;putLock.lockInterruptibly();尝试{while(count.get()==capacity){notFull.await();}入队(节点);c=count.getAndIncrement();如果(c+1<容量)notFull.signal();}最后{putLock.unlock();}如果(c==0)signalNotEmpty();}首先创建一个Node,通过putLock.lockInterruptibly()尝试为当前线程获取队列的写锁;如果获取到则立即返回,否则当前线程阻塞等待。拿到写锁后,判断如果当前队列已经达到容量(count.get()==capacity),在notFull上阻塞等待。否则,如果容量未满或其他出队线程(从队列中取数据的线程取数据成功后)释放队列容量并唤醒当前线程,则将当前节点enqueue(node)加入尾部的队列。然后累加当前队列容量计数,判断如果容量未满,则唤醒其他在notFull上等待容量的进程。然后释放putLock锁。最后,如果加入的节点是队列的第一个节点(c==0,一个空队列加入),此时可能有等待取数据的线程阻塞等待,调用signalNotEmpty()唤醒阻塞的线程。take方法的take方法获取数据:publicEtake()throwsInterruptedException{Ex;intc=-1;最终AtomicInteger计数=this.count;最后的ReentrantLocktakeLock=this.takeLock;takeLock.lockInterruptibly();尝试{while(count.get()==0){notEmpty.await();}x=出队();c=count.getAndDecrement();如果(c>1)notEmpty.signal();}最后{takeLock.unlock();}if(c==capacity)signalNotFull();返回x;}先获取takeLock,获取不到就阻塞等待,获取到就立即返回。拿到take锁后,判断当前队列是否为空(count.get()==0),在notEmpty上阻塞等待。否则,如果队列不为空或其他写线程(参考put方法,put完成后唤醒take阻塞线程)写入队列并唤醒当前线程,调用dequeue方法获取数据.然后重新计算(减去)当前队列容量count,判断如果队列不为空,则唤醒其他在notEmpty上等待数据的take进程。然后释放takeLock锁。最后,如果take之前队列满了(c==capacity),此时可能有等待put数据的线程阻塞等待,signalNotFull()唤醒被阻塞的put线程。最后,返回dequeue()方法检索到的数据。dequeue()方法去到head的secondary节点,获取该节点的item并返回,然后将head节点从队列中移除,secondary节点成为head节点,将成为head的secondary节点头节点(此时已经获取到它的数据项)项留空。privateEdequeue(){//断言takeLock.isHeldByCurrentThread();//断言head.item==null;节点h=head;Nodefirst=h.next;h.next=h;//helpGChead=first;Ex=first.item;first.item=null;返回x;从dequeue方法dequeue()中,我们需要明白以下两点:数据是从secondary节点(头节点的下一个节点)获取的,因为头节点是dummynode。头节点从队列中移除。secondary节点被清理成为dummy节点后,成为头节点(勇敢的站在头上装B)。总结起来比SynchronousQueue的源码简单多了。但是,我们只分析了它的两个主要方法。LinkedBlockingQueue还支持其他的收集方式,代码逻辑比较简单。需要注意的是,有些操作如remove和contains需要同时获取putLock和takeLock。其他写法如offer和poll等数据获取方式,代码逻辑类似put和take,很容易理解。只不过阻塞队列LinkedBlockingQueue也提供了非阻塞的方法,比如poll和offer。如果提供时队列已满,或者轮询时队列为空,将立即返回false。阻碍理解代码的地方应该是ReentrantLock和Condition,它们是理解LinkedBlockingQueue源码的前提。非常感谢!PreviousBlockingQueue-基于TransferQueue的SynchronousQueueNextBlockingQueue-ArrayBlockingQueue