当前位置: 首页 > 后端技术 > Java

JavaJUCPriorityBlockingQueue分析

时间:2023-04-01 19:59:47 Java

无界阻塞队列PriorityBlockingQueue介绍PriorityBlockingQueue是一个有优先级的无界阻塞队列,每次出队时返回优先级最高或最低的元素。在内部,它是使用平衡二叉树堆实现的,因此遍历元素不保证是有序的。默认情况下,使用对象的compareTo方法进行比较。如果需要自定义比较规则,可以自定义比较器。从这个类图中可以看出,PriorityBlockingQueue内部有一个数组queue,用来存放队列元素;size用于存储元素个数;allocationSpinLock是自旋锁,使用CAS操作保证同一时刻只有一个线程用于扩容队列,状态只有0和1,0表示当前没有扩容,1表示容量正在扩大。因为是优先队列,所以有比较器来比较大小。另外还有lock排他锁和notEmpty条件变量来实现take方法的阻塞。由于是无界队列,没有notFull条件变量,所以put是非阻塞的。//二叉树最小堆的实现privatetransientObject[]queue;privatetransientintsize;privatetransientvolatileintallocationSpinLock;privatetransientComparatorcomparator;privatefinalReentrantLocklock;privatefinalConditionnotEmpty;在构造函数中,默认队列容量为11,默认比较器为null,即默认使用元素的compareTo方法判断优先级,因此队列元素必须实现Comparable接口。privatestaticfinalintDEFAULT_INITIAL_CAPACITY=11;publicPriorityBlockingQueue(){this(DEFAULT_INITIAL_CAPACITY,null);}publicPriorityBlockingQueue(intinitialCapacity){this(initialCapacity,null);{if(initialCapacity<1)thrownewIllegalArgumentException();this.lock=newReentrantLock();this.notEmpty=lock.newCondition();this.comparator=比较器;this.queue=newObject[initialCapacity];}offer操作offer操作的作用是向队列中插入一个元素,由于是无界队列,所以总是返回true。publicbooleanoffer(Ee){if(e==null)thrownewNullPointerException();finalReentrantLocklock=this.lock;锁.锁();intn,上限;对象[]数组;//1。if如果当前元素个数>=队列容量,则展开while((n=size)>=(cap=(array=queue).length))tryGrow(array,cap);尝试{//2。默认比较器为nullComparatorcmp=比较器;如果(cmp==null)siftUpComparable(n,e,数组);否则//3。自定义比较器siftUpUsingComparator(n,e,array,cmp);//4。Queue元素个数加1,在notEmpty条件queuesize=n+1中唤醒一个阻塞的线程;notEmpty.signal();}最后{lock.unlock();}returntrue;}上面的代码并不复杂,我们主要看一下如何扩展和构建内部堆。先看扩展逻辑:privatevoidtryGrow(Object[]array,intoldCap){lock.unlock();//必须释放然后重新获取主锁Object[]newArray=null;//1。如果CAS扩容成功if(allocationSpinLock==0&&UNSAFE.compareAndSwapInt(this,allocationSpinLockOffset,0,1)){try{//oldCap<64,则扩容oldCap+2,否则扩容50%,最大值为MAX_ARRAY_SIZEintnewCap=oldCap+((oldCap<64)?(oldCap+2)://如果小则增长更快(oldCap>>1));if(newCap-MAX_ARRAY_SIZE>0){//可能溢出intminCap=oldCap+1;如果(minCap<0||minCap>MAX_ARRAY_SIZE)抛出新的OutOfMemoryError();newCap=MAX_ARRAY_SIZE;}if(newCap>oldCap&&queue==array)newArray=newObject[newCap];}最后{allocationSpinLock=0;}}//2。第一个线程CAS成功后,第二个线程进入这段代码,然后第二个线程让出CPU,尝试让第一个线程获取锁,但是不能保证if(newArray==null)//如果另一个线程正在分配Thread.yield();锁.锁();if(newArray!=null&&queue==array){queue=newArray;System.arraycopy(array,0,newArray,0,oldCap);}}tryGrow的作用是展开,但是为什么要在展开前释放锁,然后用CAS来控制只有一个线程能不能展开成功呢?其实不释放锁也可以,就是扩容期间一直持有锁,只是扩容需要时间。如果在此期间锁被占用,此时其他线程无法进行dequeue和enqueue操作,降低并发性,因此为了提高性能,使用CAS控制只能扩容一个线程,扩容前先释放锁,以便其他线程可以执行入队和出队操作。展开线程展开后,自旋锁变量allocationSpinLock会被重置为0,这里不使用UNSAFE方法的CAS来设置,因为可能只有一个线程同时获取锁,allocationSpinLock修改为不稳定。我们来看看堆构建算法:privatestaticvoidsiftUpComparable(intk,Tx,Object[]array){Comparablekey=(Comparable)x;//队列元素如果个数>0,则判断插入位置,否则直接入队while(k>0){intparent=(k-1)>>>1;对象e=array[parent];如果(key.compareTo((T)e)>=0)中断;数组[k]=e;k=父母;}array[k]=key;}如果熟悉二叉堆,代码并不复杂。parent=(k-1)>>>1,首先k-1是获取当前实数下标的位置,然后>>>1获取父节点的位置,我们在这个图中知道,k=7,执行(k-1)Parent=3得到>>>1后,根据下标我们知道是元素6。PriorityQueue是一棵完全二叉树,不允许有空节点,其父节点小于叶子节点。这是堆排序中的最小堆。二叉树在数组中的存储方式很简单,就是从上到下,从左到右。完整的二叉树可以一一对应数组中的位置:左叶节点=父节点下标*2+1右叶节点=父节点下标*2+2父节点=(叶节点-1)/2是其实就是比较要插入的元素x和它的父节点元素6,如果比父节点大,就一直向上移动。poll操作poll操作的作用是获取队列内部堆树的根节点元素,如果队列为空则返回null。publicEpoll(){finalReentrantLocklock=this.lock;//获取独占锁lock.lock();尝试{返回出队();}finally{//释放独占锁lock.unlock();}}我们主要看一下dequeue方法。privateEdequeue(){intn=size-1;//队列为空,返回nullif(n<0)returnnull;else{对象[]数组=队列;//1。获取头元素Eresult=(E)array[0];//2。获取尾元素并赋值nullEx=(E)array[n];数组[n]=空;比较器cmp=比较器;如果(cmp==null)//3。siftDownComparable(0,x,array,n);否则siftDownUsingComparator(0,x,array,n,cmp);大小=n;//4。返回结果;}}该方法如果队列为空,则直接返回null,否则执行代码(1)获取数组的第一个元素作为返回值,存入变量Result中。这里需要注意的是,数组中的第一个元素是优先级最低或最高的元素。dequeue操作就是返回这个元素。然后代码(2)获取队列的尾元素存入变量x中,并清空尾节点,然后执行代码(3)将变量x插入到数组下标为0的位置,然后将堆重新调整为最大或最小的堆,然后返回。这里重要的是在移除堆的根节点后如何使用剩余节点重新调整最大或最小堆。让我们看一下siftDownComparable的实现。privatestaticvoidsiftDownComparable(intk,Tx,Object[]array,intn){if(n>0){Comparablekey=(Comparable)x;inthalf=n>>>1;//在非叶节点上循环while(k)c).compareTo((T)array[right])>0)c=array[child=right];如果(key.compareTo((T)c)<=0)中断;数组[k]=c;k=孩子;}数组[k]=键;}}由于队列数组的第0个元素是根,因此需要将其移除。这时候数组已经不是最小堆了,需要调整堆。具体来说就是从被移除的树根的左右子树中找一个最小值作为根,左右子树都会在自己的左右子树中找最小值。这是一个递归过程,直到叶节点结束递归。假设当前队列内容如下图所示:leftChildVal=4;上图中树根处的rightChildVal=6;由于4<6,c=4。那么由于11>4,即key>c,用元素4覆盖树的根节点的值。然后leftChildVal=8;树根的左子树的左右子节点中rightChildVal=10;由于8<10,c=8。那么,由于11>8,即key>c,元素8作为树根的左子树的根节点。现在树的形状如下图第三步所示。此时判断是否k