当前位置: 首页 > 后端技术 > Java

【JUC】ArrayBlockingQueue的Condition应用

时间:2023-04-02 01:25:33 Java

阻塞队列是生产者和消费者模式的应用;从源码来看,ArrayBlockingQueue本质上是条件1的应用。示例及原理//==1.队列初始化ArrayBlockingQueuequeue=newArrayBlockingQueue(100);//==2.入队线程threadA=newThread(()->{try{queue.put(newObject());}catch(InterruptedExceptione){e.printStackTrace();}});threadA.start();//==3.出列线程threadB=newThread(()->{try{Objectobject=queue.take();}catch(InterruptedExceptione){e.printStackTrace();}});threadB.start();1、当元素A加入队列并被消费进程创建时,会构造一个ArrayBlockingQueue来存储元素;同时会创建一个notEmpty条件。①.producer的productionelement元素A会被存入数组,同时会触发notEmpty条件的signal方法唤醒被阻塞的consumer。②.消费者此时消费该元素,另一个线程消费。根据先进先出原则,元素A将从数组中移除。当数组元素长度为0时,会触发await方法阻塞消费者。2、队列满,将消耗元素N流程如下图(类似逻辑,不分析,偷懒~)二、源码分析1、初始化publicArrayBlockingQueue(intcapacity,booleanfair){if(容量<=0)抛出新的IllegalArgumentException();//初始化数组this.items=newObject[capacity];//fair=false,非公平锁lock=newReentrantLock(fair);//两个条件,不为空,不满足notEmpty=lock.newCondition();notFull=lock.newCondition();}2.Enqueueputpublicvoidput(Ee)throwsInterruptedException{checkNotNull(e);finalReentrantLocklock=this.lock;//##加锁,如果中断,会抛出异常(抛出的异常是doAcquireInterruptibly()和acquireQueued()的主要区别)lock.lockInterruptibly();try{//--1.队列已满,等待阻塞while(count==items.length){notFull.await();}//--2.队列未满,enqueueenqueue(e);}最后{lock.unlock();}}//--2.队列未满,入队逻辑java.util.concurrent.ArrayBlockingQueue#enqueueprivatevoidenqueue(Ex){finalObjectct[]items=this.items;项目[putIndex]=x;如果(++putIndex==items.length)putIndex=0;计数++;//##唤醒notEmpty条件notEmpty.signal();}3.出列takepublicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;//锁定lock.lockInterruptibly();try{//--1.如果队列为空,则阻塞while(count==0)notEmpty。等待();//--2.队列不为空,dequeuereturndequeue();}最后{lock.unlock();}}//--2.队列不为空,dequeuejava.util.concurrent。ArrayBlockingQueue#dequeueprivateEdequeue(){finalObject[]items=this.items;@SuppressWarnings("unchecked")Ex=(E)items[takeIndex];项目[takeIndex]=null;如果(++takeIndex==items.length)takeIndex=0;数数-;if(itrs!=null)itrs.elementDequeued();//##唤醒notFull条件notFull.signal();返回x;}