无界阻塞队列PriorityBlockingQueue介绍PriorityBlockingQueue是一个有优先级的无界阻塞队列,每次出队时返回优先级最高或最低的元素。在内部,它是使用平衡二叉树堆实现的,因此遍历元素不保证是有序的。默认情况下,使用对象的compareTo方法进行比较。如果需要自定义比较规则,可以自定义比较器。从这个类图中可以看出,PriorityBlockingQueue内部有一个数组queue,用来存放队列元素;size用于存储元素个数;allocationSpinLock是自旋锁,使用CAS操作保证同一时刻只有一个线程用于扩容队列,状态只有0和1,0表示当前没有扩容,1表示容量正在扩大。因为是优先队列,所以有比较器来比较大小。另外还有lock排他锁和notEmpty条件变量来实现take方法的阻塞。由于是无界队列,没有notFull条件变量,所以put是非阻塞的。//二叉树最小堆的实现privatetransientObject[]queue;privatetransientintsize;privatetransientvolatileintallocationSpinLock;privatetransientComparatorcomparator;privatefinalReentrantLocklock;privatefinalConditionnotEmpty;在构造函数中,默认队列容量为11,默认比较器为null,即默认使用元素的compareTo方法判断优先级,因此队列元素必须实现Comparable接口。privatestaticfinalintDEFAULT_INITIAL_CAPACITY=11;publicPriorityBlockingQueue(){this(DEFAULT_INITIAL_CAPACITY,null);}publicPriorityBlockingQueue(intinitialCapacity){this(initialCapacity,null);{if(initialCapacity<1)thrownewIllegalArgumentException();this.lock=newReentrantLock();this.notEmpty=lock.newCondition();this.comparator=比较器;this.queue=newObject[initialCapacity];}offer操作offer操作的作用是向队列中插入一个元素,由于是无界队列,所以总是返回true。publicbooleanoffer(Ee){if(e==null)thrownewNullPointerException();finalReentrantLocklock=this.lock;锁.锁();intn,上限;对象[]数组;//1。if如果当前元素个数>=队列容量,则展开while((n=size)>=(cap=(array=queue).length))tryGrow(array,cap);尝试{//2。默认比较器为nullComparatorcmp=比较器;如果(cmp==null)siftUpComparable(n,e,数组);否则//3。自定义比较器siftUpUsingComparator(n,e,array,cmp);//4。Queue元素个数加1,在notEmpty条件queuesize=n+1中唤醒一个阻塞的线程;notEmpty.signal();}最后{lock.unlock();}returntrue;}上面的代码并不复杂,我们主要看一下如何扩展和构建内部堆。先看扩展逻辑:privatevoidtryGrow(Object[]array,intoldCap){lock.unlock();//必须释放然后重新获取主锁Object[]newArray=null;//1。如果CAS扩容成功if(allocationSpinLock==0&&UNSAFE.compareAndSwapInt(this,allocationSpinLockOffset,0,1)){try{//oldCap<64,则扩容oldCap+2,否则扩容50%,最大值为MAX_ARRAY_SIZEintnewCap=oldCap+((oldCap<64)?(oldCap+2)://如果小则增长更快(oldCap>>1));if(newCap-MAX_ARRAY_SIZE>0){//可能溢出intminCap=oldCap+1;如果(minCap<0||minCap>MAX_ARRAY_SIZE)抛出新的OutOfMemoryError();newCap=MAX_ARRAY_SIZE;}if(newCap>oldCap&&queue==array)newArray=newObject[newCap];}最后{allocationSpinLock=0;}}//2。第一个线程CAS成功后,第二个线程进入这段代码,然后第二个线程让出CPU,尝试让第一个线程获取锁,但是不能保证if(newArray==null)//如果另一个线程正在分配Thread.yield();锁.锁();if(newArray!=null&&queue==array){queue=newArray;System.arraycopy(array,0,newArray,0,oldCap);}}tryGrow的作用是展开,但是为什么要在展开前释放锁,然后用CAS来控制只有一个线程能不能展开成功呢?其实不释放锁也可以,就是扩容期间一直持有锁,只是扩容需要时间。如果在此期间锁被占用,此时其他线程无法进行dequeue和enqueue操作,降低并发性,因此为了提高性能,使用CAS控制只能扩容一个线程,扩容前先释放锁,以便其他线程可以执行入队和出队操作。展开线程展开后,自旋锁变量allocationSpinLock会被重置为0,这里不使用UNSAFE方法的CAS来设置,因为可能只有一个线程同时获取锁,allocationSpinLock修改为不稳定。我们来看看堆构建算法:privatestatic
