前言ArrayBlockingQueue是一个数组支持的有界阻塞队列。队列是基于数组实现的,容量在创建ArrayBlockingQueue对象的时候已经定义好了。该队列以先进先出(FIFO)的方式对元素进行排序。支持公平锁和非公平锁,默认使用非公平锁。其数据结构如下:注:每个线程在获取锁时可能会排队等待。如果先获得锁的线程和请求在等待时间上必须先得到满足,那么这个锁就是公平的。相反,这个锁是不公平的。公平获取锁,即等待时间最长的线程先获取锁队列创建BlockingQueueblockingQueue=newArrayBlockingQueue<>(5);线程池有很多应用,生产者消费者场景。先进先出队列(队头是最先进的元素;队尾是队列中的最后一个元素)有界队列(即初始化时指定的容量是队列的最大容量队列,不会扩容,容量满了,如果容量为空,则出队操作会阻塞)队列不支持空元素。公平性可以在构造函数中指定。此类支持用于排序等待生产者和消费者线程的可选公平策略。默认情况下,不保证此顺序。但是,通过在构造函数中将fairness设置为true构造的队列允许以FIFO顺序访问线程。公平性通常会降低吞吐量,但也会减少可变性并避免“不平衡”。工作原理ArrayBlockingQueue是BlockingQueue的数组实现。它使用全局锁并行读写队列,使用两个Condition来阻塞容量为空时的fetch操作和容量满时的write操作。基于ReentrantLock保证线程安全,根据Condition实现队列满时阻塞。finalReentrantLocklock;privatefinalConditionnotEmpty;privatefinalConditionnotFull;Lock的作用是提供排他锁机制,保护竞争资源;而Condition是对锁的控制更精细,它依赖于Lock,通过一定的条件来控制多个线程。notEmpty表示“锁的非空条件”。当一个线程想从队列中取数据,但是此时没有数据,线程通过notEmpty.await()等待;当其他线程向队列中插入元素时,调用notEmpty.signal()唤醒“之前通过notEmpty.await()进入等待状态的线程”。同样,notFull表示“锁定完整条件”。当线程要向队列中插入一个元素而队列已满时,线程等待;当其他线程从队列中取出元素时,等待线程被唤醒。试图将元素放入满队列会导致put操作被阻塞,直到BlockingQueue中有新的调用空间才被唤醒继续操作;试图从空队列中取回元素会造成类似的阻塞,直到BlocingkQueue进入新的物品才会被唤醒。源码分析下面的源码分析定义了基于JDK1.8的ArrayBlockingQueue的类继承关系如下:其包含的方法定义如下:成员属性/**实际存储数据的数组*/finalObject[]items;/**take,poll,peekorremove*/inttakeIndex的下一个索引;/**put,offer,oradd下一个索引*/intputIndex;/**队列中的元素个数*/intcount;/**可重入锁*/finalReentrantLocklock;/**如果数组为空,等待Condition*/privatefinalConditionnotEmpty;/**如果数组满,等待Condition*/privatefinalConditionnotFull;/**遍历器实现*/transientItrsitrs=null;constructor/***构造函数,设置队列的初始容量*/publicArrayBlockingQueue(intcapacity){this(capacity,false);}/***构造函数,*容量和指定的访问策略。**@paramcapacity设置数组大小*@paramfair设置是否是公平锁*@throwsIllegalArgumentExceptionif{@codecapacity<1}*/publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[capacity];//是否是公平锁,如果是,则先来线程先获取锁对象//否则操作系统调度哪个线程获得锁,一般为false,性能会更高lock=newReentrantLock(fair);notEmpty=lock.newCondition();notFull=lock.newCondition();}/***构造函数,带有初始内容的队列*/publicArrayBlockingQueue(intcapacity,booleanfair,Collectionc){this(capacity,fair);finalReentrantLocklock=this.lock;//加锁的目的是让其他CPU立即看到修改//加锁和解锁的底层是CAS,它将修改强制写回主存,对其他CPU可见lock.lock();//设置数组内容,先加锁try{inti=0;try{for(Ee:c){checkNotNull(e);items[i++]=e;//依次复制内容}}catch(ArrayIndexOutOfBoundsExceptionex){thrownewIllegalArgumentException();}count=i;putIndex=(i==capacity)?0:i;//如果putIndex是大于数组大小,则重写从0开始}finally{lock.unlock();//finally必须释放锁}}入队方法add/offer/put,这三个方法都是往队列中添加元素,说明如下:add方法依赖offer方法,如果队列满则抛异常,否则添加成功返回true;offer方法有两个重载版本,只有一个参数的版本,如果队列已满则返回false,否则加入队列并返回true,add方法是调用这个版本的offer方法;另一个版本有一个时间参数,wait如果队列满了,可以指定等待时间,如果在此期间被打断,会抛出异常,如果等待时间结束,则返回false,否则被加入队列返回true;put方法和offer方法逻辑一样,带时间参数,但是没有等待时间限制,会等到队列有空位,再插入队列,返回true/***添加一个元素,其实在super.add*/publicbooleanadd(Ee)中调用了offer方法{returnsuper.add(e);}/***添加成功返回true,否则返回false*/publicbooleanoffer(Ee){//要插入的元素是否为null,如果是则抛出NullPointerExceptioncheckNotNull(e);//获取“阻塞队列的独占锁”finalReentrantLocklock=this.lock;lock.lock();//加锁尝试{//如果队列已满,则返回falseif(count==items.length)//超出数组的容量returnfalse;else{//如果队列未满,则插入e,返回true。enqueue(e);returntrue;}}finally{//释放锁lock.unlock();}}/***如果队列已满则等待*/publicvoidput(Ee)throwsInterruptedException{checkNotNull(e);finalReentrantLocklock=this.lock;lock.lockInterruptibly();//和lock方法的区别在于阻塞时可以抛出异常跳出try{while(count==items.length)notFull.await();//这里是Blocked,注意:如果运行到这里,会释放上面的锁,一直等到notifyenqueue(e);}finally{lock.unlock();}}/***Insert方法有超时事件,unit表示哪个是秒、分钟和小时*/publicbooleanoffer(Ee,longtimeout,TimeUnitunit)throwsInterruptedException{checkNotNull(e);longnanos=unit.toNanos(timeout);finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count==items.length){if(nanos<=0)返回false;nanos=notFull.awaitNanos(nanos);//有超时等待的阻塞方法}enqueue(e);//入队returntrue;}finally{lock.unlock();}}dequeue方法poll/take/peek,这几个方法都是获取队首元素,具体说明如下:poll方法有两个重载版本,第一个版本,如果队列为空,则返回null,否则移除并返回队列的头元素;另一个版本有一个时间参数,如果栈为空就wait,可以指定等待时间,如果等待超时则返回null,如果被中断则抛出异常,否则移除并返回栈顶元素堆。take方法和poll方法一样,都是带时间参数的,只是不能指定等待时间。它会一直等到队列中有元素,然后移除并返回栈顶元素peek方法只是返回队列头部的元素元素,未移除//实现方法,如果当前队列为空,返回nullpublicEpoll(){finalReentrantLocklock=this.lock;lock.lock();try{return(count==0)?null:dequeue();}finally{lock.unlock();}}//实现的方法,如果当前队列为空,则一直阻塞publicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count==0)notEmpty.await();//队列为空,阻塞方法returndequeue();}finally{lock.unlock();}}//有超时事件的取元素方法,否则返回nullpublicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{longnanos=unit.toNanos(timeout);finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count==0){if(nanos<=0)returnnull;nanos=notEmpty.awaitNanos(nanos);//超时等待}returndequeue();//获取元素}finally{lock.unlock();}}//只看一个队列的前端元素,取出队列中的原始元素不好,队列为空时返回nullpublicEpeek(){finalReentrantLocklock=this.lock;lock.lock();try{returnitemAt(takeIndex);//队列为空时返回null}finally{lock.unlock();}}deleteelement方法remove/clear/drainT,这三个方法用于从队列中移除元素,具体说明如下:remove方法用于移除一个元素,如果栈为空或者没有找到该元素,则将返回false,否则从堆栈中移除该元素;移除时,如果元素在栈顶,则直接移除,如果位于栈的中间,需要将该元素后面的其他元素移到前面。移除后,需要唤醒因为栈满而阻塞的线程。clear方法用于整个堆栈。同时设置takeIndex为putIndex,保证栈中的元素是先进的。先出;至多count个线程最后会被唤醒,因为正常情况下一个线程插入一个元素,如果超过count个线程被唤醒,可能会有一些线程因为栈满而再次阻塞。drainTo方法有两个重载版本,一个是不带number,移除所有元素,复制到指定集合;一个是number,指定数量的元素被移除并复制到指定的集合中,两者底层实现是同一个方法移除后,需要重新设置takeIndex和count,唤醒被移除最多的线程,因为堆栈已满。/***从队列中删除元素的方法。删除成功返回true,否则返回false*/publicbooleanremove(Objecto){if(o==null)returnfalse;finalObject[]items=this.items;finalReentrantLocklock=this.lock;lock.lock();try{if(count>0){finalintputIndex=this.putIndex;inti=takeIndex;//从takeIndex开始遍历直到等于putIndexdo{if(o.equals(items[i])){removeAt(i);//真的deletemethodreturntrue;}//走到数组末尾从头开始,放的时候也遵循这个规则if(++i==items.length)i=0;}while(i!=putIndex);//循环不断的取出来做判断}//如果数组为空则返回falsereturnfalse;}finally{lock.unlock();}}/***指定删除索引上的元素.*/voidremoveAt(finalintremoveIndex){//assertlock.getHoldCount()==1;//assertitems[removeIndex]!=null;//assertremoveIndex>=0&&removeIndex0){finalintputIndex=this.putIndex;inti=takeIndex;//从takeIndex开始遍历直到i等于putIndex,将数组元素置为nulldo{items[i]=null;if(++i==items.length)i=0;}while(i!=putIndex);//注意这两个索引这里并没有设置为0,只是让它们相等,因为只要相等,栈就可以先入firstouttakeIndex=putIndex;count=0;if(itrs!=null)itrs.queueIsEmpty();//如果有线程因为栈满而等待,将其唤醒//注意这里没有使用signalAll而是通过一个for循环多次发出信号,纯粹从唤醒线程的角度来说可以使用signalAll,效果和这里的for循环一样//如果有等待线程,表示count为当前线程的最大容量,这里清零,最多只放count次,一个线程只能放1次time,只唤醒计数线程,避免//线程因为栈满而被唤醒再次阻塞for(;k>0&&lock.hasWaiters(notFull);k--)notFull.signal();}}finally{lock.unlock();}}/***移除集合中的所有元素*/publicintdrainTo(Collectionc){returndrainTo(c,Integer.MAX_VALUE);}/***RemoveAllelementstothecollection*/publicintdrainTo(Collectionc,intmaxElements){//验证参数是否合法checkNotNull(c);if(c==this)thrownewIllegalArgumentException();if(maxElements<=0)return0;finalObject[]items=this.items;finalReentrantLocklock=this.lock;lock.lock();try{//取两者之间的最小值intn=Math.min(maxElements,count);inttake=takeIndex;inti=0;try{//从takeIndex开始遍历,取出元素加入c,直到满足数量要求while(i0){//取后修改count减去icount-=i;takeIndex=take;if(itrs!=null){if(count==0)//通知itrs栈为空itrs.queueIsEmpty();elseif(i>take)//表示take中间有变成0,notifyitrsitrs.takeIndexWrapped();}//唤醒因为栈满而等待的线程,最多唤醒i,同上避免线程因为栈满而被唤醒阻塞for(;i>0&&lock.hasWaiters(notFull);i--)notFull.signal();}}}finally{lock.unlock();}}iterator/Itr/ItrsItr和Itrs是ArrayBlockingQueue的两个内部类,如下:iterator方法返回一个iterator实例,用于实现for循环遍历和Collection接口的一部分.实现如下:publicIteratoriterator(){returnnewItr();}Itr(){//assertlock.getHoldCount()==0;lastRet=NONE;finalReentrantLocklock=ArrayBlockingQueue.this.lock;lock.lock();try{if(count==0){//NONE和DETACHED都是常量cursor=NONE;nextIndex=NONE;prevTakeIndex=DETACHED;}else{//初始化各个属性finalinttakeIndex=ArrayBlockingQueue.this.takeIndex;prevTakeIndex=takeIndex;nextItem=itemAt(nextIndex=takeIndex);cursor=incCursor(takeIndex);if(itrs==null){itrs=newItrs(this);}else{//初始化Itrs,将当前线程注册到Itrsitrs.register(this);//inthisorderitrs.doSomeSweeping(false);}prevCycles=itrs.cycles;//asserttakeIndex>=0;//assertprevTakeIndex==takeIndex;//assertnextIndex>=0;//assertnextItem!=null;}}finally{lock.unlock();}}Itrs(Itrinitial){register(initial);}//计算cursorprivateintincCursor(intindex){//assertlock.getHoldCount()==1;if(++index==items.length)index=0;if(index==putIndex)index=NONE;returnindex;}/***创建新的Itr实例时,会调用该方法将实例添加到Node列表中*/voidregister(Itritr){//新建一个节点,插入到头节点前面head=newNode(itr,head);}总结ArrayBlockingQueue是一个阻塞队列,内部通过ReentrantLock实现线程安全,并为Condition实现await和signal的功能waitingforwakeup它的数据结构是一个数组,准确的说是一个圆形数组(可以比作一个圆环),当达到最大长度时,所有下标自动从0开始继续。PS:以上代码提交在Github上:https://github.com/Niuh-Study/niuh-juc-final.git