当前位置: 首页 > 科技观察

面试官:项目中有没有用到Semaphore?_0

时间:2023-03-21 22:01:00 科技观察

Semaphore就是我们讲源码的时候提到的信号量。我们来看看它的构造函数publicSemaphore(intpermits){sync=newNonfairSync(permits);}publicSemaphore(intpermits,booleanfair){sync=fairnewFairSync(permits):newNonfairSync(permits);}从构造函数可以看出,可以传入指定数量的资源并指定公平锁和非公平锁,公平锁和非公平锁就不多阐述了。我们专注于acquire()和release()。这两种方法的字面意思很容易理解。信号量常用于资源有限的场景。例如,我们需要限制一个操作的线程数。我们通过一个例子来感受一下。publicclassSemaphoreTest{publicstaticfinalclassTaskimplementsRunnable{privateintnum;私有信号量信号量;publicTask(intnum,Semaphoresemaphore){this.num=num;this.semaphore=信号量;}@Overridepublicvoidrun(){try{//获取信号量.acquire();System.out.println(String.format("num:%d,剩余资源%d,%d线程正在等待",num,semaphore.availablePermits(),semaphore.getQueueLength()));System.out.println(System.currentTimeMillis());线程.睡眠(3000);}catch(InterruptedExceptione){e.printStackTrace();}finally{//释放系统.out.println("释放资源");信号量.release();}}}publicstaticvoidmain(String[]args){信号量semaphore=newSemaphore(2);IntStream.range(0,20).forEach(i->newThread(newTask(i,semaphore)).start());}}实际输出:num:1,剩余0个资源,0个线程等待1657591518171num:0,剩余1个资源,0个线程等待1657591518172释放资源...释放资源num:18,剩余0个资源,1个threadwaitingfor1657591545235num:19,remaining0Resources,还有0个threadswaiting1657591545236releaseresources释放资源的过程结束,exitcode0源码分析下面重点看下acquire()获取licensefrom的源码这个信号量,并阻塞直到一个可用,或者线程被阻塞中断。获取一个许可证,如果一个许可证可用并立即返回,则将可用许可证的数量减少一个。publicvoidacquire()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}重点是这个同步。//首先,它继承了大家熟悉的AbstractQueuedSynchronizer。它是AQSabstractstaticclassSyncextendsAbstractQueuedSynchronizer{privatestaticfinallongserialVersionUID=1192457210091910933L;//初始化时会写入一个状态Sync(intpermits){setState(}permits);//获取当前状态finalintgetPermits(){returngetState();}//以不公平的方式获取信号量finalintnonfairTryAcquireShared(intacquires){for(;;){//当前可用intavailable=getState();//计算剩余数量intremaining=available-acquires;//如果剩余数量大于0,则进行cas修改if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}//释放受保护的信号量finalbooleantryReleaseShared(intreleases){for(;;){intcurrent=getState();//发布后剩余数量intnext=current+releases;//如果超过,if(nextcurrent)//underflowthrownewError("Permitcountunderflow");如果(compareAndSetState(当前,下一个))返回;}}finalintdrainPermits(){for(;;){intcurrent=getState();如果(current==0||compareAndSetState(current,0))returncurrent;在构造函数中,FairSync和NonfairSync都继承了Syncsync=fair?新的公平同步(许可):新的非公平同步(许可);默认情况下,非公平Semaphore会调用Sync的nonfairTryAcquireShared。staticfinalclassNonfairSyncextendsSync{privatestaticfinallongserialVersionUID=-2694183684443567898L;NonfairSync(intpermits){super(permits);}protectedinttryAcquireShared(intacquires){返回nonfairred(Seremap}Acquires)公平tryAcquireShared()的内部实现。staticfinalclassFairSyncextendsSync{privatestaticfinallongserialVersionUID=2014338818796000944L;FairSync(intpermits){super(permits);}protectedinttryAcquireShared(intacquires){for(;;){if(hasQueuned()1Predecess);int可用=getState();intremaining=available-获取;if(remaining<0||compareAndSetState(available,remaining))返回剩余;}}}回过头来看acquire(),内部方法acquireSharedInterruptibly是AQS的Internalmethods。publicvoidacquire()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}如果线程中断,则直接抛出异常,如果没有获取到资源,则进入排队机制。publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();//可用资源数小于0,正在排队。这里的实现是在子类中,也就是if(tryAcquireShared(arg)<0)doAcquireSharedInterruptibly(arg);}重点看这个doAcquireSharedInterruptibly()。privatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{//在加入共享模式的阻塞队列之前,我们已经提到了finalNodenode=addWaiter(Node.SHARED);//默认失败布尔值failed=true;try{for(;;){//获取前驱节点finalNodep=node.predecessor();//如果前一个节点是头节点,则重新尝试获取(原因是头节点可能会释放资源)if(p==head){intr=tryAcquireShared(arg);//如果获取到并且还有剩余资源if(r>=0){//1.设置当前节点为头节点//2.判断后续节点是否为共享等待节点//3.唤醒后续节点setHeadAndPropagate(node,r);p.next=null;//帮助GCfailed=false;返回;}}//这一步主要是检查获取资源失败的节点Status//如果线程需要阻塞则返回true//parkAndCheckInterrupt如果线程中断则抛出异常if(shouldParkAfterFailedAcquire(p,node)&&am;parkAndCheckInterrupt())抛出新的InterruptedException();}}finally{//如果失败,取消正在进行的获取if(failed)cancelAcquire(node);}}shouldParkAfterFailedAcquire()看一下细节,可能有同学不知道privatestaticbooleanshouldParkAfterFailedAcquire(Nodepred,Nodenode){intws=pred.waitStatus;//只要前节点释放了锁,就会通知标记为SIGNAL(-1)的后节点的线程//如果前节点是SIGNAL,则只需要等待其他前节点的线程要被释放,if(ws==Node.SIGNAL)returntrue;//这里的判断指的是取消状态,如果是取消,则移动节点Removeif(ws>0){do{node.prev=pred=pred.prev;}while(pred.waitStatus>0);pred.next=节点;}else{//cas更新compareAndSetWaitStatus(pred,ws,Node.SIGNAL);}返回假;}这里像SIGNAL这样的常量,大家可以自行查看源码,这也是细节。发现这段代码的主要作用是检查节点的状态,并对后续节点做一些操作。这里没有阻塞操作。让我们看看parkAndCheckInterrupt()。privatefinalbooleanparkAndCheckInterrupt(){LockSupport.park(this);返回Thread.interrupted();}这里可以看到加了锁,所以这里就发生了阻塞。那么释放锁在哪里呢?实际上,它处于发布阶段。privatevoidunparkSuccessor(Nodenode){intws=node.waitStatus;如果(ws<0)compareAndSetWaitStatus(节点,ws,0);节点s=node.next;如果(s==null||s.waitStatus>0){s=null;for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}if(s!=null)LockSupport.unpark(s.thread);}可以看到在unparkSuccessor中释放了锁,这个过程发生在释放阶段。release()比较简单,可以自己看源码,实现有点类似。结语其实这一节带大家看源码,主要是给大家讲讲共享锁的知识。信号量实际上使用共享锁。另外,AQS类值得仔细研究。你会发现很多有用的类都是基于它实现的。我们之前讲源码的时候也遇到过。如果你有兴趣,你可以了解一下。

最新推荐
猜你喜欢