当前位置: 首页 > 科技观察

并发编程的CyclicBarrier原理和使用

时间:2023-03-16 20:33:44 科技观察

序控制并发进程的工具,就是为了帮助我们的程序员让线程之间的协作变得更简单,让线程之间相互协作来满足业务逻辑。比如让线程A等待线程B执行完,再执行其他的协作策略。控制并发进程的工具类主要有:简介从字面上看,这个类的中文意思是“圈套”。这可能意味着可回收的屏障。它的作用是让所有线程等待完成后再进行下一步。比如在生活中,我们邀请朋友到某家餐厅吃饭。有的朋友可能早到,有的朋友可能晚到,但是餐厅规定必须等到大家都过期了才让我们进去。这里的朋友在各种线程,餐厅是CyclicBarrier。JUC包为我们提供了一个同步工具类,可以很好的模拟这样的场景,它就是CyclicBarrier类。使用CyclicBarrier类,一组线程可以相互等待,当所有线程都到达某个屏障点时,执行后续操作。下图演示了这个过程。该应用场景可用于多线程统计数据,最后合并统计结果。使用CyclicBarrier实现等待的线程称为参与者。参与者只需执行cyclicBarrier.await()即可实现等待。由于CyclicBarrier在内部维护一个显式锁,因此可以知道哪个参与者是最后执行cyclicBarrier.await()的。当最后一个线程执行完后,其他使用对应CyclicBarrier实例的参与者会被唤醒,最后一个线程本身不会被挂起。流程图如下:publicstaticvoidmain(String[]args){CyclicBarriercyclicBarrier=newCyclicBarrier(7,()->{System.out.println("****召唤龙");});for(inti=1;i<=7;i++){intfinalI=i;newThread(()->{System.out.println(Thread.currentThread().getName()+"\tcollected"+finalI+"龙珠");try{cyclicBarrier.await();}catch(InterruptedExceptione){e.printStackTrace();}catch(BrokenBarrierExceptione){e.printStackTrace();}},String.valueOf(i)).start();}}源码分析CyclicBarrier类图CyclicBarrier包含“ReentrantLock对象锁”和“Condition对象行程”,通过排他锁实现。其内部主要变量和方法如下:成员变量//同步操作锁privatefinalReentrantLocklock=newReentrantLock();//同步操作锁privatefinalReentrantLocklock=newReentrantLock();//线程拦截器privatefinalConditiontrip=lock.newCondition();//每次拦截的线程数privatefinalintparties;//替换之前执行的任务privatefinalRunnablebarrierCommand;//表示barrier的当前代数privateGenerationgeneration=newGeneration();//Counterprivateintcount;//静态内部类GenerationprivatestaticclassGeneration{booleanbroken=false;}可以看出在CyclicBarrier内部,线程被条件队列行程阻塞,它维护了两个int变量parties和count:parties表示每次拦截的线程数,当被阻塞时赋值建;count是内部的计数器,初始值与parties相同,每次调用await方法都会减1,并唤醒所有线程,直到减到0。CycliBarrier有一个静态的内部类Generation,对象是这个类代表当前生成的屏障,就像玩游戏时的当前部分,可以用来实现循环等待。barrierCommand表示替换前执行的任务。当计数减少到0时,表示游戏结束,需要进行下一句。在进行下一句游戏之前,所有阻塞的线程都会被唤醒。在唤醒所有线程之前,您可以指定barrierCommand来执行您自己的任务。构造函数主要提供两个构造方法publicCyclicBarrier(intparties){this(parties,null);}publicCyclicBarrier(intparties,RunnablebarrierAction){if(parties<=0)thrownewIllegalArgumentException();//parties意思是“必须同时到达barrier时间线程数”。this.parties=parties;//count表示“处于等待状态的线程数”。this.count=parties;//barrierCommand表示“当各方线程到达屏障时将执行的操作”。this.barrierCommand=barrierAction;}分析:parties是参与线程的个数第二个构造方法有一个Runnable参数,表示最后一个到达的线程要执行的动作。CyclicBarrier类的重要方法主要作用是阻塞先到达屏障点的线程,等待后面的线程。它提供了定时等待和非定时等待两种等待方式。await()方法//非定时等待publicintawait()throwsInterruptedException,BrokenBarrierException{try{returndowait(false,0L);}catch(TimeoutExceptiontoe){thrownewError(toe);}}//定时等待publicintawait(longtimeout,TimeUnitunit)throwsInterruptedException,BrokenBarrierException,TimeoutException{returndowait(true,unit.toNanos(timeout));}分析:调用await()的线程说明summary到了barrier。BrokenBarrierException表示屏障已被打破。销毁的原因可能是其中一个线程在await()或超时期间被中断。在dowait()方法中我们可以看到无论是定时等待还是非定时等待,都是调用了dowait方法,只是传入的参数不同而已。我们来看看dowait方法做了什么。//核心等待方法privateintdowait(booleantitimed,longnanos)throwsInterruptedException,BrokenBarrierException,TimeoutException{//显示锁finalReentrantLocklock=this.lock;lock.lock();try{finalGenerationg=generation;//检查当前barrier是否翻转if(g.broken){thrownewBrokenBarrierException();}//检查当前线程是否被中断if(Thread.interrupted()){//如果当前线程被中断,会做以下三件事//1.推翻当前屏障//2.唤醒所有被拦截的线程//3.抛出中断异常breakBarrier();thrownewInterruptedException();}//将计数器的值减1intindex=--count;//将计数器的值减为0然后需要唤醒所有线程并切换到下一代if(index==0){booleanranAction=false;try{//唤醒所有线程前执行指定任务finalRunnablecommand=barrierCommand;if(command!=null){command.run();}ranAction=true;//唤醒所有线程,进入nextGeneration();return0;}finally{//确保任务执行不成功时,所有线程都能被唤醒if(!ranAction){breakBarrier();}}}//如果计数器不为0,则执行此循环for(;;){try{//根据传入的参数判断是定时等待还是非定时等待if(!timed){trip.await();}elseif(nanos>0L){nanos=trip.awaitNanos(nanos);}}catch(InterruptedExceptionie){//如果当前线程在等待期间中断,则翻墙唤醒其他线程if(g==generation&&!g.broken){breakBarrier();throwie;}else{//如果在捕获中断异常之前已经在barrier上完成等待,然后直接调用中断操作Thread.currentThread().interrupt();}}//如果线程因为翻墙操作而被唤醒,则抛出异常if(g.broken){thrownewBrokenBarrierException();}//如果线程因为替换操作被唤醒,则返回计数器的值fence并抛出异常if(timed&&nanos<=0L){breakBarrier();thrownewTimeoutException();}}}finally{lock.unlock();}}上面执行的代码比较容易理解,我们来看看执行过程:获取显示锁,判断当前线程状态是否被中断,如果是,则执行breakBarrier方法唤醒之前所有被阻塞的线程,并重置计数器;计数器count减1,如果count==0,表示最后一个线程到达barrier,然后执行之前指定的Runnable接口,同时执行nextGeneration方法进入下一代;否则进入自旋,判断当前线程是进入定时等待还是非定时等待。如果在等待过程中被中断,则执行breakBarrier方法,唤醒所有之前阻塞的线程;判断是否是因为执行了breakBarrier方法。被唤醒,如果是,则抛出异常;判断是否被正常的替换操作唤醒,如果是,则返回计数器的值;判断是否超时唤醒,是则唤醒所有之前阻塞的线程,并抛出异常;释放锁breakBarrier()methodprivatevoidbreakBarrier(){generation.broken=true;//屏障被打破count=parties;//重置counttrip.signalAll();//唤醒之前阻塞的线程}nextGeneration()methodprivatevoidnextGeneration(){//唤醒所有线程trip.signalAll();//重置计数器count=parties;//重启generation=newGeneration();}reset()方法接下来看fencereset的方法//重置barrier为在初始状态下,所有等待的线程最终都会抛出BrokenBarrierException。publicvoidreset(){finalReentrantLocklock=this.lock;lock.lock();try{breakBarrier();//breakthecurrentgenerationnextGeneration();//startanewgeneration}finally{lock.unlock();}}CyclicBarrier还提供了其他方法examplegetParties、isBroken、getNumberWaiting等方法比较简单。除了getParties是不可变的,因为parties是最终确定的,其余方法将首先获取互斥锁。/***获取当前回合是否已经破局。*/publicbooleanisBroken(){finalReentrantLocklock=this.lock;lock.lock();try{returgengeneration.broken;}finally{lock.unlock();}}/***获取当前在barrier中等待的线程数.*/publicintgetNumberWaiting(){finalReentrantLocklock=this.lock;lock.lock();try{returnparties-count;}finally{lock.unlock();}}CountDownLatch和CyclicBarrier的区别总结CountDownLatch和CyclicBarrier都可以实现线程间连接等待,但侧重点不同:CountDownLatch一般用于一个或多个线程,等待其他线程完成任务后再执行;CyclicBarrier一般用于一组线程互相等待到某个状态,然后这组线程然后同时执行;CountDownLatch是一次性的,CyclicBarrier是可回收的;CountDownLathch是一个计数器,线程完成一条记录,计数器递减,只能使用一次。如下图:CyclicBarrier的计数器更像是一个阀门,需要所有线程都到达,然后继续执行,计数器递减,它提供了一个reset函数,可以多次使用。如下图:PS:以上代码提交在Github上:https://github.com/Niuh-Study/niuh-juc-final.gitPS:这里有个技术交流群(QQ群:1158819530),方便供大家一起交流,继续学习,共同进步,有需要的可以补充。