当前位置: 首页 > 后端技术 > Java

【JUC】CountDownLatch共享节点队列

时间:2023-04-01 15:31:38 Java

1.实例与原理Useinti=3;//==1.初始化CountDownLatchcd=newCountDownLatch(i);while(i>0){newThread(()->{try{TimeUnit.SECONDS.sleep(1L);System.out.println("Biz-Threadisover");}catch(InterruptedExceptione){e.printStackTrace();}//==3.倒计时cd.countDown();}).start();i--;}//==2.阻塞cd.await();共享模式结构如图所示。与AQS家族的ReentrantLock相比,最大的不同在于CountDownLatch是共享模式,而ReentrantLock是独占模式的区别体现在两个层面1.代码层面Node节点:staticfinalclassNode{/**Shared*/staticfinalNodeSHARED=newNode();/**Exclusive*/staticfinalNodeEXCLUSIVE=null;2、功能层面的共享模式会释放共享节点的所有绑定线程(头节点会下移,head=head.next);而独占模式只会释放head.next节点绑定的线程共享模式的特性,下一章的反向使用部分会更清楚。反向使用inti=2;CountDownLatchcd=newCountDownLatch(1);while(i>0){newThread(()->{try{System.out.println(Thread.currentThread().getName()+"准备工作完成,等待主业务");cd.await();//==业务线程在这里阻塞System.out.println(Thread.currentThread().getName()+"业务开始");TimeUnit.SECONDS.sleep(1L);System.out.println(Thread.currentThread().getName()+"业务结束");}catch(InterruptedExceptione){e.printStackTrace();}})。开始();i--;}TimeUnit.SECONDS.sleep(3L);//==释放所有业务线程cd.countDown();System.out.println(Thread.currentThread().getName()+"主业务结束");在循环中,图中三个方法相互配合,shared类型的节点会一个一个释放(当然下一个点也会释放,只是图中没有显示)2.来源代码分析1.初始化publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");this.sync=newSync(count);}java.util.concurrent.CountDownLatch.Sync#SyncprotectedfinalvoidsetState(intnewState){//##将状态分配给state=newState;}2.awaitpublicvoidawait()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())抛出新的中断异常();//-A。尝试获取(判断状态)if(tryAcquireShared(arg)<0)//--b.获取共享锁doAcquireSharedInterruptibly(arg);}a.Trytoacquire(判断状态state)protectedinttryAcquireShared(intacquires){//例子中state为正数,return-1return(getState()==0)?1:-1;}b-1。获取共享锁privatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{//==queueconstructionfinalNodenode=addWaiter(Node.SHARED);布尔失败=真;try{for(;;){最终节点p=node.predecessor();//==head.next尝试获取if(p==head){//$$1.countdown()方法在状态计数清零时返回1;如果未清除为0,则返回-1intr=tryAcquireShared(arg);if(r>=0){//$$3.state清零(在执行最后一个countDown之后)//#####b-2.重置头节点并释放共享setHeadAndPropagate(node,r);p.下一个=空;//帮助GCf有病=假;返回;}}//$$2.ReentrantLock这部分已经分析过了,结论就不直接展开了。//第一次设置waitstatus为signal,返回false//第二次判断waitstatus==signalReturntrueif(shouldParkAfterFailedAcquire(p,node)//===线程阻塞(唤醒时,从这里继续执行)&&parkAndCheckInterrupt())抛出新的InterruptedException();}}finally{if(failed)cancelAcquire(node);}}上面这部分代码(#####b-2.重置头节点和释放共享)需要仔细分析。详见下一章b-2。重置头节点并释放sharedprivatevoidsetHeadAndPropagate(Nodenode,intpropagate){Nodeh=head;//==移动头节点(此时入参node为head.next),head=head.nextsetHead(node);if(propagate>0||h==null||h.waitStatus<0||(h=head)==null||h.waitStatus<0){Nodes=node.next;if(s==null//共享节点会执行下面的Release逻辑||s.isShared()){//##countDown也会调用这个方法,这里不分析doReleaseShared();}}}//==head节点移动,资源释放privatevoidsetHead(Nodenode){head=node;节点线程=空;node.prev=null;}3.countDownpublicvoidcountDown(){sync.releaseShared(1);}publicfinalbooleanreleaseShared(intarg){//==a.statedecrement//state>0递减后返回false//递减state=0后,返回true(进入b逻辑)if(tryReleaseShared(arg)){//==b.释放doReleaseShared();返回真;}returnfalse;}a.statedecrementprotectedbooleantryReleaseShared(intreleases){for(;;){intc=getState();如果(c==0)返回假;//cas方法-1intnextc=c-1;if(compareAndSetState(c,nextc))//-1state=0后,返回truereturnnextc==0;}}b。释放privatevoidsetHeadAndPropagate(Nodenode,intpropagate){Nodeh=head;//记录oldhead以便下面检查//当前节点设置为head节点setHead(node);如果(传播>0||h==空||h.waitStatus<0||(h=头)==空||h.waitStatus<0){节点s=node.next;if(s==null||s.isShared())//==释放共享锁doReleaseShared();}}//==释放共享锁privatevoiddoReleaseShared(){for(;;){Nodeh=head;if(h!=null&&h!=tail){intws=h.waitStatus;if(ws==Node.SIGNAL){//###cas无法将waitstatus从-1更改为0,并再次循环if(!compareAndSetWaitStatus(h,Node.SIGNAL,0)){continue;}//###cas成功将waitstatus从-1变为0,h.next绑定的线程解锁unparkSuccessor(h);}elseif(ws==0//头节点的等待状态从0变为-3&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE)){continue;}}//--如果头节点在执行过程中没有发生变化,则跳出循环;//--如果头节点在执行过程中发生变化,在循环中再次执行上面的操作if(h==head)break;}}重点观察这部分逻辑###cas成功将waitstatus由-1变为0,h.next绑定的线程畅通privatevoidunparkSuccessor(Nodenode){//确保waitstatus由-1到0(cas模式)intws=node.waitStatus;如果(ws<0)compareAndSetWaitStatus(节点,ws,0);//尾节点或取消节点专门处理Nodes=node.next;如果(s==null||s.waitStatus>0){s=null;for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}//==解锁node.next绑定的线程if(s!=null)LockSupport.unpark(s.thread);}