当前位置: 首页 > 科技观察

面试官:你了解过CountDownLatch吗?

时间:2023-03-21 20:34:27 科技观察

前言Java提供了一些非常有用的并发工具类,我们不需要重新发明轮子。本节我们将讲解CountDownLatch,一起来看看吧!CountDownLatch首先我们来看一下这个东西是做什么用的。CountDownLatch也是java.util.concurrent包下的一个工具类。通常我们称之为并发计数器。这个计数不是12345,主要使用场景是当一个任务拆分成多个子任务时,需要等待所有子任务完成后,否则会阻塞线程,每执行一个任务计数器都会-1完成,直到没有更多。这个有点类似于go语言中的sync.WaitGroup。废话不多说,我们通过例子给大家快速介绍一下。在此之前,我们需要添加它的常用方法:publicCountDownLatch(intcount){...}构造函数。voidawait()是当前线程一直等待,直到锁内存计数为0,或者线程被中断。booleanawait(longtimeout,TimeUnitunit)是当前线程等待直到锁内存计数为0,或者线程被中断,如果为0返回true,可以指定等待的超时时间。countDown()减少闩锁的计数,如果达到0则释放所有等待的线程。getCount()获取闩锁的计数。下面看具体使用:publicclassCountDownLaunchTest{publicstaticvoidmain(String[]args)throwsInterruptedException{CountDownLatchcountDownLatch=newCountDownLatch(10);IntStream.range(0,10).forEach(i->{newThread(()->{try{Thread.sleep(2000);System.out.println("worker------>"+i);}catch(InterruptedExceptione){e.printStackTrace();}最后{countDownLatch.countDown();}}).start();});countDownLatch.await();System.out.println("完成!");}}时间输出:worker------>1worker------>4worker------>5worker------>7worker------>8worker------>0worker------>2worker------>3worker------>9worker------>6completed!进程已经结束,退出码为0,可以看到任务还没有完全完成,主线程就被阻塞了。源码分析先看构造函数。私人最终同步同步;publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");this.sync=newSync(count);说到CAS,涉及到多线程的实现类几乎都会有。privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{privatestaticfinallongserialVersionUID=4982264981922014374L;同步(整数计数){setState(计数);}intgetCount(){returnreresgetState();()==0)?1:-1;}protectedbooleantryReleaseShared(intreleases){//递减计数;转换为零时发出信号for(;;){intc=getState();如果(c==0)返回假;intnextc=c-1;如果(compareAndSetState(c,nextc))返回nextc==0;}}}countDown首先在构造函数中初始化state,对应setState(count);,其实它的底层实现是依赖AQS的。CountDownLatch主要有两个方法,一个是countDown,一个是await。下面我们看看如何实现。publicvoidcountDown(){sync.releaseShared(1);}publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){doReleaseShared();返回真;}returnfalse;}tryReleaseShared()方法是在countDownLatch中实现的,自旋操作判断该值是否为0,如果为0则表示执行完毕。之前说的减量到这里就完成了,会去doReleaseShared,也就是释放操作。有没有想过为什么c==0会返回false?可以回顾一下之前的操作if(tryReleaseShared)doReleaseShared,即所有任务都执行完才会释放任务。发布过程实际上是由一个队列来完成的。protectedbooleantryReleaseShared(intreleases){for(;;){intc=getState();如果(c==0)返回假;intnextc=c-1;如果(compareAndSetState(c,nextc))返回nextc==0;}}doReleaseShared是`AbstractQueuedSynchronizer'的内部方法。privatevoiddoReleaseShared(){for(;;){节点h=head;if(h!=null&&h!=tail){intws=h.waitStatus;如果(ws==Node.SIGNAL){如果(!compareAndSetWaitStatus(h,Node.SIGNAL,0))继续;//循环重新检查案例unparkSuccessor(h);}elseif(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))继续;//在失败的CAS上循环}if(h==head)//如果head改变则循环break;}}这个方法之前跟大家讲过,其实就是释放锁的操作。可以看到这里只唤醒了头节点的后继节点,然后就返回了。为什么是后继节点?继续看unparkSuccessor。privatevoidunparkSuccessor(Nodenode){intws=node.waitStatus;如果(ws<0)compareAndSetWaitStatus(节点,ws,0);//后继节点Nodes=node.next;如果(s==null||s.waitStatus>0){s=null;for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}if(s!=null)LockSupport.unpark(s.thread);}那么剩下的线程怎么释放呢?await再看await(),内部方法acquireSharedInterruptibly也被调用了。publicvoidawait()throwsInterruptedException{同步。acquireSharedInterruptibly(1);}publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();if(tryAcquire)=0){setHeadAndPropagate(node,r);p.next=null;失败=假;返回;}}//检查并更新获取失败节点的状态。如果线程应该阻塞则返回真}}finally{//如果失败则取消if(failed)cancelAcquire(node);}}