当前位置: 首页 > 后端技术 > Java

JavaJUCLinkedBlockingQueue分析

时间:2023-04-01 13:45:11 Java

阻塞队列LinkedBlockingQueue介绍上一篇介绍了使用CAS算法实现的非阻塞队列ConcurrentLinkedQueue。本文介绍使用排它锁实现的阻塞队列LinkedBlockingQueue。这个类图可以看出LinkedBlockingQueue也是使用单向链表实现的,其中包含headNode,lastNode,用于存放头尾节点;还有一个初始值为0的原子变量count,用来记录队列元素的个数;此外,它还包含两个ReentrantLock实例,分别用于控制元素进出队列的原子性,其中takeLock用于控制同一时刻只能有一个线程从队头获取元素,而putLock是同时只控制一个线程。从队列末尾添加元素。notEmpty和notFull是条件变量,它们内部都有一个条件队列,用于存放进出队列时阻塞的线程。LinkedBlockingQueue的构造函数如下:publicLinkedBlockingQueue(){this(Integer.MAX_VALUE);}publicLinkedBlockingQueue(intcapacity){if(capacity<=0)thrownewIllegalArgumentException();this.capacity=容量;last=head=newNode(null);}可以看到默认情况下LinkedBlockingQueue队列容量是int的最大值。当然,我们也可以直接指定容量大小,这样就可以在一定程度上说明LinkedBlockingQueue是一个有界阻塞队列。offer操作将一个元素插入到队列的末尾。如果队列空闲,则插入成功并返回true。如果队列已满,则丢弃当前元素并返回false。如果插入的元素为空,则抛出异常。并且该方法是非阻塞的。publicbooleanoffer(Ee){//(1)如果元素为空则抛出异常if(e==null)thrownewNullPointerException();//(2)如果队列已满则丢弃该元素finalAtomicIntegercount=this.count;如果(count.get()==容量)返回false;intc=-1;//(3)新建节点,然后获取putLock独占锁Nodenode=newNode(e);finalReentrantLockputLock=this.putLock;putLock.lock();try{//(4)如果队列未满,则进入队列并增加计数if(count.get()=0;}代码(1)首先判断入队元素是否为空,为空则抛出异常。代码(2)判断队列是否满,丢弃,满则返回false。代码(3)新建一个Node节点,然后获取putLock锁。获取到锁后,其他调用put或offer方法的线程会被阻塞,放入putLock的AQS阻塞队列中。代码(4)判断队列是否满了,为什么要重新判断呢?因为在执行代码(2)和获取锁之间可能有其他线程通过put或者offer加入新元素,所以再次判断队列是否满,如果满了,就会有新元素入队,增加计数器。代码(5)判断如果新元素入队后还有剩余空间,则唤醒一个阻塞在notFull条件队列中的线程(唤醒调用notFull的await操作的线程,比如当put方法被执行并且队列已满)。代码(6)释放获取的putLock锁。这里需要注意的是,锁的释放一定要在finally中完成,因为即使try块抛出异常,finally也会被执行。另外,在释放锁后,调用put操作阻塞的其他线程之一将获得锁。代码(7)c==0表示执行代码(6)释放锁时队列中至少有一个元素,如果队列中有元素则执行signalNotEmpty方法。privatevoidsignalNotEmpty(){finalReentrantLocktakeLock=this.takeLock;takeLock.lock();尝试{notEmpty.signal();}最后{takeLock.unlock();}}该方法的作用是激活notEmpty的条件队列一个因为调用notEmpty的await方法而被阻塞的线程(比如调用take方法时队列为空),这也说明了对应的必须在调用条件变量的方法之前获取锁。总结:offer方法使用了putLock锁来保证添加的原子性。另外需要注意的是,在调用条件变量时,必须先获取对应的锁,enqueue只对链表的尾节点进行操作。put操作此方法将一个元素插入到队列的末尾。如果队列空闲,则插入成功,直接返回。如果队列已满,则当前线程将被阻塞,直到队列空闲且插入成功返回。如果在阻塞时中断标志被另一个线程设置,被阻塞的线程将抛出异常并返回。如果传递的元素为空,则抛出异常。publicvoidput(Ee)throwsInterruptedException{//(1)如果插入的元素为空则抛出异常if(e==null)thrownewNullPointerException();intc=-1;//(2)建立一个新节点,并获得独占锁putLockNodenode=newNode(e);finalReentrantLockputLock=this.putLock;最终AtomicInteger计数=this.count;putLock.lockInterruptibly();try{//(3)if队列满时,waitwhile(count.get()==capacity){notFull.await();}//(4)插入队列并增加计数enqueue(node);c=count.getAndIncrement();//(5)if(c+10){x=dequeue();c=count.getAndDecrement();//(4)如果(c>1)notEmpty.signal();}}最后{//(5)takeLock.unlock();}//(6)if(c==capacity)signalNotFull();//(7)returnx;}privateEdequeue(){Nodeh=head;Nodefirst=h.next;h.next=h;//帮助GChead=first;Ex=first.item;first.item=null;returnx;}首先,代码(1)很简单,先判断当前队列是否为空,为空则直接返回null。代码(2)获取排他锁takeLock。当前线程获得锁后,其他线程在调用poll或take方法时会被阻塞。代码(3)判断如果当前队列不为空,则执行出队操作,然后递减计数器。虽然代码(3)中判断队列是否为空和获取队列元素不是原子的,count值只会在poll、take或remove操作的地方递减,但这三个方法需要获取takeLock锁。在当前线程已经获取到takeLock锁的情况下进行操作,所以其他线程在当前情况下没有机会递减计数计数器的值,所以看起来不是原子的,但它们是线程安全的。代码(4)判断,如果c>1,说明当前线程在队列中移除一个元素后队列不为空(c为删除元素前队列元素的个数),那么此时,可以因为调用了take方法而被激活。阻塞notEmpty条件队列中的线程。代码(6)表明在当前线程移除队头元素前当前队列已满,移除队头元素后当前队列中至少有一个空闲位置,此时可以调用signalNotFull激活条件队列中的一个线程由于调用put方法而阻塞到notFull,signalNotFull的代码如下。privatevoidsignalNotFull(){finalReentrantLockputLock=this.putLock;putLock.lock();尝试{notFull.signal();}最后{putLock.unlock();删除它,如果队列为空则返回null。该方法是一种非阻塞方法。publicEpeek(){//(1)if(count.get()==0)returnnull;//(2)finalReentrantLocktakeLock=this.takeLock;takeLock.lock();尝试{Nodefirst=head.next;//(3)if(first==null)returnnull;else//(4)返回first.item;}最后{takeLock.unlock();}}peek方法不是很复杂,需要注意这里需要判断first==null,不能直接返回first.item,因为代码(1)和代码(2)不是原子的,很有可能代码(1)判断队列不为空后获取如果其他线程在锁前进行poll或take操作,则队列为空,然后直接返回fist.item,会出现空指针异常。takeoperation该方法获取当前队列头元素并将其从队列中移除。如果队列为空,则阻塞当前线程,直到队列不为空,然后获取并返回元素。如果中断标志在阻塞时被其他线程置位,那么被阻塞的线程就会抛出异常。publicEtake()throwsInterruptedException{Ex;intc=-1;最终AtomicInteger计数=this.count;最后的ReentrantLocktakeLock=this.takeLock;//(1)获取锁takeLock.lockInterruptibly();try{//(2)如果队列为空,则阻塞挂起while(count.get()==0){notEmpty.await();}//(3)出队并倒计时x=dequeue();c=count.getAndDecrement();//(4)如果(c>1)notEmpty.signal();}finally{//(5)takeLock.unlock();}//(6)if(c==capacity)signalNotFull();returnx;}代码(1)中,当前线程获取排他锁,其他调用take或poll操作的线程将被阻塞挂起。代码(2)判断如果队列为空,则阻塞挂起当前线程,将当前线程放入notEmpty条件队列。代码(3)出队倒计时。代码(4)判断如果c>1,说明当前队列不为空,则通过调用take操作唤醒阻塞的notEmpty条件队列中的一个线程。代码(5)释放锁。代码(6)判断如果c==capacity,说明当前队列至少有一个空闲位置,则通过调用put操作激活notFull条件队列中被阻塞的线程。remove操作删除队列中的指定元素,有则返回true,无元素则返回false。publicbooleanremove(Objecto){if(o==null)返回false;//(1)获取putLock和takeLockfullyLock();try{//(2)遍历寻找要删除的元素for(Nodetrail=head,p=trail.next;p!=null;trail=p,p=p.next){//(3)if(o.equals(p.item)){unlink(p,trail);返回真;}}//(4)返回假;}finally{//(5)fullyUnlock();}}voidfullyLock(){putLock.lock();takeLock.lock();}第一段代码(1)获得双锁后,会暂停其他线程的入队和出队操作。代码(2)遍历队列寻找要删除的元素,没有找到就返回false,找到就执行unlink方法,我们来看看这个方法。voidunlink(Nodep,Nodetrail){p.item=null;trail.next=p.next;如果(last==p)last=trail;//如果当前队列已满,则删除最后,唤醒等待线程if(count.getAndDecrement()==capacity)notFull.signal();}trail是删除元素的前驱节点。删除元素后,如果当前队列有空闲空间,条件是唤醒notFull队列中调用put方法阻塞的线程。代码(5)调用fullyUnlock解除双锁。voidfullyUnlock(){takeLock.unlock();putLock.unlock();}总结:由于remove方法在删除指定元素之前加了两把锁,所以在遍历队列寻找指定元素的过程中是线程安全的,而此时调用的所有其他线程入队和出队操作将被阻止。请注意,获取多个资源锁的顺序与释放它们的顺序相反。大小操作获取队列元素的当前数量。publicintsize(){returncount.get();}由于dequeue和enqueue操作的计数被锁定,所以结果比ConcurrentLinkedQueue的size方法更准确。综上所述,LinkedBlockingQueue内部是通过单向链表实现的,使用头节点和尾节点进行入队和出队操作,即入队操作在尾节点,出队操作在头节点。头尾节点的操作使用单独的独占锁来保证原子性,因此出队和入队操作可以同时进行。此外,头尾节点的独占锁配备条件队列,用于存放阻塞的线程,并结合入队和出队操作实现生产和消费模型。