当前位置: 首页 > 后端技术 > Java

AbstractQueueSynchronizer(ExclusiveLock)

时间:2023-04-01 20:18:22 Java

AQS代码概述节点类分析通过ReentrantLock窥探AQS独占锁最简单实例可重入锁实例锁竞争实例结束基本介绍JUC中很多并发类继承AbstractQueuedSynchronizer(AQS),如CountDownLatch、ReentrantLock、ThreadLocalExcutor等,主要实现同步状态的管理,对阻塞线程进行排队等待通知。以ReetrantLock为例。它有以下几个获取锁的函数,那些未能竞争到这个锁的线程将被存储在一个集合中。释放锁,集合中的线程会被唤醒重新出现争夺锁。使用锁创建Condition对象。上面的写函数都是依赖AQS实现的,因为ReetrantLock只能被一个线程获取,所以是独占锁,而ReadLock像ReentrantReadWriteLock一样可以被多个线程共享,也就是说是共享锁。AQS不仅提供了排他锁的一些底层实现,还提供了共享锁的实现。因此,AQS中的内容可以分为四部分:CLH队列:存放等待线程,主要通过双向链表实现,CLH是其发明者三大人物名字的首字母。独占锁共享锁Condition实现AQS代码概述AbstractQueuedSynchronizer这个类包含两个内部类,其中ConditionObject是Condition函数的主要实现。一般创建Condition的方式是Lock.newCondition(),我们可以通过查看ReentrantLock的源码找到。实际创建的Condition是一个ConditionObject实例。Node是等待线程的载体,即等待线程所在的双向链表上的节点。AbstractQueuedSynchronizer中有大量方法,其中tryAcquire和tryAcquireShared是排他锁和共享锁中“相似方法”的不同实现。下图是AbstractQueuedSynchronizer中的一些成员变量,其中head和tail是Node变量,分别用来表示队列头尾节点,state表示同步状态,stateOffset表示state变量相对于java对象的偏移量,它是相对于AbstractQueuedSynchronizer.class的偏移量(class也是对象,java中一切都是对象),主要用于后面使用CAS对相应的变量进行赋值和修改值。headOffset和tailOffset同理,waitStatusOffset和nextOffset是相对于Node.class的偏移量。另外,在AbstractQueuedSynchronizer的父类AbstractOwnableSynchronizer中还有一个重要的变量exclusiveOwnerThread,它在独占模式下表示拥有当前锁的线程。Node类解析staticfinalclassNode{//用于标记共享模式staticfinalNodeSHARED=newNode();//用于标记独占模式staticfinalNodeEXCLUSIVE=null;//waitStatus是这个值,表示线程已经取消了staticfinalintCANCELED=1;//当waitStatus为这个值时,表示后续线程需要解除阻塞staticfinalintSIGNAL=-1;//当waitStatus为这个值时,表示线程在Condition下处于等待状态staticfinalintCONDITION=-2;//当waitStatus为这个值时,表示允许下一次acquireShared操作staticfinalintPROPAGATE=-3;/***该字段可能有以下状态*SIGNAL:该节点的后继节点正在(或即将)阻塞(通过停放),因此当前节点在*释放或取消时必须unpark其后继节点。为避免竞争,获取方法*必须首先表明它们需要信号,然后重试原子获取,然后*在失败时阻塞。*CANCELLED:该节点因超时或中断而被取消*CONDITION:该节点用于条件队列,该节点不会用于安装状态下的同步队列。*PROPAGATE:这个节点是共享的*0:以上都不是**这个字段的初始值为0,由cas安全地写入它*/volatileintwaitStatus;//前节点volatileNodeprev;//继承节点volatileNodenext;//该节点拥有的线程volatileThreadthread;//可能有两个功能//1。在独占模式下等待节点//2。用于判断Shared模式的NodenextWaiter;//是否为共享模式finalbooleanisShared(){returnnextWaiter==SHARED;}//返回上一个节点finalNodepredecessor()throwsNullPointerException{Nodep=prev;如果(p==null)抛出新的NullPointerException();否则返回p;}Node(){//用于建立初始头或SHARED标记}Node(Threadthread,Nodemode){//由addWaiter使用this.nextWaiter=mode;this.thread=线程;}Node(Threadthread,intwaitStatus){//由条件使用this.waitStatus=waitStatus;this.thread=线程;}}通过ReentrantLock窥探AQS独占锁接下来我们通过几个例子来探究AQS中一些方法的实现以及它们在ReentrantLock中的作用。最简单的例子,我们先从一个简单的lock&unLock例子开始,通过断点进入。锁的具体实现在ReentrantLock的NonfairSync内部类中,这是由于我们为锁对象设置的非公平锁。然后我们会进入AQS中的compareAndSetState方法,主要是通过cas判断state是否为0,yes——改成1返回true,no——直接返回false不做任何修改,如果是0说明这个锁当前没有被任何线程持有,然后我们将其状态更改为1以表示它已被持有。返回yes后,将AQS中独占线程的字段赋值给当前线程,则加锁成功。然后我们进入ReentrantLock的解锁方法。该方法的主要实现是在AQS的release方法中进入tryRelease方法,会获取锁的当前状态,然后用c表示要改变的目标状态。验证通过后,锁将独占线程设置为null,并将其状态字段state修改为目标状态,此时锁已经解锁。tryRelease返回true后,会判断AQS中是否有等待节点,如果存在则将其唤醒(后面看这里的源码)。可重入锁实例我们使用同一个ReentrantLock进行两次加锁操作,因为第一次和上面简单例子的过程是一样的,所以我们只关注第二次加锁和解锁。此时由于已经加锁一次,即state=1,compareAndSetState(0,1)不会赋值成功,所以会进入acquire方法,然后先进入tryAcquire方法,我们判断在TryAcquire中再次获取锁的状态(因为这个过程中可能会释放上一个锁),然后因为当前线程是这个锁的独占线程,所以我们重入这个锁,最后将状态值改为2意味着当前线程已重新进入该锁两次。由于tryAcquire(1)返回true,!tryAcquire(1)为false,程序不会进入acquire方法中的后续执行流程。至此,意味着第二次加锁已经完成。和简单例子中的unlock一样,程序会先进入release方法,然后进入tryRelease方法。因为changedstate为1,所以当前锁的独占线程不会设置为null(会在上次解锁时设置)锁竞争实例锁竞争涉及到等待队列和等待节点的阻塞和唤醒,所以复杂度它的一系列操作高于上面的例子。用下面的例子来体验一下多线程竞争锁的过程。t1会先获取锁,这个过程和获取非竞争锁是一样的。主要区别在于t2获取锁和t1释放锁的过程。在idea中,可以在这个位置切换调试线程。t1线程获取到锁后,我们切换到t2线程,发现此时idea已经给我们标记了锁,锁已经被t1占用了。然后就会进入acquire方法。由于此时t1已经占用了锁,当前state≠0且锁为t1≠t2的线程,所以tryAcquire返回false,所以程序会进入addWaiter方法。该方法中会先将t2线程封装成一个Node对象,然后通过tail节点判断队列是否已经初始化。由于此时CLH队列中没有元素,所以会进入enq方法对队列进行第一次初始化。队列会在enq中初始化,然后传入的节点会被插入到队列的尾部。这里我们看到了for(;;)的无限循环(优雅点可以叫自旋),那么它的作用是什么呢?在进入enq的这个条目中,for实际上只执行了两次。第一次为头节点设置一个没有实际数据的头节点,第二次将传入节点添加到队尾。那么我们将上面的工作做成循环即可完成,比如下面代码块中的实现privateNodeenq(finalNodenode){Nodet=tail;if(t==null){//必须初始化if(compareAndSetHead(newNode()))tail=head;}node.prev=t;if(compareAndSetTail(t,node)){t.next=node;返回吨;}}其实自旋是为了保证线程安全,t2线程获取锁的时候可能会有其他线程在竞争锁。例如,当一个线程恰好在t2时刻执行Nodet=tail和compareAndSetHead(newNode())时,队列初始化成功并设置了头节点,那么compareAndSetHead将返回false。它将进入这个分支。这时,它会重新获取tail节点,并将传入的node节点插入到tail的next中。不过此时tail也有可能被其他线程改变,所以需要不断自旋尝试修改。直到位置成功,旋转结束。addWaiter结束后,会进入acquireQueued。该方法主要进行锁地抢占和阻塞等待。最后根据failed字段判断是否取消获取线程。这种情况一般设置状态为CanceledshouldParkAfterFailedAcquire来判断节点是否应该阻塞等待。如果节点处于SIGNAL状态,则表示该节点的后继节点应该被阻塞,然后会执行parkAndCheckInterrupt方法对其进行阻塞,唤醒时会判断该线程是否是打断了。一般情况下,如果t1线程没有解锁,那么t2线程会一直阻塞在parkAndCheckInterrupt方法中,被唤醒后会继续自旋尝试获取锁。然后我们切换回t1线程,进入unlock方法,调用AQS的release方法,然后tryRelease中的操作和上面两个例子一样。唯一不同的是前面例子的等待队列都是空的,即头节点都是空的,所以不会唤醒阻塞的节点,因为此时我们队列中有存放t2线程的节点,所以程序会进入unparkSuccessor方法。执行该方法后,t2线程会从之前的WAIT状态切换到RUNNING状态被唤醒!t2被唤醒后,会再次tryAcquire,成功后执行临界区内容,然后正常释放锁。最后通过ReentrantLock介绍AQS独占锁的相关内容。另外还会用到ReentrantReadWriteLock来介绍共享锁的实现,Condition的实现以及其他JUC相关类中AQS的使用。