前言本节以ArrayBlockingQueue为例,为大家介绍阻塞队列是如何实现的。让我们来看看!ArrayBlockingQueue源码分析构造函数也是一样的,我们先从它的构造函数说起。publicArrayBlockingQueue(intcapacity){this(capacity,false);}capacity固定容量大小。false,这个字段的名字其实是公平的。默认为false,非公平锁。在上一节中,我们使用了它的默认用法。我们之前讲过公平锁和非公平锁。可以查看之前的文章(ReentrantLock源码解析)。接下来我们看一下:publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[容量];锁=新的ReentrantLock(公平);notEmpty=锁定。新条件();notFull=lock.newCondition();}从上面的代码可以看出capacity>0,第一个构造函数的this()其实就是调整后的构造函数,我们可以用它来指定capacity和ArrayBlockingQueue的访问策略(公平和不公平)。然后看最后一个构造函数。publicArrayBlockingQueue(intcapacity,booleanfair,Collectionc){this(capacity,fair);finalReentrantLocklock=this.lock;//锁定可见性lock.lock();尝试{inti=0;try{//迭代集合for(Ee:c){checkNotNull(e);项目[i++]=e;}//越界捕获异常}catch(ArrayIndexOutOfBoundsExceptionex){thrownewIllegalArgumentException();}数=我;putIndex=(i==capacity)?0:我;}最后{lock.unlock();}}从代码来看,相比上一个,Collections多了一些。这个是来做什么的?它允许我们在创建集合时对其进行初始化,并按迭代顺序将其添加到容器中,我们从其内部也可以看出这一点。内部变量//队列的元素finalObject[]items;//获取下一个元素时的索引inttakeIndex;//添加下一个元素时的索引intputIndex;//队列中元素的个数intcount;//全局锁finalReentrantLocklock;//等待条件privatefinalConditionnotEmpty;私人最终条件不完整;//迭代器的共享状态transientItrsitrs=null;内部方法add()&offer()我们来看add方法,这个方法用来向队列中添加一个元素。publicbooleanadd(Ee){returnsuper.add(e);}内部调用父类的方法,继承了AbstractQueue。publicclassArrayBlockingQueueextendsAbstractQueueimplementsBlockingQueue,java.io.Serializable{....}再看AbstractQueue的add()。publicbooleanadd(Ee){if(offer(e))returntrue;elsethrownewIllegalStateException("Queuefull");}可以看到内部调用了offer(),如果添加成功则返回true,如果添加失败则抛出异常,这是在符合我们在上一节中使用它时的特性。但是我们发现里面并没有offer方法,所以实现不在AbstractQueue中,但是实现还是在ArrayBlockingQueue中。我们来看看ArrayBlockingQueue的offer()方法。publicbooleanoffer(Ee){//判断元素e是否为空,抛出NullPointerException异常checkNotNull(e);finalReentrantLocklock=this.lock;//需要持有锁lock.lock();try{//如果元素已满,则返回false,add会抛出异常if(count==items.length)returnfalse;else{//加入队列enqueue(e);返回真;}}最后{lock.unlock();}}查看enqueue:privatevoidenqueue(Ex){finalObject[]items=this.items;//添加元素到期望的索引位置items[putIndex]=x;//如果下一个index的值等于容器个数,则将putIndex返回0if(++putIndex==items.length)putIndex=0;//容器元素个数+1count++;//唤醒等待线程notEmpty.signal();}remove&pollfirst看第一个remove(),AbstractQueue内部也存在同样的方法,如果移除的元素为null则抛出异常。publicEremove(){Ex=poll();如果(x!=null)返回x;elsethrownewNoSuchElementException();}poll()的实现在ArrayBlockingQueue中,内部实现和add很相似。publicEpoll(){finalReentrantLocklock=this.lock;锁.锁();尝试{返回(计数==0)?空:出队();}最后{lock.unlock();}}privateEdequeue(){finalObject[]items=this.items;//这个注解是用来忽略一些警告的,不是重点@SuppressWarnings("unchecked")//按照FIFO取出元素takeIndexEx=(E)items[takeIndex];//取出元素时,设置为空items[takeIndex]=null;//判断下一个元素的位置if(++takeIndex==items.length)takeIndex=0;数数-;if(itrs!=null)itrs.elementDequeued();//唤醒等待线程notFull.signal();//返回元素returnx;}第二个remove(e)是在ArrayBlockingQueue内部实现的,可以移除指定的元素。publicbooleanremove(Objecto){if(o==null)返回false;最终对象[]items=this.items;finalReentrantLocklock=this.lock;锁.锁();try{if(count>0){finalintputIndex=this.putIndex;inti=takeIndex;do{//遍历移除指定元素if(o.equals(items[i])){//移除指定元素并更新对应索引位置removeAt(i);返回真;}//防止越界if(++i==items.length)i=0;}while(i!=putIndex);}返回假;}最后{lock.unlock();Taketake会造成线程阻塞下面看一下它的内部实现。publicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;//获取锁,当线程被中断时,会直接返回lock.lockInterruptibly();try{//如果元素为空,则进入阻塞,即没有元素就绪,进入waitingwhile(count==0)//让当前线程等待notEmpty.await();//否则出队returndequeue();}最后{lock.unlock();}}put这个方法的实现和take类似,也是阻塞线程。publicvoidput(Ee)throwsInterruptedException{checkNotNull(e);finalReentrantLocklock=this.lock;锁.lockInterruptibly();try{//如果元素已满则阻塞while(count==items.length)notFull.等待();//否则入队(e);}最后{lock.unlock();}}结论本节主要讲述ArrayBlockingQueue的源码实现。它的源代码比较简单。大家可以按照本节的说明看一下BlockingQueue的其他实现类。