当前位置: 首页 > 科技观察

Java高并发编程基础三大利器Semaphore

时间:2023-03-13 20:38:58 科技观察

介绍。近期可以报税。还没有申报的同学可以赶快尝试一下。但是反正我从早上到下午(3月1日)都没有申报成功,一申报就返回“访客过多,请稍后再试”。为什么有的人能够宣告成功,有的人直接返回失败。很明显,报关处理资源是有限的,别人处理完了你才能处理你的。运气好的话,多试几次就轮到你了。如果你运气不好,你可能会重试一天,也可能轮不到你。你。反正我已经放弃了,会在夜深人静的时候再试一次。作为程序员的我们要知道,这是一个报税APP的限流操作。如果你还不明白什么是限流操作,可以参考这篇文章《高并发系统三大利器之限流》。比如个人报税系统每台机器最多只能处理1000条请求,再多就会挂机。如果是冗余请求,则拒绝这些请求。直接给你回个温馨提示:“当前访问量太大,请稍后再试”,如果你想实现这个功能,请想想可以用哪些方法和算法来实现。共享锁、排它锁在学习信号量之前,我们首先要了解什么是共享锁。共享锁:允许多个线程同时获取锁,并发访问共享资源。独占锁:有人称之为“独占锁”,独占独占,只能由一个线程持有。当独占锁已经被某个线程持有时,其他线程只能等待释放后才能竞争锁,同时只有一个线程能够成功竞争锁。什么是SemaphoreSemaphore(信号量)用于控制同时访问特定资源的线程数。它协调每个线程以确保公共资源的合理使用。多年来,我发现很难从字面上理解Semaphore的含义。我只能把它比作控制交通的红绿灯。比如XX路要限行,这条路上同时只允许一百辆车。执行,其他人必须在路口等,所以前一百辆车会看到绿灯,可以开进这条路,后面的车会看到红灯,不能进入XX路,但是如果前一百辆车有五汽车已经离开了XX路,所以允许5辆车进入这条路。在这个例子中,汽车是一个线程。当进入道路时,表示线程正在执行。当离开道路时,表示线程执行完毕。看到红灯表示线程被阻塞,无法执行。"Semaphore机制是为线程提供抢占权限,所以可以实现公平或不公平,类似于ReentrantLock。说了这么多,我们来看一个实际的例子。比如我们去停车场停车。停车场只有5个车位,但现在有8辆车可以停,剩下的3辆车要么等其他车开走。然后停车,还是另找停车位?/***@author:公众号【Java金融】*/publicclassSemaphoreTest{publicstaticvoidmain(String[]args)throwsInterruptedException{//初始化5个车位Semaphoresemaphore=newSemaphore(5);//等待所有车finalCountDownLatchlatch=newCountDownLatch(8));for(inti=0;i<8;i++){intfinalI=i;if(i==5){Thread.sleep(1000);newThread(()->{stopCarNotWait(semaphore,finalI);闩锁。countDown();}).start();continue;}newThread(()->{stopCarWait(semaphore,finalI);latch.countDown();}).start();}latch.await();log("总剩余:"+semaphore.availablePermits()+"车位");}privatestaticvoidstopCarWait(Semaphoresemaphore,intfinalI){Stringformat=String.format("车牌号%d",finalI);try{semaphore.acquire(1);log(format+"我找到了一个停车位,然后去停车");Thread.sleep(10000);}catch(Exceptione){e.printStackTrace();}finally{semaphore.release(1);log(format+"开走”);}}privatestaticvoidstopCarNotWait(Semaphoresemaphore,intfinalI){Stringformat=String.format("车牌号%d",finalI);try{if(semaphore.tryAcquire()){log(format+"我找到了停车位,去停车了");Thread.sleep(10000);log(format+"开走");semaphore.release();}else{log(format+"没有停车位,我就不在这里等了,去别处停车");}}catch(Exceptione){e.printStackTrace();}}publicstaticvoidlog(Stringcontent){//FormatDateTimeFormatterfmTime=DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss");//当前时间LocalDateTimenow=LocalDateTime.now();System.out.println(now.format(fmTime)+""+content);}}2021-03-0118:54:570号车牌找到车位,去公园2021-03-0118:54:573号车牌找到车位2021-03-0118:54:57车牌号2找到车位,去公园2021-03-01118:54:57车牌号1找到车位,去公园2021-03-0118:54:57车牌号4找到车位去停车2021-03-011118:54:58车牌号5没有车位,停在这里等,去别处停车2021-03-01118:55:07车牌号7找到停车位,去公园2021-03-01118:55:07车牌号6找到停车位,去公园2021-03-01118:55:07车牌号2开走了2021-03-0118:55:07车牌号0开走了2021-03-0118:55:073号车牌开走2021-03-0118:55:074号车牌开走2021-03-0118:55:071号车牌开走2021-03-01118:55:177号车开走了2021-03-01118:55:17车牌号6开走了2021-03-0118:55:17剩余总数:5个停车位。从输出结果可以看出,5号车牌,看到这辆车没有车位,就会停止在这个地方等待,去其他地方,但是6号车牌和7号车牌需要等到车库开两辆车空出两个车位后才能停车。这说明Semaphore的acquire方法如果没有获取到credential就会阻塞,如果tryAcquire方法没有获取到Credentials则不会阻塞。信号量在dubbo中的应用在dubbo中,可以为Provider配置线程池大小,以控制系统提供服务的最大并行度。默认是200。比如我现在的订单系统有三个接口,分别是创建订单,取消订单,修改订单。这三个接口的并发数加起来是200,但是创建订单接口是核心接口。我想让它用更多的线程执行,这样它最多可以有150个线程。取消订单和修改订单分别最多可以使用25个线程执行。.dubbo提供了executes属性来实现这个功能我们可以看到dubbo内部是如何执行的。具体实现在ExecuteLimitFilter类中。我们可以publicResultinvoke(Invokerinvoker,Invocationinvocation)throwsRpcException{URLurl=invoker.getUrl();StringmethodName=invocation.getMethodName();SemaphoreexecutesLimit=null;booleanacquireResult=false;intmax=url.getMethodParameter(methodName,Constants.EXECUTES_KEY,0);if(max>0){RpcStatuscount=RpcStatus.getStatus(url,invocation.getMethodName());//如果当前使用的线程数大于等于设置的阈值,则直接抛出异常//if(count.getActive()>=max){//thrownewRpcException("Failedtoinvokemethod"+invocation.getMethodName()+"inprovider"+url+",cause:Theservice//usingthreadsgreaterthanlimited.");/***http://manzhizhen.iteye.com/blog/2386408*usesemaphoreforconcurrencycontrol(tolimitthreadnumber)*/executesLimit=count.getSemaphore(max);if(executesLimit!=null&&!(acquireResult=executesLimit.tryAcquire())){thrownewRpcException("Failedtoinvokemethod"+invocation.getMethodName()+"inprovider"+url+",cause:Theserviceusingthreadsgreaterthanlimited.");}}longbegin=System.currentTimeMillis();booleanisSuccess=true;//计算器+1RpcStatus.beginCount(url,methodName);try{Resultresult=invoker.invoke(invocation);returnresult;}catch(Throwablet){isSuccess=false;if(tinstanceofRuntimeException){throw(RuntimeException)t;}else{thrownewRpcException("unexpectedexceptionwhenExecuteLimitFilter",t);}}finally{//计算器-1RpcStatus.endCount(url,methodName,System.currentTimeMillis()-begin,isSuccess);if(acquireResult){executesLimit.release();}}}从上面的代码我们也可以看出,这个在前期并没有使用Semaphore实现,而是直接使用注解的if(count.getActive()>=max)来实现,因为这个count.getActive()>=max和这个count加1不是原子的,所以会出问题,具体bug号可以在https://github.com/apache/dubbo在/pull/582之后,上面的代码用于修复Semaphore的非原子问题。更详细的分析请参考代码链接。但是现在最新的版本(2.7.9)我觉得是用spinplus和CAS实现的。上面的Semaphore是Semaphore的简单使用,也是dubbo中使用的例子。说实话,Semaphore在工作中用的还是比较少的,但是面试的时候可能会问到,所以还是有必要一起学习一下的。.我们之前是通过ReentrantLock了解了AQS的?。其实Semaphore也是通过AQS实现的。我们可以把排他锁的方法放在一起比较。基本上都有对应的方法。图中这里有两点需要注意:在独占锁模式下,我们只有在获取独占锁的节点释放锁时才会唤醒后继节点,因为独占锁只能由一个线程持有,如果它仍然没有被释放,则不需要唤醒它的后继节点。在共享锁模式下,当一个节点获取到共享锁后,我们可以在获取成功后唤醒后继节点,而不用等待该节点释放锁,因为共享锁可以同时被多个线程持有time,当获取到锁后,所有后续节点都可以直接获取。因此,在共享锁模式下,当获取锁和释放锁时,会唤醒后继节点。获取凭据,我们也是采用非公平锁方式获取凭据。我们可以看一下acquire的核心方法publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();if(tryAcquireShared(arg)<0)doAcquireSharedInterruptibly(arg);}protectedinttryAcquireShared(intacquires){returnnonfairTryAcquireShared(acquires);}//主要看这个方法,这个方法返回的值是tryAcquireShared返回的值,因为tryAcquireShared->nonfairTryAcquireSharedfinalintnonfairTryAcquireShared(intacquires){//Spinfor(;;){//Semaphore使用了AQS的状态变量的值表示可用的许可数intavailable=getState();//可用的许可数减去本次需要获取的许可数就是剩余的许可数intremaining=available-acquires;//如果剩余许可数小于0或者CAS成功设置当前可用许可数为剩余许可数,则返回成功许可数if(remaining<0||compareAndSetState(available,remaining))returnremaining;}当tryAcquireShared获取返回的license小于0时,表示获取license失败,需要进入doAcquireSharedInterruptibly方法休眠。当tryAcquireShared返回的license小于0时,表示获取license成功,直接结束。doAcquireSharedInterruptiblyprivatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{//独占锁的acquireQueued调用addWaiter(Node.EXCLUSIVE),//共享锁调用addWaiter(Node.SHARED),表示节点处于共享模式finalNodenode=addWaiter(Node.SHARED);booleanfailed=true;try{for(;;){finalNodep=node.predecessor();if(p==head){intr=tryAcquireShared(arg);if(r>=0){setHeadAndPropagate(node,r);p.next=null;//helpGCfailed=false;return;}}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())thrownewInterruptedException();}}finally{if(failed)cancelAcquire(node);}}这个方法是不是和上一篇我们讲的AQS独占锁的acquireQueued类似,只是独占锁直接调用setHead(node)方法,而共享锁除了调用setHeadAndPropagate(node,r)方法外setHead,它还会调用doReleaseShared(唤醒后继节点)privatevoidsetHeadAndPropagate(Nodenode,intpropagate){Nodeh=head;//RecordoldheadforcheckbelowsetHead(node);if(propagate>0||h==null||h.waitStatus<0||(h=head)==null||h.waitStatus<0){Nodes=node.next;if(s==null||s.isShared())doReleaseShared();}}其他方法与ReentrantLock实现的独占锁基本相同。相信对源码分析感兴趣的人不多。更详细的还是需要自己看源码的总结作为semaphore当Semaphore初始化license为1时,也可以作为mutex使用。其中0和1就相当于它的状态。当=1时,表示其他线程可以获取到。=0时为独占,即其他线程必须等待。Semaphore是JUC包中一个非常简单的工具类。用于限制多线程下同时访问资源的线程数。Semaphore中有一个【权限】的概念,即在访问资源之前,必须先获得权限。如果当前许可数为0,则线程阻塞,直到获得许可。Semaphore内部使用AQS,抽象内部类Sync继承AQS。因为Semaphore本质上是一个共享场景,所以它的内部其实和共享锁类似。实现共享锁的调用框架与独占锁非常相似。它们最大的区别在于获取锁的逻辑——共享锁可以同时被多个线程持有。是的,独占锁一次只能由一个线程持有。由于共享锁可以同时被多个线程持有,当头节点获得共享锁后,可以立即唤醒后继节点去竞争锁,而不用等到锁被释放。因此,共享锁触发后继节点唤醒的行为可能有两种。一种是当前节点成功获取共享锁后,另一种是当前节点释放共享锁后。如果使用信号量进行限流,则会出现尖峰。★表示在一定时间内所有资源在短时间内用完,大部分时间没有可用资源。比如在限流方式的计算器算法中,1秒内的最大请求数设置为100,如果前100ms已经有100个请求,那么接下来的900ms将不再处理这些请求。这就是尖峰现象。本文转载自微信公众号“java财经”,可通过以下二维码关注。转载本文请联系爪哇财经公众号。