什么?你连AQS是什么都不知道?要想精通Java并发,必须掌握AQS。今天就和阿芬一起来吧。基本概念AQS是AbstractQueuedSynchronizer的缩写,翻译成中文就是抽象队列同步器,这三个词分开:Abstract(抽象):也就是说AQS是一个抽象类,只实现了一些主要逻辑,一些方法是delayed到子类去实现Queued(队列):队列有什么特点?先进先出(FIFO),对吧?也就是说,AQS是一个Synchronizer(同步),它使用先进先出队列来存储数据:也就是说,AQS实现的不仅仅是同步功能,总结一下,AQS是一个构建锁和同步器的框架。AQS可以用来简单高效地构建同步器。AQS的内部实现AQS队列内部维护了一个FIFO双向链表。如果熟悉数据结构,应该很容易想象,在双向链表中,每个节点都有两个指针,分别指向直接前驱节点和直接后继节点。使用双向链表的优点之一是可以很容易地从任何节点访问它的前驱和后继节点。在AQS中,每个Node其实就是一个线程包。当一个线程竞争锁失败时,会被打包为一个Node加入到AQS队列中;获取锁的线程释放锁后,会从队列中唤醒一个阻塞的Node(也就是线程)AQS使用volatile变量state作为资源标识:privatevolatileintstate;关于state状态的读取和修改,子类可以通过重写getState()和setState()方法实现自己的逻辑,比较重要的是//传入期望值expect,更新自己要修改的值,然后通过Unsafe的compareAndSwapInt()实现protectedfinalbooleancompareAndSetState(intexpect,intupdate),也就是CAS操作。下面是AQS中两个重要的成员变量:privatetransientvolatileNodehead;//headnodeprivatetransientvolatileNodetail;//tailnode关于AQS维护的双向链表,在源码中解释如下:CLH”(Craig、Landin和Hagersten)锁定队列。CLH锁通常用于自旋锁。我们改为将它们用于阻塞同步器,但使用相同的基本策略,即在其节点的前身中保存有关线程的一些控制信息。也就是说,AQS的等待队列是“CLH”加锁队列的变种。直接来一张图会更形象:Node节点维护线程,控制线程的一些操作,看看Node是怎么做的:staticfinalclassNode{/**Markertoindicateanodeiswaitinginsharedmode*///标记一个节点,以共享模式等待staticfinalNodeSHARED=newNode();/**Markertoindicateanodeiswaitinginexclusivemode*///标记A节点以独占模式等待staticfinalNodeEXCLUSIVE=null;/**waitStatusvaluetoindicatethreadhascancelled*///waitStatus的值表示节点已经从队列中取消staticfinalintCANCELLED=1;/**waitStatusvaluetoindicatesuccessor'sthreadneedsunparking*///waitStatus的值表示后继节点正在等待唤醒//只有处于信号状态的节点才能被唤醒staticfinalintSIGNAL=-1;/**waitStatusvaluetoindicatethreadiswaitingoncondition*///waitStatus的值表示节点正在等待一些条件staticfinalintCONDITION=-2;/***waitStatusvaluetoindicatethenextacquireSharedshould*unconditionallypropagate*///waitStatus的值表示有资源可用,新的头节点需要唤醒后继节点node//如果是共享模式,应该无条件传播同步状态staticfinalintPROPAGATE=-3;//节点状态,取值为-3,-2,-1,0,1volatileintwaitStatus;//前驱节点volatileNodeprev;//后继节点volatileNodeext;//该节点对应的线程volatileThreadthread;//条件队列中的后继节点NodenextWaiter;//判断是否共享模式最终booleanisShared(){returnnextWaiter==SHARED;}/***返回前驱节点*/finalNodepredecessor()throwsNullPointerException{Nodep=prev;if(p==null)thrownewNullPointerException();elsereturnp;}Node(){//用来建立initialheadorSHAREDmarker}/***将线程构造成Node节点,然后加入条件队列*/Node(Threadthread,Nodemode){//使用了addWaiterthis.nextWaiter=mode;this.thread=thread;}??/***使用的方法通过等待队列*/Node(Threadthread,intwaitStatus){//UsedbyConditionthis.waitStatus=waitStatus;this.thread=thread;}??}AQS如何获取资源在AQS中,获取资源的入口是acquire(intarg)方法,其中arg是个体我们看代码:publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}在获取资源的时候,tryAcquire方法会先叫。这个方法是在子类中具体实现的,如果通过tryAcquire获取资源失败,那么线程会通过addWaiter(Node.EXCLUSIVE)方法插入到等待队列中。具体代码:privateNodeaddWaiter(Nodemode){//对应生成的线程NodenodeNodenode=newNode(Thread.currentThread(),mode);//将Node插入队列Nodepred=tail;if(pred!=null){node.prev=pred;//使用CAS操作,如果成功就返回if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}//如果pred==null或者CAS操作失败,调用enq方法自旋插入enq(node);returnnode;}//自旋CAS插入等待队列privateNodeenq(finalNodenode){for(;;){Nodet=tail;if(t==null){//Mustinitializeif(compareAndSetHead(newNode()))tail=head;}??else{node.prev=t;if(compareAndSetTail(t,node)){t.next=node;return;}}}}上面可以看出使用了CAS自旋插入。这是因为在AQS中会有多个线程同时竞争资源,然后多个线程会同时插入节点。操作,这里使用CAS自旋插入是为了保证操作的线程安全现在,应用acquire(intarg)方法,然后通过调用addWaiter方法向队尾插入一个Node等待队列中的节点逐一从头节点获取资源。获取资源的方式如下:finalbooleanacquireQueued(finalNodenode,intarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){finalNodep=node.predecessor();//如果Node的前驱节点p是head,表示Node是第二个节点,那么可以尝试获取资源if(p==head&&tryAcquire(arg)){//如果资源获取成功,指向head到SetHead(node);p.next=null;//helpGCfailed=false;returninterrupted;}//节点进入等待队列后,调用shouldParkAfterFailedAcquire或parkAndCheckInterrupt方法//进入阻塞状态,即只有头节点的线程处于活动状态if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}获取资源时,除了acquire之外还有3个方法:acquireInterruptibly:应用程序可以中断资源(独占模式)acquireShared:以共享方式申请资源acquireSharedInterruptibly:app这里只针对可中断资源(共享模式),关于AQS如何获取资源就差不多了,我们来看看AQS如何释放资源AQS如何释放资源释放资源比获取资源要简单的多。源码如下:publicfinalbooleanrelease(intarg){//如果释放锁成功if(tryRelease(arg)){//获取AQS队列中的头节点Nodeh=head;//如果头节点不在empty,andthestate!=0if(h!=null&&h.waitStatus!=0)//调用unparkSuccessor(h)方法唤醒后续节点unparkSuccessor(h);returntrue;}returnfalse;}privatevoidunparkSuccessor(Nodenode){intws=node.waitStatus;//如果status为负数,尝试改为0if(ws<0)compareAndSetWaitStatus(node,ws,0);//获取头节点Nodes的后继节点=node.next;//如果waitStatus大于0,则表示本节点取消if(s==null||s.waitStatus>0){s=null;//则从tail节点开始,查找waitStatus<=0最接近头部的节点被唤醒for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}//如果后继节点不为空,则将其从阻塞态变为非阻塞态if(s!=null)LockSupport.unpark(s.thread);}AQS两个resource共享模式资源有两种共享模式:独占模式(Exclusive):资源是独占的,即一次只能被一个线程占用,如ReentrantLock共享模式(Share):可以获取同时由多个线程。具体的资源数量可以通过参数来决定,比如Semaphore/CountDownLatch。看到这里,你有AQS吗?
