当前位置: 首页 > 后端技术 > Java

Java锁(三):Semaphore共享锁底层实现原理详解

时间:2023-04-01 16:31:37 Java

Java锁(三):Semaphore共享锁底层实现原理详解实现类如下:ReentrantLock:可重入锁,排他锁,实现了公平锁和非公平锁。这是上一篇介绍的类,也是最常用的类。它通常与同步进行比较。ReentrantReadWriteLock:读写锁,可以共享也可以独占。读是共享锁,写是排它锁。它还实现了公平锁和非公平锁。Semaphore:信号量锁,共享锁,也实现了公平锁和非公平锁,主要是流量控制,比如:数据库连接池给你分配10个连接,一个一个分配,如果10个都分配完了还没有释放,那就等待释放.CountDownLatch:Latching,共享锁,也实现了公平锁和非公平锁,Latchlatch的意思,举个例子:说一个漂流船上有四个人,满了就推下水。2.什么是Semaphore信号量?上面已经介绍了信号量。基于AQS的信号锁是共享锁,实现了公平锁和非公平锁。它可以用来控制同时访问特定资源的线程数,通过协调各个线程来保证资源的合理使用。信号量使用场景通常用于资源有明确访问限制的场景,常用于限流。例如:在数据库连接池中,线程同时连接的数量是有限制的,连接数不能超过一定的数量。当连接数达到限制后,后面的线程只能排队等待前面的线程释放数据库连接,获取数据库连接。例如:停车场场景,车位数量有限,同时只能听到一定数量的汽车。当车位已满时,外面的车只能等里面的车出来,才能进去停车。Semaphore的常用方法//从信号锁中获取锁,并处理阻塞状态,直到获取到锁,除非线程被中断acquire();//从信号锁中获取指定数量的锁,一直处理直到获取到锁阻塞状态,除非线程被中断acquire(intpermits);//从信号锁中获取一个锁,线程一直阻塞直到lockisacquired(忽略中断)acquireUninterruptibly();//尝试从信号lock中获取锁,返回如果获取成功或失败,线程不会被阻塞。tryAcquire();//尝试从鑫豪锁获取锁,指定获取时间,如果在指定时间内没有获取到则超时返回,线程不会阻塞tryAcquire(longtimeount,TimeUnitunit);//释放锁release();//获取等待队列中是否还有等待线程hadQueuedThreads();//获取等待队列中阻塞的线程数getQueuedLength();//清除锁并返回清除锁的数量drainPermits();//返回可用锁的数量availabelPermits();Semaphore实现原理InitializationSemaphore提供了两种构造方法,默认构造方法创建一个指定锁数的非公平信号量锁,另一种构造方法多指定一个公平锁还是非公平锁的参数,源码如下://构造一个指定锁数的非公平信号量锁publicSemaphore(intpermits){sync=newNonfairSync(permits);}//构造一个指定锁数的公平/非公平信号量锁publicSemaphore(intpermits,booleanfair){sync=fair?newFairSync(permits):newNonfairSync(permits);}锁获取过程acquireacquire()是锁获取方法,调用内部类同步器Sync,继承了AQS的实际调用AQS的acquireSharedInterruptibly()的核心,源码如下:publicvoidacquire()throwsInterruptedExcetion{sync.acquireSharedInterruptibly();}加上JDK,跟锁有关方法,Interruptibly的意思是可中断的,也就是可中断的锁。可中断锁意味着线程在等待获取锁时可以被中断。也就是说,线程可以在等待锁的同时响应中断。acquireSharedInterruptibyacquireSharedInterruptibly方法是获取可中断锁。源码如下:publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())//检测线程的中断状态。如果已经被中断,它会响应中断。该方法将清除线程中的中断标志thrownewInterruptedException();//尝试获取锁,arg为锁的个数//当获取到锁时,为当前线程创建一个节点并加入阻塞队列if(tryAcquireShared(arg)<0)doAcquireSharedInterruptibly(arg);}的acquireSharedInterruptibly方法会先判断当前线程的中断状态。如果是中断状态,会响应中断,抛出异常,然后调用tryAcquireShared()方法获取锁。如果获取到锁,则会为当前线程创建一个节点。加入等待队列。tryAcquireSharedtryAcquireShared()是AQS定义的模板方法。它由子类实现。信号量还实现了公平锁和非公平锁。这两个锁是相似的。下面我们来看一下公平锁的具体实现。源码如下:protectedinttryAcquireShared(intacquires){//Spinfor(;;){//判断前面是否有线程,如果有则直接返回-1,如果进入阻塞状态(hasQueuedPredecessors())返回-1;//获取同步状态的值(当前可用锁的数量)intavailable=getState();//剩余锁数,available-appliedintremaining=available-acquires;//如果剩余锁数小于0,或者设置成功,返回,如果设置失败,继续循环设置//如果剩余锁数小于0,返回负数,表示表示获取锁失败//如果剩余锁数大于0,且设置status为成功,则表示获取锁成功if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}tryAcquireShared()获取锁,通过自旋+CAS保证线程安全。doAcquireSharedInterruptiblydoAcquireSharedInterruptibly()方法会在当前线程前面有等待线程或者公平锁期间当前线程需要进入等待状态后获取锁时调用。它用于为当前线程创建节点并加入等待队列。源码如下:privatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{//为当前线程创建一个共享模式节点,并加入到队尾finalNodenode=addWaiter(Node.SHARED);//操作失败标志booleanfailed=true;try{//Spinfor(;;){//获取当前节点的前一个节点finalNodep=node.predecessor();if(p==head){//如果前一个节点是头节点,尝试获取锁intr=tryAcquireShared(arg);if(r>=0){//如果获取成功,设置头节点并共享模式传播setHeadAndPropagate(node,r);p.next=null;失败=假;返回;}}//如果前面的节点不是头节点或者没有获取到锁//shouldParkAfterFailedAcquire方法判断当前线程是否需要阻塞//parkAndCheckInterrupt方法用于阻塞线程,检测线程是否被打断,打断则抛出错误如果(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())thrownewInterruptedException();}}finally{if(failed)//自旋异常退出取消线程获取锁cancelAcquire(node);Semaphore.acquire()方法获取令牌。该方法调用内置同步器Sync.acquireSharedInterruptibly()方法。synchronizer继承了AQS,实际调用的是AQS的acquireSharedInterruptibly()方法。acquireSharedInterruptibly()方法首先判断当前线程是否被中断。如果它被中断,将抛出InterruptedException。如果没有被单挑出来,就会调用tryAcquireShared()方法尝试获取锁。AQS的tryAcquireShared()方法只是定义了一个由子类实现的模板方法。Semaphore.Sync同步器提供了两种实现,分别是FairSync(公平锁)和NonfairSync(非公平锁)。这两种锁的实现方式相似,公平锁多了一个hasQueuedPredecessor()方法来判断自己前面是否有线程,如果有则返回-1。tryAcquireShared()方法首先通过自旋或者加锁获取当前可用的锁,然后减去要获取的锁,得到剩余的锁。如果剩余锁小于0,则直接返回,表示获取失败。否则,通过CAS获取锁,成功返回,未能获得回报失败。如果tryAcquireShared()获取锁失败,调用doAcquireSharedInterruptibly()方法为当前线程创建一个节点加入等待队列。该方法首先创建共享模式节点加入队列,然后自旋判断当前节点是否为头节点。If头节点也尝试获取锁。如果获取成功,设置头节点,成功返回。如果获取失败,会调用shouldParkAfterFailedAcquire()判断当前线程是否需要等待。如果需要等待,则调用parkAndCheckInterrupt()方法阻塞线程,判断线程是否中断,如果被中断则抛出错误,如果没有被中断则阻塞并通过自旋获取锁。如果自旋异常退出,调用cancelAcquire()方法取消线程获取锁。释放锁流程releaseSemaphore.release()方法用于释放锁,释放一个锁,源码如下:publicvoidrelease(){//释放一个共享锁sync.releaseShared(1);}release()方法调用Semaphore.Sync同步器的releaseShared()方法,继承自AQS,实际调用的是AQS.releaseShared()方法。releaseSharedreleaseShared()方法释放指定数量的共享锁。释放成功后,会唤醒等待队列中的一个线程。源码如下:publicfinalbooleanreleaseShared(intarg){//尝试释放锁if(tryReleaseShared(arg)){//如果释放成功,唤醒等待队列中的线程。CAS释放锁,源码如下:protectedfinalbooleantryReleaseShared(intreleases){//spinfor(;;){//获取当前可用锁的个数intcurrent=getState();//available+releasedintnext=current+releases;if(next