前言Java提供了一些非常有用的并发工具类,我们不需要重新造轮子。在本节中,我们将解释CyclicBarrier。来看看吧~CyclicBarrier类似于我们上一节讲到的CountDownLatch。它的字面意思是它相当于一个可回收的屏障。它和CountDownLatch的区别在于它可以被复用。下一步的操作取决于上一步是否完成。就像去银行做生意一样,眼前的人已经做了。轮到你了。我们继续用上一节的例子来改写一下。我在这里很懒惰。在实际业务中,尽量使用classes,而不是newThread。公共类CyclicBarrierTest{publicstaticvoidmain(String[]args)throwsBrokenBarrierException,InterruptedException{CyclicBarriercyclicBarrier=newCyclicBarrier(1);IntStream.range(0,10).forEach(i->{newThread(()->{try{Thread.sleep(2000);System.out.println("worker1------>"+i);cyclicBarrier.await();Thread.sleep(2000);System.out.println("worker2----->"+i);cyclicBarrier.await();Thread.sleep(2000);System.out.println("worker3------>"+i);cyclicBarrier.await();}catch(InterruptedException|BrokenBarrierExceptione){e.printStackTrace();}}).start();});System.out.println("完成!");}}实际输出:完成!工人1------>9工人1------>0工人1------>6工人1------>7工人1------>5工人1------>4worker1------>1worker1------>3worker1------>2worker1------>8worker2------>7worker2------>6worker2------>5worker2------>2worker2------>3worker2------>1worker2-------->8worker2------>0worker2------>9worker2------>4worker3------>6worker3------>3工人3------>2工人3------>5工人3------>7工人3------>8工人3------>1工人3----->0worker3------>9worker3------>4可以看到即使在多线程下,每一个操作都需要在上一个await任务之后执行,非常简单而且goodtouse知其然,知其所以然&源码分析一起来探究一下,它是怎么做到的?同样,我们先看构造函数。publicCyclicBarrier(intparties){this(parties,null);}publicCyclicBarrier(intparties,RunnablebarrierAction){if(parties<=0)thrownewIllegalArgumentException();this.parties=派对;this.count=派对;this.barrierCommand=barrierAction;}默认barrierAction为null,此参数为Runnable参数,最后一个线程到达时要执行的任务,刚才的例子没有演示,初始化时传入一个即可,并打印出当前的一个Threadname,这样比较容易理解,partiesint类型,表示参与线程的个数。我们再看一下它的定义,可以看到它没有继承任何类,也没有实现任何接口。publicclassCyclicBarrier{....}await让我们关注这个方法。publicintawait()throwsInterruptedException,BrokenBarrierException{try{returndowait(false,0L);}catch(TimeoutExceptiontoe){thrownewError(toe);//不可能发生}}为什么使用这个方法?等到所有各方都在此屏障上调用await。如果当前线程不是最后一个到达的,则出于线程调度目的,它被禁用和休眠,但以下情况除外:最后一个到达的线程;或者。其他某个线程中断了当前线程;或者。其他一些线程中断了其他等待线程之一;或者。其他线程在等待屏障时超时;或者。一些其他线程调用在此屏障上重置。再看dowait(),是私有方法。privateintdowait(booleantimed,longnanos)throwsInterruptedException,BrokenBarrierException,TimeoutException{//全局锁finalReentrantLocklock=this.lock;锁.锁();try{//每次使用屏障都会生成一个实例//privateGenerationgeneration=newGeneration();最后一代g=一代;//broken字面意思是破坏,如果被破坏,则抛出异常if(g.broken)thrownewBrokenBarrierException();//线程中断检测if(Thread.interrupted()){breakBarrier();抛出新的中断异常();}//剩余等待线程数intindex=--count;//当最后一个线程到达时if(index==0){//tripped//标记任务是否执行(即传入的runable参数)booleanranAction=false;try{finalRunnable命令=barrierCommand;//执行任务如果(命令!=null)command.run();ranAction=真;//完成后,进行下一组初始化生成,初始化计数并唤醒所有等待线程////privatevoidnextGeneration(){////上一代完成的信号//trip.signalAll();////设置下一代//count=parties;//generation=newGeneration();//}下一代();返回0;}finally{if(!ranAction)breakBarrier();}}//当索引不为0时,进入自旋for(;;){try{//先判断超时没有超时,然后继续等待if(!timed)trip.await();//超过指定时间调用awaitNanos超时释放锁elseif(nanos>0L)nanos=trip.awaitNanos(nanos);//中断异常捕获}catch(InterruptedExceptionie){//判断是否中断if(g==generation&&!g.broken){//privatevoidbreakBarrier(){//generation.broken=true;//计数=派对;//trip.signalAll();//}breakBarrier();扔即;}else{//否则中断当前线程Thread.currentThread().interrupt();}}//中断并抛出异常if(g.broken)thrownewBrokenBarrierException();//正常调用返回if(g!=generation)return指数;//超时时调用breakBarrier()并唤醒if(timed&&nanos<=0L){breakBarrier();抛出新的TimeoutException();}}}finally{//释放锁lock.unlock();}}销毁如何恢复?再来看看reset,源码很简单。break后会重新生成一个新的instance,并重新初始化对应的count。在dowait中,index==0也调用了nextGeneration,所以可以回收publicvoidreset(){finalReentrantLocklock=this.lock;锁.锁();试试{breakBarrier();//打断当前代nextGeneration();//开始新一代}finally{lock.unlock();}}结论cyclicBarrier源码比较简单,下一节会介绍Phaser,它是CountDownLatch的增强版,实现相对复杂一些。
