当前位置: 首页 > 科技观察

一篇文章给大家带来CountDownLatch的原理

时间:2023-03-12 12:05:40 科技观察

前言CountDownLatch是多线程中比较重要的一个概念,它可以让一个或多个线程等待其他线程执行完毕后再执行。它内部有一个计数器和一个阻塞队列。每当线程调用countDown()方法时,计数器的值减1,当计数器的值不为0时,调用await()方法的线程会被加入阻塞队列,阻塞直到值计数器的值为0。常用方法publicclassCountDownLatch{//构造一个计数器,值为countpublicCountDownLatch(intcount);//阻塞当前线程,直到计数器为0publicvoidawait()throwsInterruptedException;//在unit超时时间内阻塞当前线程publicbooleanawait(longtimeout,TimeUnitunit);//将计数器的值减1,当计数器的值为0时,阻塞队列中的线程可以运行publicvoidcountDown();}下面是一个简单的例子:packagecom.yang...->{try{Thread.sleep(2000);System.out.println(Thread.currentThread().getName()+"runningfinished");}catch(InterruptedExceptione){e.printStackTrace();}finally{latch.countDown();}}).start();}latch.await();System.out.println("主线程有finishedrunning");}}输出结果如下:可以看出,主线程要等到三个子线程执行完毕后才会执行。从原理分析类图中可以看出,CountDownLatch中有一个继承自AQS的内部类Sync。其实AQS支持CountDownLatch的各种操作。CountDownLatch(intcount)newCountDownLatch(intcount)用于创建AQS同步队列,并将计数器的值赋给AQS的状态。publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");this.sync=newSync(count);}privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{Sync(intcount){setState(count);}}countDown()countDown()该方法会将计数器减1。当计数器值为0时,将唤醒所有在阻塞队列中等待的线程。它内部调用了Sync的releaseShared(1)方法publicvoidcountDown(){sync.releaseShared(1);}publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){//此时计数器的值为0,唤醒upallblocked线程doReleaseShared();returntrue;}returnfalse;}tryReleaseShared(arg)内部使用自旋+CAS操作将计数器的值减1,当减为0时,方法返回true,doReleaseShared()方法将被调用。不了解CAS机制的同学可以参考我的另一篇文章探讨CAS实现原理protectedbooleantryReleaseShared(intreleases){//spinfor(;;){intc=getState();if(c==0)//此时计数器的值已经为0,其他线程已经执行完毕,当前线程已经再次执行,无需再次唤醒returnfalse;intnextc=c-1;//使用CAS机制改变状态值变成state-1if(compareAndSetState(c,nextc))returnnextc==0;}}doReleaseShared()是AQS中的一个方法,唤醒队列中所有阻塞的线程。privatevoiddoReleaseShared(){for(;;){Nodeh=head;if(h!=null&&h!=tail){intws=h.waitStatus;if(ws==Node.SIGNAL){if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))continue;//looptorecheckcasesunparkSuccessor(h);}elseif(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))continue;//looponfailedCAS}if(h==head)//loopifheadchangedbreak;}}这个方法比较难理解,会在另一个空间介绍。这里,就认为这个方法会唤醒所有被调用await()方法阻塞的线程。await()当计数器的值不为0时,该方法会将当前线程加入阻塞队列,并挂起当前线程。publicvoidawait()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}也委托内部类Sync调用其acquireSharedInterruptibly()方法publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedExceptiong()(Shared(<0)doAcquireSharedInterruptibly(arg);}接下来看Sync中的tryAcquireShared()方法,如果当前计数器值为0,则返回1,最终导致await()不会阻塞线程,返回-1protectedinttryAcquireShared(intacquires){return(getState()==0)?1:-1;}当tryAcquireShared方法返回负值时,会调用AQS中的doAcquireSharedInterruptibly()方法来将调用await()方法的线程添加到Block队列中,挂起该线程。privatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{//将当前线程构造成共享模式节点,加入阻塞队列finalNodenode=addWaiter(Node.SHARED);booleanfailed=true;try{for(;;){finalNodep=node.predecessor();if(p==head){intr=tryAcquireShared(arg);if(r>=0){setHeadAndPropagate(node,r);p.next=null;//helpGCfailed=false;return;}}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())thrownewInterruptedException();}}finally{if(failed)cancelAcquire(node);}}同样,上面的代码位于AQS中,看不懂AQS结构就看上面的了代码有些难度,AQS源码另文空间介绍。使用场景CountDownLatch的使用场景非常广泛,一般用来单独做一些事情,然后再归纳起来。例如:数据报表:目前微服务架构很流行,大部分项目都会拆分成若干个子服务,所以报表服务在进行统计时需要从各个服务中提取数据。这时候可以创建与服务个数相同的线程,交给线程池处理。每个线程从相应的服务中提取数据。注意countDown()操作需要在finally语句块中进行。主线程调用await()阻塞,直到所有数据提取成功,最后主线程对数据进行过滤和组装,形成直观的报表。风险评估:客户端同步请求查询用户的风险等级。服务端收到请求后,会请求多个子系统获取数据,然后使用风险评估规则模型进行风险评估。如果使用单线程完成这些操作,同步请求很可能会超时,因为服务器请求多个子系统依次排队,请求子系统获取数据的时间是线性累加的。此时可以使用CountDownLatch,让多个线程并发请求多个子系统。获取到多个子系统的数据后,再进行风险评估,使得请求子系统获取数据的时间与最耗时请求的时间相等。显着减少处理时间。

最新推荐
猜你喜欢