当前位置: 首页 > 科技观察

图16,一个State创建了这么多并发锁

时间:2023-03-18 09:59:27 科技观察

本文转载自微信公众号《程序员jinjunzhu》,作者jinjunzhu。转载本文请联系程序员jinjunzhu公众号。上一篇扔掉源码,15张图带你全面了解javaAQS,通过15张图讲解AQS监听模型中入口等待队列的原理。AQS使用FIFO队列实现了一个锁相关的并发模板,基于这个模板可以实现各种锁。JDK推荐并发锁工具类使用内部类来实现AQS的同步属性。今天我们就来说说基于AQS的各种锁。1ReentrantLock先来看一下UML类图:从图中可以看出,ReentrantLock使用抽象内部类Sync实现AQS方法,然后基于Sync同步器实现公平锁和非公平锁。主要实现了以下三个方法:tryAcquire(intarg):获取独占锁tryRelease(intarg):释放独占锁isHeldExclusively:当前线程是否持有独占锁ReentrantLock默认实现非公平锁,可以指定在构造函数中。从实现方法可以看出,ReentrantLock中获取的锁是独占锁。我们来看看获取和释放独占锁的代码:publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}的特点独占锁就是调用上面的acquire方法,传入参数为1。1.1获取公平锁获取锁首先判断同步状态(state)的值。1.1.1state等于0,表示没有线程占用锁。如果当前线程满足以下两个条件,则可以获得锁:没有前驱节点,如下图:状态值被CAS更新(0到1)成功。如果成功获取排他锁,AQS中的exclusiveOwnerThread会更新为当前线程,这个很容易理解。1.1.2state不等于0,表示有线程已经持有锁,判断持有锁的线程是否为当前线程,如下图:如果state+=1的值小于大于0,将抛出异常。如果获取锁失败,则进入AQS队列等待唤醒。1.2与公平锁相比,获取非公平锁的唯一区别是如果判断状态等于0,则不需要判断是否有前驱节点。只要CAS设置状态值(将0更新为1)成功,就获得了锁。1.3释放锁公平锁和非公平锁的释放逻辑是完全一样的,都是在内部类Sync中实现的。释放锁需要注意两点,如下图:state为什么大于1,因为是可重入的,持有锁的线程可以多次获取锁。1.4小结公平锁的特点是每个线程都要排队,所以不用担心线程永远得不到锁。但是有个缺点就是每个线程入队后都需要阻塞再唤醒,一定程度上影响了效率。非公平锁的特点是每个线程在进入队列之前都会尝试获取锁。如果获取成功,则不会进入队列,比公平锁更高效。但也有一个缺点。队列中的线程可能会等待很长时间,高并发下可能永远也获取不到锁。2ReentrantReadWriteLock先来看一下UML类图:从图中可以看出,ReentrantReadWriteLock使用抽象内部类Sync实现AQS方法,然后基于Sync同步器实现公平锁和非公平锁。主要实现了以下三个方法:tryAcquire(intarg):获取独占锁tryRelease(intarg):释放独占锁tryAcquireShared(intarg):获取共享锁tryReleaseShared(intarg):释放共享锁isHeldExclusively:当前线程是否持有排他锁,可以看出ReentrantReadWriteLock中同时使用了共享锁和排它锁。下图定义了几个常用的变量:下面两个方法可以让用户获取共享锁和独占锁的个数:staticintsharedCount(intc){returnc>>>SHARED_SHIFT;}staticintexclusiveCount(intc){returnc&EXCLUSIVE_MASK;}从sharedCount开始,共享锁的个数需要右移16位,即共享锁占据高16位。从上图中EXCLUSIVE_MASK的定义可以看出,与EXCLUSIVE_MASK的AND运算得到的是低16位的值,所以独占锁占用了低16位。如下图所示:这样,上面获取锁数量的方法就很容易理解了。参考文献1[1]2.1读锁读锁的实现对应内部类ReadLock。2.1.1获取读锁获取读锁其实就是ReadLock调用AQS的如下方法,传入参数为1:publicfinalvoidacquireShared(intarg){if(tryAcquireShared(arg)<0)doAcquireShared(arg);}ReentrantReadWriteLockinternal类Sync实现了tryAcquireShared方法,主要包括以下三种情况:使用exclusiveCount方法检查状态是否存在排他锁。如果有且独享线程不是当前线程,则返回-1,获取失败。使用sharedCount查看状态下共享锁的数量。如果读锁的个数小于最大值(MAX_COUNT=65535),那么满足以下三个条件就可以获得成功并返回1:当前线程不需要阻塞(readerShouldBlock)。在公平锁中,需要判断是否有前面的节点,如下图,需要阻塞;在非公平锁中,需要判断第一个节点是否有独占锁,如下图,需要阻塞:使用CAS改变state的值AddSHARED_UNIT(65536)。这里读锁占高位的说法比较好理解。要获得读锁,状态值必须增加这么多SHARED_UNIT。当前线程的holdCount加1。如果2失败,则自旋并重复上述步骤,直到获取到锁。tryAcquireShared(acquiresharedlock)会返回一个整数,如下:返回一个负数:获取锁失败。返回0:锁获取成功,但是后面线程获取共享锁时会失败。返回正数:获取锁成功,后续其他线程获取共享锁时也可能成功。2.1.2释放读锁ReentrantReadWriteLock通过在ReadLock中调用AQS的如下方法释放读锁,传入参数为1:publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){doReleaseShared();returntrue;}returnfalse;}ReentrantReadWriteLock的内部类Sync实现了releaseShared方法。具体逻辑分为以下两步:当前线程holdCounter值减1,CAS方法将状态值减去SHARED_UNIT。2.2写锁写锁的实现对应内部类WriteLock。2.2.1获取写锁ReentrantReadWriteLock实际上在WriteLock中调用了AQS的如下方法,传入参数1:publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}tryAcquire方法在ReentrantReadWriteLock的内部类Sync中实现。首先获取状态值和排他锁数量(exclusiveCount),然后有如下两种情况,如下图:state不等于0:排他锁数量等于0,this时,表示一个线程占用了一个共享锁。如果当前线程不是独占线程,则获取锁失败。排他锁个数不等于0,如果排他锁个数加1后大于MAX_COUNT,则获取锁失败。如果以上两个条件不匹配,则获取锁成功,state值加1。state等于0,判断当前线程是否需要阻塞(writerShouldBlock)。在公平锁中,逻辑和readerShouldBlock完全一样,都是判断队列中头节点的后继节点是否为当前线程。在非公平锁中,直接返回false,即可以直接尝试获取锁。如果当前线程不需要阻塞且状态赋值成功,则使用CAS方法将状态值加1,将独占线程设置为当前线程。2.2.2释放写锁ReentrantReadWriteLock实际上调用了WriteLock中AQS的如下方法,传入参数1:publicfinalbooleanrelease(intarg){if(tryRelease(arg)){Nodeh=head;if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}ReentrantReadWriteLock实现了Sync中的tryRelease(arg)方法,逻辑如下:判断当前线程是否为独占线程,如果不是,则抛出异常。state值减1后,用新的state值判断排他锁的个数是否等于0,等于0则将排他线程置空并返回true,这样上面的代码就可以唤醒队列中的后节点。如果不等于0,返回false,不唤醒后继节点。3CountDownLatch先来看一下UML类图:从上图可以看出,CountDownLatch的内部类Sync实现了获取共享锁和释放共享锁的逻辑。使用CountDownLatch时,构造函数会传入一个int类型的参数count,表示只有调用countDown次数count次后才能唤醒主线程。publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");this.sync=newSync(count);}上面的Sync(count)是将AQS中的state赋给count。3.1awaitCountDownLatch的await方法在AQS中调用acquireSharedInterruptibly(intarg),传入参数1,但是这个参数没有用。代码如下:publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();if(tryAcquireShared(arg)<0)doAcquireSharedInterruptibly(arg);}Sync中实现了tryAcquireShared方法,await逻辑如上图所示:自旋过程就是等待state的值不断减少。只有当state的值变为0时,主线程才会跳出自旋执行后的逻辑。3.2countDownCountDownLatch的countDown方法调用了AQS的releaseShared(intarg),传入参数1,但是这个参数没有用。内部类Sync实现了tryReleaseShared方法,逻辑如下:3.3小结CountDownLatch构造函数的入参值会赋值给state变量,入队操作为主线程入队。每个子线程调用countDown后,状态值减1。当状态值变为0后唤醒主线程。4SemaphoreSemaphore是用于保护共享资源的信号量。如果一个线程想要访问一个共享资源,它首先要从Semaphore获得一个锁(信号量)。如果信号量的计数器等于0,则当前线程进入AQS队列阻塞等待。否则线程获取锁成功,信号量减1,使用共享资源后释放锁(信号量加1)。信号量与监视器模型的不同之处在于它允许多个(构造函数允许)线程进入监视器,因此它也常用于限流。UML类图如下:Semaphore的构造函数会传入一个int类型的参数来初始化state的值。4.1acquire操作调用AQS中的acquireSharedInterruptibly方法,传入参数1,代码见CountDownLatch中的await部分。Semaphore分别在公平锁和非公平锁中实现了tryAcquireShared方法。4.1.1公平锁Semaphore默认使用非公平锁。如果使用公平锁,需要在构造函数中指定。获取公平锁的逻辑比较简单,如下图所示:4.1.2获取非公平锁非公平锁的唯一区别是不判断AQS队列是否有前驱节点(hasQueuedPredecessors),而是直接尝试获取锁。除了acquire方法之外,还有其他几种获取锁的方法。原理类似,只是调用了AQS中不同的方法。4.2release释放锁的操作调用AQS中的releaseShared(intarg)方法,传入参数1,实现内部类Sync中的tryReleaseShared方法。逻辑很简单:使用CAS将state的值加1,然后唤醒队列中的后继节点。5ThreadPoolExecutorThreadPoolExecutor中也使用了AQS,见下面的UML类图:Worker主要用于ThreadPoolExecutor中断线程的时候。Worker自己实现了排他锁,中断线程时先加锁,中断运行后释放锁。按照官方的说法,这里之所以没有直接使用ReentrantLock,是为了防止在调用控制线程池的方法(类似setCorePoolSize)时重新获取锁。5.1tryAcquire使用CAS将AQS中的状态从0变为1,并设置当前线程为独占线程。5.2tryRelease设置独占线程为空,将AQS中的state改为0,Worker在初始化时,state会被置为-1,这样就无法成功获取锁。只有当runWorker方法被调用时,状态才会通过释放锁变为0。这确保只有正在运行的线程被中断,而不是等待线程。6总结AQS基于双向队列实现入口等待队列,基于状态变量实现各种并发锁。上篇文章讲了入口等待队列,本文主要讲一下基于AQS的并发锁原理。在监控模型中,还有一块没有介绍,就是条件等待队列,请看下一篇。