当前位置: 首页 > 科技观察

Java阻塞队列实现原理分析

时间:2023-03-12 00:35:06 科技观察

Java中的阻塞队列接口BlockingQueue继承自Queue接口。BlockingQueue接口提供了三种添加元素的方法:add:向队列中添加一个元素,添加成功则返回true,如果由于容量已满而添加失败则抛出IllegalStateException;offer:向队列中添加一个元素,添加成功返回true,添加失败返回false;put:向队列中添加元素,如果容量已满,则阻塞直到容量未满。3删除方法:poll:删除队列的头部元素,如果队列为空,则返回null。否则返回元素;remove:根据对象找到对应的元素并删除。删除成功返回true,否则返回false;take:删除队列头部的元素,如果队列为空,则阻塞,直到队列中有元素再删除。常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等,本文以ArrayBlockingQueue和LinkedBlockingQueue为例分析其实现原理。ArrayBlockingQueueArrayBlockingQueue的原理是利用一个可重入锁和这个锁产生的两个条件对象进行并发控制(经典的二条件算法)。ArrayBlockingQueue是一个有长度的阻塞队列。初始化时必须指定队列长度,指定长度后不允许修改。它有如下属性://存放队列元素的数组,是一个循环数组finalObject[]items;//获取数据的索引,用于take、poll、peek、remove方法inttakeIndex;//索引的索引data,useForput,offer,add方法intputIndex;//元素个数intcount;//可重入锁finalReentrantLocklock;//notEmpty条件对象,lock创建的privatefinalConditionnotEmpty;//notFull条件对象,lock创建的privatefinalConditionnotFull;数据添加ArrayBlockingQueue有几种不同的数据添加方法,add、offer、put方法。add方法:publicbooleanadd(Ee){if(offer(e))returntrue;elsethrownewIllegalStateException("Quueefull");}add方法内部调用offer方法如下:publicbooleanoffer(Ee){checkNotNull(e);//元素为不允许为空finalReentrantLocklock=this.lock;lock.lock();//加锁保证调用offer方法时只有一个线程try{if(count==items.length)//如果队列是fullreturnfalse;//直接返回false,添加失败else{insert(e);//如果数组未满,调用insert方法returntrue;//返回true,添加成功}}finally{lock.unlock();//释放锁,让其他线程可以调用offer方法}}insert方法如下:privatevoidinsert(Ex){items[putIndex]=x;//向数组添加元素putIndex=inc(putIndex);//放入数据索引+1,当索引满时变为0++计数;//元素个数+1notEmpty.signal();//使用条件对象notEmpty通知,比如没有时当我们在队列中的数据使用take方法时,它会被阻塞。这时队列插入一条数据,需要调用signal通知}put方法:publicvoidput(Ee)throwsInterruptedException{checkNotNull(e);//元素不允许为空finalReentrantLocklock=this.lock;lock.lockInterruptibly();//加锁,保证调用put方法时只有一个线程try{while(count==items.length)//如果队列已满,阻塞当前线程并将其添加到条件对象notFullnotFull.await();的等待队列中//线程被阻塞挂起,同时释放锁insert(e);//调用insert方法}finally{lock.unlock();//释放锁,以便其他线程调用put方法}}ArrayBlockingQueue添加数据的方法一共有三个方法:add、put、offer,总结如下:add方法内部调用offer方法,如果队列已满则抛出IllegalStateException,否则trueoffer方法返回false如果队列已满,否则返回trueadd方法和offer方法线程不会被阻塞。如果put方法满了,线程会被阻塞,直到有线程消费完队列中的数据后才有可能被唤醒。这三个方法内部使用了可重入锁来保证原子性。数据删除ArrayBlockingQueue有几种不同的数据删除方法,poll、take、remove方法。poll方法:publicEpoll(){finalReentrantLocklock=this.lock;lock.lock();//加锁保证调用poll方法时只有一个线程try{return(count==0)?null:extract();//如果队列中没有元素,则返回null,否则调用extract方法}finally{lock.unlock();//释放锁,以便其他线程可以调用poll方法}}poll方法内部调用了extract方法:privateEextract(){finalObject[]items=this.items;Ex=this.cast(items[takeIndex]);//获取index位置的元素items[takeIndex]=null;//清除takeIndex对应索引上的数据=inc(takeIndex);//取数据索引+1,当索引满时变为0--count;//元素个数-1notFull.signal();//使用条件对象notFull通知,比如使用put方法put数据到达时,队列已满,阻塞。此时消费一条数据,队列未满,需要调用signal通知returnx;//返回元素}take方法:publicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;lock.lockInterruptibly();//加锁,保证调用take方法时只有一个线程try{while(count==0)//如果队列为空,阻塞当前线程,加入条件对象的等待队列notEmptynotEmpty.await();//线程被阻塞Suspended,同时释放锁returnextract();//调用extract方法}finally{lock.unlock();//释放锁以便其他线程调用take方法}}remove方法:publicbooleanremove(Objecto){if(o==null)returnfalse;finalObject[]items=this.items;finalReentrantLocklock=this.lock;lock.lock();//加锁保证remove方法调用时只有一个线程try{for(inti=takeIndex,k=count;k>0;i=inc(i),k--){//遍历元素if(o.equals(items[i])){//如果两个对象都等于removeAt(i);//调用removeAt方法returntrue;//删除成功,返回true}}returnfalse;//删除成功,返回false}finally{lock.unlock();//释放锁,这样其他线程可以调用remove方法}}removeAt方法:voidremoveAt(inti){finalObject[]items=this.items;if(i==takeIndex){//如果要删除的数据的index是索引位置,直接删除索引位置的数据,然后取索引+1Items[takeIndex]=null;takeIndex=inc(takeIndex);}else{//如果要删除的数据的索引不是索引位置,移动元素元素,更新索引的值并将索引放入(;;){intnexti=inc(i);if(nexti!=putIndex){items[i]=items[nexti];i=nexti;}else{items[i]=null;putIndex=i;break;}}}--count;//元素个数-1notFull.signal();//使用条件对象notFull通知,比如使用put方法放数据时,队列已满,此时消费了一条数据,队列未满,需要调用signal来通知}ArrayBlockingQueue删除数据,有poll、take、remove三种方法。总结如下:如果队列为空,poll方法返回null,否则返回队列头部的元素。remove方法取的元素是基于对象的下标值。如果删除成功则返回true,否则返回false。poll方法和remove方法不会阻塞线程。当队列为空时,take方法将阻塞并挂起当前线程,直到有数据添加到队列中。这三个方法会在内部调用notFull.signal方法来通知正在等待队列满的阻塞线程。LinkedBlockingQueueLinkedBlockingQueue是一个阻塞队列,使用链表完成队列操作。链表是单向链表,而不是双向链表。内部使用lock和lock,这两个锁实现了阻塞(“双锁队列”算法)。它有如下属性://capacitysizeprivatefinalintcapacity;//元素个数,因为有2个锁,存在竞争条件,使用AtomicIntegerprivatefinalAtomicIntegercount=newAtomicInteger(0);//head节点privatetransientNodehead;//endnodeprivatetransientNodelast;//取锁privatefinalReentrantLocktakeLock=newReentrantLock();//取锁条件对象privatefinalConditionnotEmpty=takeLock.newCondition();//释放锁privatefinalReentrantLockputLock=newReentrantLock();//释放锁condition对象privatefinalConditionnotFull=putLock.newCondition();ArrayBlockingQueue只有一把锁,添加数据和删除数据时只能执行一把,不允许并行执行。LinkedBlockingQueue有2把锁,锁和锁,添加数据和删除数据可以并行进行,当然添加数据和删除数据的时候各只能一个线程执行。数据添加LinkedBlockingQueue有几种不同的数据添加方法,add、offer、put方法。add方法内部调用offer方法:publicbooleanoffer(Ee){if(e==null)thrownewNullPointerException();//不允许有null元素finalAtomicIntegercount=this.count;if(count.get()==capacity)//如果capacityFull,返回falsereturnfalse;intc=-1;Nodenode=newNode(e);//capacity未满,构造一个新元素的节点finalReentrantLockputLock=this.putLock;putLock。lock();//加锁加锁,保证offer方法调用时只有一个线程try{if(count.get()=0;//添加成功返回true,否则返回false}put方法:publicvoidput(Ee)throwsInterruptedException{if(e==null)thrownewNullPointerException();//不为空元素allowedintc=-1;Nodenode=newNode(e);//用新元素构造一个节点finalReentrantLockputLock=this.putLock;finalAtomicIntegercount=this.count;putLock.lockInterruptibly();//锁定并锁定到确保调用put方法时只有一个线程node);//将节点添加到链表的末尾c=count.getAndIncrement();//元素个数+1if(c+10){//判断是否有队列中的数据x=dequeue();//删除头节点c=count.getAndDecrement();//元素个数-1if(c>1)//如果队列中有元素notEmpty.signal();//锁中的条件对象在notEmpty上唤醒等待线程,说明队列中还有数据,可以再次消费}}finally{takeLock.unlock();//释放锁,让其他线程可以调用poll方法}if(c==capacity)//因为有锁有锁,有可能锁一直在加数据,计数就会变化。这里的if条件是指如果数据可以插入队列signalNotFull();//唤醒一个等待锁条件对象notFull的线程,表示数据可以插入队列returnx;}take方法:publicEtake()throwsInterruptedException{Ex;intc=-1;finalAtomicIntegercount=this.count;finalReentrantLocktakeLock=this.takeLock;takeLock.lockInterruptibly();//取锁加锁保证调用take方法时只有一个线程try{while(count.get()==0){//如果队列中没有元素notEmpty.await();//阻塞并挂起当前线程}x=dequeue();//删除头部节点c=计数。getAndDecrement();//元素个数-1if(c>1)//如果队列中有元素notEmpty.signal();//在持有锁的条件对象notEmpty上唤醒等待线程,表示队列中还有元素如果有数据,可以再次消费}finally{takeLock.unlock();//释放锁,让其他线程可以调用take方法}if(c==capacity)//由于有释放锁和获取锁的存在,所以这里可以释放锁,因为一直在添加数据,计数会发生变化。这里的if条件表示如果数据可以再次插入队列signalNotFull();//唤醒一个等待锁条件对象notFull的线程,表示数据可以再次插入队列returnx;}remove方法:publicbooleanremove(Objecto){if(o==null)returnfalse;fullyLock();//remove操作要移动的位置不固定,两个锁都需要加锁try{for(Nodetrail=head,p=trail.next;//从head开始遍历pnodeofthelinkedlist!=null;trail=p,p=p.next){if(o.equals(p.item)){//判断是否找到对象unlink(p,trail);//修改节点的链接信息,同时调用notFull的signal方法returntrue;}}returnfalse;}finally{fullyUnlock();//2个锁解锁}}LinkedBlockingQueue的take方法有时会阻塞没有数据,poll方法删除链表的头节点,remove方法删除指定对象。需要注意的是,remove方法需要同时锁上两把锁,因为要删除的数据的位置是不确定的。