抽象队列同步器(AQS-AbstractQueuedSynchronizer)从名字上理解:抽象:是一个抽象类,队列由子类实现:数据结构是队列,队列用于存储数据。Synchronization:基于它可以实现同步功能我们将从这几个方面开始解读,但是首先要了解AbstractQueuedSynchronizer的特点首先要知道以下几个特点1.AQS可以实现独占锁和共享锁。2.排他锁exclusive是一种悲观锁。保证只有一个线程经过一个阻塞点,只有一个线程可以获取到锁。3.共享锁shared是一种乐观锁。可以允许多个线程阻塞点,多个线程可以同时获取锁。它允许通过多个读取操作或单个写入操作访问资源,但不能同时进行这两种操作。4、AQS使用一个int类型的成员变量state来表示同步状态。当state>0时,表示已经获取到锁。当state=0时,没有锁。它提供了三种方法(getState()、setState(intnewState)、compareAndSetState(intexpect,intupdate))对同步状态state进行操作,可以保证state操作的安全。5、AQS是通过一个CLH队列实现的(CLH锁是Craig,Landin,andHagersten(CLH)锁,CLH锁是自旋锁,可以保证没有饥饿,提供先到先得的公平性。CLH锁是它也是一种基于链表的可扩展、高性能、公平的自旋锁,应用线程只对局部变量进行自旋,不断轮询前驱的状态,如果发现前驱释放了则结束自旋lock.)Abstractus看一下源码可以看到,它继承自AbstractOwnableSynchronizer,是一个抽象类。publicabstractclassAbstractQueuedSynchronizerextendsAbstractOwnableSynchronizerimplementsjava.io.SerializableAQS在内部使用易失性变量状态作为资源标识符。同时定义了几个受保护的获取和改变状态的方法,子类可以重写这些方法来实现自己的逻辑。可以看到该类为我们提供了几个protected级别的方法,分别是//创建队列Synchronizer实例,初始状态为0protectedAbstractQueuedSynchronizer(){}//返回同步状态的当前值。protectedfinalintgetState(){returnstate;}//设置同步状态的值protectedfinalvoidsetState(intnewState){state=newState;}//独占模式。尝试获取资源,成功时返回true,失败时返回false。protectedbooleantryAcquire(intarg){thrownewUnsupportedOperationException();}//独占模式。尝试释放资源,成功时返回true,失败时返回false。protectedbooleantryRelease(intarg){thrownewUnsupportedOperationException();}//共享方法。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,还有剩余资源protectedinttryAcquireShared(intarg){thrownewUnsupportedOperationException();}//共享方法。尝试释放资源,如果释放后允许唤醒后续等待节点,则返回true,否则返回false。protectedbooleantryReleaseShared(intarg){thrownewUnsupportedOperationException();}虽然这些方法都是protected方法,但是在AQS中并没有实现,而是直接抛出异常。AQS实现了一系列主要逻辑。由此我们可以看出AQS是一个抽象的构建锁和同步器的框架。AQS可用于轻松高效地构建广泛使用的同步器。比如我们提到的ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask等都是基于AQS的。我们还可以使用AQS非常轻松地构造自定义同步器,只要子类实现它的几个受保护方法即可。queueAQS类本身实现了等待队列的特定线程的维护(比如获取资源失败入Queue/Wakeuptodequeue等)。它内部使用先进先出(FIFO)双端队列(CLH),并使用两个指针head和tail来标识队列的头部和尾部。其数据结构如图:队列中不直接存放线程,而是存放拥有线程的Node节点。我们看一下Node的结构:staticfinalclassNode{//标记一个节点(对应线程)以共享模式等待staticfinalNodeSHARED=newNode();//标记一个节点(对应线程)以独占模式等待staticfinalNodeEXCLUSIVE=null;//waitStatus的值表示该节点(对应线程)已经取消staticfinalintCANCELLED=1;//waitStatus的值表示需要唤醒后续节点(对应线程)staticfinalintSIGNAL=-1;//waitStatus值,表示该节点(对应线程)正在等待某个条件staticfinalintCONDITION=-2;//waitStatus的值表示资源可用,需要新的头节点继续唤醒后继节点//(共享中mode,多线程并发释放资源,head唤醒后继节点后,//需要把多余的资源留给后面的节点;当设置新的head节点时,会继续唤醒它的successssornodes)staticfinalintPROPAGATE=-3;//等待状态,取值范围,-3,-2,-1,0,1volatileintwaitStatus;volatileNodeprev;//前驱节点volatileNodeext;//后继节点volatileThreadthread;//节点对应线程NodenextWaiter;//队列中等待下一个等待条件的节点//判断共享模式的方法finalbooleanisShared(){returnextWaiter==SHARED;}Node(Threadthread,Nodemode){//使用了addWaiterthis.nextWaiter=mode;this.thread=thread;}??//其他方法忽略,具体可以参考源码}//AQS中addWaiter的私有方法privateNodeaddWaiter(Nodemode){//使用Node的这个构造函数Nodenode=newNode(Thread.currentThread(),mode);//其他代码省略},我们可以通过Node实现两种队列,一种是实现CLH队列(线程同步queue,双向队列)通过prev和next,另一个是nextWaiter实现了Condition条件上的等待线程队列(单向队列)。该Condition主要用于同步ReentrantLock类中的两种同步方式:独占模式(Exclusive):资源是独占的,一次只能有一个线程获取如ReentrantLock。共享模式(Share):可以被多个线程同时获取,具体资源数量可以通过参数指定。比如信号量/CountDownLatch。同时实现两种模式的同步类,比如ReadWriteLock获取资源获取资源的入口是acquire(intarg)方法。arg是要获取的资源数量,在独占模式下始终为1。publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}首先调用tryAcquire(arg)尝试获取资源。如前所述,该方法是在子类中实现的。如果资源获取失败,则通过addWaiter(Node.EXCLUSIVE)方法将线程插入等待队列。传入的参数表示要插入的Node是独占的。该方法的具体实现:privateNodeaddWaiter(Nodemode){//生成线程对应的Node节点Nodenode=newNode(Thread.currentThread(),mode);//将Node插入队列Nodepred=tail;if(pred!=null){node.prev=pred;//尝试使用CAS,如果成功,则返回if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}//如果等待队列为空或上面的CAS失败,然后自旋CAS插入enq(node);returnnode;}//AQS会有多个线程同时竞争资源,//所以肯定会有多个线程同时插入节点,//这里通过CAS的自旋方法保证了运行的线程安全。//自旋CAS插入等待队列privateNodeenq(finalNodenode){for(;;){Nodet=tail;if(t==null){//Mustinitializeif(compareAndSetHead(newNode()))tail=head;}??else{node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}如果设置成功,则表示获取到锁,返回true。把state置0和置1的动作在外部已经做过一次,在内部再做一次只是增加了概率,而且这样的操作相对于锁来说并不占用开销。如果status不为0,则判断当前线程是否为独占锁的拥有者。如果是Owner,尝试通过acquires来增加status(即增加1)。如果状态值超过限制,将抛出异常。如果没有限制,设置状态后返回true(实现了一个类似bias,reentrant的功能,但是不需要再申请)。如果状态不为0且self不是所有者,则返回false。现在通过addWaiter方法,已经在等待队列的尾部放置了一个Node。等待队列中的节点逐一从头节点获取资源。下面看一下acquireQueued方法的具体实现:前驱节点p为head,表示node为第二个节点,可以尝试获取资源if(p==head&&tryAcquire(arg)){//获取到资源后,将head指向该节点。//所以head指向的节点就是当前获取资源的节点或null。setHead(node);p.next=null;//helpGCfailed=false;returninterrupted;}//如果可以休息,进入等待状态,直到unpark()if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}这里,parkAndCheckInterrupt方法在内部使用LockSupport.park(this)。顺便简单介绍一下公园。LockSupport类是Java6中引入的一个类,它提供基本的线程同步原语。LockSupport实际上调用了Unsafe类中的函数。在Unsafe中,只有两个函数:park(booleanisAbsolute,longtime):阻塞当前线程unpark(Threadjthread):停止给定线程阻塞所以节点进入等待队列后,调用park使之进入阻塞状态。只有头节点的线程处于活动状态。acquire方法获取资源的过程:当然除了acquire之外,还有其他三种获取资源的方式:acquireInterruptibly:申请可中断资源(独占模式)acquireShared:以共享方式申请资源acquireSharedInterruptibly:申请可中断资源(sharedmode)Mode)Interruptible表示线程被中断时可能会抛出InterruptedException。释放资源比获取资源要简单得多。AQS中只有一个简短的实现。源码:publicfinalbooleanrelease(intarg){if(tryRelease(arg)){Nodeh=head;if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}tryRelease方法可以考虑这个动作作为一个设置锁状态的操作,就是把状态减去传入的参数值(参数为1)。如果结果状态为0,则将独占锁的Owner设置为null,让其他线程有机会执行。在独占锁中,加锁时状态会加1(当然你可以自己修改这个值),解锁时减1。同一个锁可能会叠加成2,3,4这几个值,只有当unlock()的次数和lock()的次数对应时,Owner线程才会置空,只有这种情况才会返回true.大家在写代码的时候要注意这一点。如果在循环体中使用了lock()或者故意使用了两次以上的lock(),到最后只有一个unlock(),那么到最后可能锁并没有释放。Causedeadlock.privatevoidunparkSuccessor(Nodenode){//如果status为负数,尝试设置为0intws=node.waitStatus;if(ws<0)compareAndSetWaitStatus(node,ws,0);//获取head的后继者nodeNodehead.nextNodes=node.next;//如果后继节点为空或者状态大于0//通过前面的定义,我们知道只有一种可能性大于0,即这个节点有被取消if(s==null||s.waitStatus>0){s=null;//等待队列中所有有用的节点向前移动for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}//如果后继节点不为空,if(s!=null)LockSupport.unpark(s.thread);}方法unparkSuccessor(Node),表示是时候释放锁了。它传入头节点。第一个内部动作是获取头节点的下一个节点。如果获取到的节点不为空,则直接使用“LockSupport.unpark()”方法释放对应挂起的线程。
