当前位置: 首页 > 科技观察

AQS入门

时间:2023-03-12 09:46:54 科技观察

Java高并发编程基础曾经有一个比较经典的面试题“能告诉我Java并发包下都有哪些常用类吗?”大多数人应该都能说出CountDownLatch、CyclicBarrier、Sempahore这三个多线程并发的利器。这三个强大的工具都是通过AbstractQueuedSynchronizer抽象类(以下简称AQS)实现的,所以在学习这三个强大的工具之前我们需要先学习AQS。AQS是一个简单的框架,它提供同步状态的原子管理、阻塞和唤醒线程功能以及队列模型。《AQS结构》说到同步,我们怎么保证同步呢?大家的第一印象肯定是锁。说到锁,大家第一个想到的肯定是Synchronized。Synchronized应该基本上大家都会用到。Locking和释放锁都是jvm实现的,我们只需要简单的加一个Synchronized关键字,使用起来超级方便。但是有没有一种情况,我们把一个锁的超时时间设置为Synchronized,实现起来有点不太可能。这时候我们就可以使用ReentrantLock来实现了,ReentrantLock是通过aqs实现的,今天我们通过ReentrantLock来学习aqsCAS&&fairlockAQS中使用了很多CAS和非公平锁,在学习AQS之前,我们还需要简单了解一下CAS,公平锁和非公平锁CASCAS的全称是compareandswap,是一种在多线程环境下使用的方法,一种实现同步功能的机制。CAS操作ration包含三个操作数:内存位置、期望值和新值。CAS的实现逻辑是将内存位置的值与期望值进行比较,如果相等,则将内存位置的值替换为新值。如果不相等,什么也不做。这个操作是一个原子操作。java中的AtomicInteger等类都是通过cas实现的。公平锁和非公平锁。公平锁:多个线程按照申请获得锁的先后顺序,线程会直接进入队列排队,队列中第一个获得锁。优点:等待锁的线程不会饿死,每个线程都可以获得锁。缺点:整体吞吐效率与非公平锁相比,等待队列中除第一个线程外的所有线程都会被阻塞,CPU唤醒阻塞线程的开销比非公平锁大。非公平锁:多个线程获取锁时,会尝试直接获取。如果他们无法获得它,他们将进入等待队列。如果他们能够获得它,他们将直接获得锁。优点:可以减少CPU唤醒线程的开销,整体吞吐效率会更高,而且CPU不必唤醒所有线程,这样会减少唤醒线程的数量。缺点:等待队列中的线程可能会饿死,或者长时间等待获取锁。文字有点啰嗦,下面用一个实际的例子来说明一下。比如我们去食堂吃饭,要排队。大家按照先到先得的顺序排队用餐。这是一个公平锁。如果你正要端着盘子吃饭的时候,突然冒出一个大胖子,跳到你面前。打不赢,他只能忍气吞声,让他跳进去,等胖子吃完了,一个小个子也会过来跳进去。你的队伍,这时候你就忍不住了,直接喊他滚出去,这小家伙只能跑到队尾去排队,这是一个不公平的锁。我们先看一下AQS的属性//headnodeprivatetransientvolatileNodehead;//blockedtailnode,每一个新的节点进来,插入到最后,形成一个链表privatetransientvolatileNodetail;//这个是最重要的,代表当前锁的状态,0表示未被占用,大于0表示有线程持有当前锁//这个值可以大于1,因为锁可以重入,加上1privatevolatileintstateforeachreentry;//表示当前持有对于有独占锁的线程,举最重要的例子,因为锁是可以重入的//reentrantLock.lock()可以嵌套调用多次,所以每次都用这个判断当前线程是否已经拥有锁//if(currentThread==getExclusiveOwnerThread()){state++}privatetransientThreadexclusiveOwnerThread;//继承自AbstractOwnableSynchronizer下面写个demo来分析加锁的过程并释放锁finalvoidlock(){//尝试直接设置状态为1,如果此时没有人获取锁,直接if(compareAndSetState(0,1))//竞争成功,然后修改线程获取锁状态setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}casattemptfails,说明已经有人持有锁了,所以进入acquire方法publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}tryAcquire方法,看名字大概就能猜到是什么意思了,试试看吧。tryAcquire其实是调用了父类Sync的nonfairTryAcquire方法finalbooleannonfairTryAcquire(intacquires){finalThreadcurrent=Thread.currentThread();//获取锁的当前状态intc=getState();//这个if逻辑一获取就获取进来锁的逻辑也是尝试通过cas获取锁if(c==0){if(compareAndSetState(0,acquires)){setExclusiveOwnerThread(current);returntrue;}}//进入这个判断的意思表示锁重入状态+1elseif(current==getExclusiveOwnerThread()){intnextc=c+acquires;//如果锁重入次数大于int的最大值,则直接抛出异常,正常情况下应该不存在,但是jdk还是严格的if(nextc<0)//overflowthrownewError("Maximumlockcountexceeded");setState(nextc);returntrue;}//返回false表示尝试获取锁失败,如果失败,必须执行acquireQueued方法returnfalse;}tryAcquire如果方法获取锁失败,则必须排队等待获取锁。排队的线程需要在哪里等待获取锁?这和我们线程池中任务的执行是一样的。线程池将任务封装成一个work,当线程无法处理任务时,再将任务放入队列中。AQS也类似,将排队获取锁的线程封装到一个NODE中。然后将NODE放入队列中。队列如下,但是需要注意的是头部不存放NODE。接下来我们继续分析源码,看看获取锁失败是如何加入到队列中的。需要执行acquireQueued方法。在执行acquireQueued方法之前,需要先执行addWaiter方法=pred;//cas加入队列尾部if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}//尾节点不为空||加入尾节点失败enq(node);returnnode;}enq接下来看enq方法//通过自旋和CAS,必须将当前节点加入到队列的尾部privateNodeenq(finalNodenode){for(;;){Nodet=tail;//尾节点为空表示队列还是空的,还没有初始化,所以初始化头节点,可以看到头节点的节点没有绑定线程,也就是没有数据存储if(t==null){//Mustinitializeif(compareAndSetHead(newNode()))tail=head;}??else{node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}获取loc的线程k通过addWaiter方法封装成一个NODE加入列。上述方法的执行流程图如下:接下来继续执行acquireQueued方法acquireQueuedfinalbooleanacquireQueued(finalNodenode,intarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){//获取锁前驱节点通过spin==head来尝试获取锁,这个方法前面已经分析过了。finalNodep=node.predecessor();if(p==head&&tryAcquire(arg)){setHead(node);p.next=null;//helpGCfailed=false;returninterrupted;}//输入这个if表示前导nodeofnodeisnotEqualtoheadorfailedtotryacquirethelock//判断是否挂起当前线程if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{//异常情况进入cancelAcquire,这个源码在jdk11中是直接catch(Throwablee){cancelAcquire(node);}简单明了if(failed)cancelAcquire(node);}}setHead这个方法每当每当节点获取锁。简单看成是当前节点在获取到锁时“移除”(成为头节点)队列。shouldParkAfterFailedAcquire说到这个方法,首先要看一下NODE可能的状态。在源码中我们可以看到一共有四种状态:CANCELLED:值为1,在同步队列中等待的线程等待超时或者被中断。需要从同步队列中取消该Node的节点,该节点的waitStatus为CANCELLED,即结束状态,进入该状态后的节点不会再发生变化。SIGNAL:值为-1,标识为等待唤醒状态的后继节点。当前驱节点的线程释放同步锁或被取消时,会通知后继节点的线程执行。说白了就是处于唤醒状态。只要前驱节点释放了锁,就会通知标记为SIGNAL状态的后继节点线程执行。CONDITION:值为-2,与Condition有关。识别出的节点在等待队列中,该节点的线程正在等待Condition。当其他线程调用Condition的signal()方法时,处于CONDITION状态的节点将从等待队列转移到同步队列,等待获取同步锁。PROPAGATE:值为-3,与共享模式有关。在共享模式下,该状态表示该节点的线程处于可运行状态。privatestaticbooleanshouldParkAfterFailedAcquire(Nodepred,Nodenode){intws=pred.waitStatus;//如果前驱节点状态为-1,则返回true,挂起当前线程if(ws==Node.SIGNAL)returntrue;//如果是大于0,表示状态为CANCELLEDif(ws>0){do{//删除取消的节点(让取消的节点变成没有引用的节点,等待下一次GC回收)node.prev=pred=pred.prev;}while(pred.waitStatus>0);pred.next=node;}else{//这里只能输入0,-2,-3。NODE节点初始化时,waitStatus默认值为0,所以只有这里是修改waitStatus的地方//通过cas设置前驱节点状态为-1,然后返回false。外面调用的方法是一个循环,会再次调用这个方法compareAndSetWaitStatus(pred,ws,Node.SIGNAL);}returnfalse;}parkAndCheckInterrupt暂停当前线程,阻塞this);//挂起当前线程并阻塞returnThread.interrupted();}这里插入一张图片,描述解锁和锁定成功。然后当锁用完时应该释放锁。释放锁,关注unparkSuccessor这个方法。if(ws<0)compareAndSetWaitStatus(node,ws,0);Nodes=node.next;//s==nullhead的后继节点获取锁成功后,执行head.next=null操作后,解锁thread读取head.next,所以当s==null//head的后继节点被取消(cancelAcquire)时,执行如下操作:successor.waitStatus=1;successor.next=successor;if(s==null||s.waitStatus>0){s=null;//从tail节点往前看,找到第一个未取消的节点,这里没有breakfor(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}if(s!=null)//唤醒线程,被唤醒的线程会从acquireQueuedLockSupport.unpark(s.thread)中获取锁;}释放锁代码比较简单,基本都写在代码注释里了,流程如下:这段代码里有个经典的面试题:如果头节点的下一个节点为空或者头节点的下一个节点的状态头节点被取消为什么要从后往前查找,找到最前面的非注销节点呢?node.prev=pred;compareAndSetTail(pred,node)这两个地方可以看作??是Tail入队的原子操作,但是此时pred.next=node;还没有被执行。如果此时执行unparkSuccessor方法,是没办法从前往后查找的,所以需要从后往前查找。是不连通的,所以需要从后往前遍历才能遍历所有的Node。总结ReentrantLock的获取锁和释放锁基本讲完了,涉及到的细节还比较多。有兴趣的同学可以一行一行看源码调试试试。正确理解aqs可以更好的学习CountDownLatch、CyclicBarrier、Sempahore,因为这三把利器都是基于aqs的。本文转载自微信公众号“java财经”,可通过以下二维码关注。转载本文请联系爪哇财经公众号。