当前位置: 首页 > 科技观察

基于Consul的分布式信号量实现

时间:2023-03-15 23:22:20 科技观察

本文将继续讨论基于Consul的分布式锁实现。信号量是我们在实现并发控制时经常使用的一种方法。主要用于限制线程或进程的并发数。比如Zuul默认使用信号量来限制每条路由的并发数,实现不同的路由。之间的资源隔离。信号量(Semaphore),有时也称为信号量,是一种在多线程环境中使用的设施,可用于确保两个或多个关键代码段不会被并发调用。在进入代码的临界区之前,线程必须获得一个信号量;一旦代码的关键部分完成,线程必须释放信号量。其他想要进入关键代码段的线程必须等到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将AcquireSemaphoreVI和ReleaseSemaphoreVI放置在每个关键代码段的开头和结尾,并确认这些信号量VI引用了最初创建的信号量。比如在这个停车场系统中,车位是公共资源,每辆车就像一个线程,看门人充当信号量。实现idea信号量存储:semaphore/key获取操作:创建sessionlockkey竞争对手:semaphore/key/session查询信号量:semaphore/key/.lock,可以得到如下内容(如果是第一次创建信号量,不会获取到,所以此时直接创建){"limit":3,"holders":["90c0772a-4bd3-3a3c-8215-3b8937e36027","93e5611d-5365-a374-8190-f80c4a7280ab"]}如果holder已经达到上限,返回false,如果是阻塞模式,继续尝试获取的操作如果holder没有达到上限,更新semaphore/key/.lock的内容,并将当前线程的sessionId添加到holders中。注意:更新时需要设置ca。它的值是“查询信号量”步骤中得到的“ModifyIndex”值。该值用于确保更新操作的基础不被其他竞争者更新。如果更新成功,开始执行具体逻辑。如果更新不成功,说明其他竞争者抢占了资源,返回false,继续尝试阻塞方式获取的操作释放操作:从semaphore/key/.lock的持有者中移除当前sessionId删除信号量/key/session删除当前Session流程图代码实现/****@paramconsulClientconsul客户端实例*@paramlimitsemaphore上限值*@paramkeyPathconsul中存储的信号量参数路径*/publicSemaphore(ConsulClientconsulClient,intlimit,StringkeyPath){this.consulClient=consulClient;this.limit=limit;this.keyPath=prefix+keyPath;}/***获取的信号量**@paramblock被阻塞。如果为真,则继续尝试直到获得资源。*@return*@throwsIOException*/publicBooleanacquired(booleanblock)throwsIOException{if(acquired){logger.error(sessionId+"-Alreadyacquired");thrownewRuntimeException(sessionId+"-Alreadyacquired");}//createsessionclearSession();this.sessionId=createSessionId("semaphore");logger.debug("Createsession:"+sessionId);//addcontenderentryStringcontenderKey=keyPath+"/"+sessionId;logger.debug("contenderKey:"+contenderKey);PutParamsputParams=newPutParams();putParams.setAcquireSession(sessionId);Booleanb=consulClient.setKVValue(contenderKey,"",putParams).getValue();if(!b){logger.error("Failedtoaddcontenderentry:"+contenderKey+","+sessionId);thrownewRuntimeException("Failedtoaddcontenderentry:"+contenderKey+","+sessionId);}while(true){//trytotakethesemaphoreStringlockKey=keyPath+"/.lock";StringlockKeyValue;GetValuelockKeyContent=consulClient.getKVValue(lockKey).getValue();if(lockKeyContent!=null){//锁值转换lockKeyValue=lockKeyContent.getValue();BASE64Decoderdecoder=newBASE64Decoder();byte[]v=decoder.decodeBuffer(lockKeyValue);StringlockKeyValueDecode=newString(v);logger.debug("lockKey="+lockKey+",lockKeyValueDecode="+lockKeyValueDecode);Gsongson=newGson();ContenderValuecontenderValue=gson.fromJson(lockKeyValueDecode,ContenderValue.class);//当前信号量已满if(contenderValue.getLimit()==contenderValue.getHolders().size()){logger.debug("Semaphorelimited"+contenderValue.getLimit()+",waiting...");if(block){//如果是阻塞模式,再试try{Thread.sleep(100L);}catch(InterruptedExceptione){}continue;}//非阻塞模式,没有获取到信号量直接返回returnfalse;}//信号量增加contenderValue.getHolders().add(sessionId);putParams=newPutParams();putParams.setCas(lockKeyContent.getModifyIndex());booleanc=consulClient.setKVValue(lockKey,contenderValue.toString(),putParams)。getValue();if(c){acquired=true;returntrue;}elsecontinue;}else{//当前信号量还不存在,所以创建一个并立即抢占资源();putParams.setCas(0L);booleanc=consulClient.setKVValue(lockKey,contenderValue.toString(),putParams).getValue();if(c){acquired=true;returntrue;}continue;}}}/***创建sessionId*@paramsessionName*@return*/publicStringcreateSessionId(StringsessionName){NewSessionnewnewSession=newNewSession();newSession.setName(sessionName);returnconsulClient.sessionCreate(newSession,null).getValue();}/***发布session,并从锁中移除当前sessionId*@throwsIOException*/publicvoidrelease()throwsIOException{if(this.acquired){//removesessionfromlockwhile(true){StringcontenderKey=keyPath+"/"+sessionId;StringlockKey=keyPath+"/.lock";StringlockKeyValue;GetValuelockKeyContent=consulClient.getKVValue(lockKey).getValue();if(lockKeyContent!=null){//锁值转换lockKeyValue=lockKeyContent.getValue();BASE64Decoderdecoder=newBASE64Decoder();byte[]v=decoder.decodeBuffer(lockKeyValue);StringlockKeyValueDecode=newString(v);Gsongson=newGson();ContenderValuecontenderValue=gson.fromJson(lockKeyValueDecode,ContenderValue.class);contenderValue.getHolders().remove(sessionId);PutParamsputParams=newPutParams();putParams.setCas(lockKeyContent.getModifyIndex());consulClient.deleteKVValue(contenderKey);booleanc=consulClient.setKVValue(lockKey,contenderValue.toString(),putParams).getValue();if(c){break;}}}//removesessionkey}this.acquired=false;clearSession();}publicvoidclearSession(){if(sessionId!=null){consulClient.sessionDestroy(sessionId,null);sessionId=null;}}classContenderValueimplementsSerializable{privateIntegerlimit;privateListholders=newArrayList<>();publicIntegergetLimit(){returnlimit;}publicvoidsetLimit(Integerlimit){this.limit=limit;}publicListgetHolders(){returnholders;}publicvoidsetHolders(列表holders){this.holders=holders;}@OverridepublicStringtoString(){returnnewGson().toJson(this);}}}单元测试下面单元测试的逻辑:通过线程模拟不同的分布式服务获取信号量执行业务逻辑Since信号量不同于简单的分布式互斥量,它不仅仅限制一个线程进行操作,而是可以控制多个线程的并发,所以通过下面的单元测试,我们将信号量设置为3,然后启动15个线程进行竞争同时观察分布式信号量执行的结果。INFO[Thread-6]SemaphoreRunner-Thread7start!INFO[Thread-2]SemaphoreRunner-Thread3start!INFO[Thread-7]SemaphoreRunner-Thread8start!INFO[Thread-2]SemaphoreRunner-Thread3end!INFO[Thread-5]SemaphoreRunner-Thread6start!INFO[Thread-6]SemaphoreRunner-Thread7end!INFO[Thread-9]SemaphoreRunner-Thread10start!INFO[Thread-5]SemaphoreRunner-Thread6end!INFO[Thread-1]SemaphoreRunner-Thread2start!INFO[Thread-7]SemaphoreRunner-Thread8end!INFO[Thread-10]SemaphoreRunner-Thread11start!INFO[Thread-10]SemaphoreRunner-Thread11end!INFO[Thread-12]SemaphoreRunner-Thread13start!INFO[Thread-1]SemaphoreRunner-Thread2end!INFO[Thread-3]SemaphoreRunner-Thread4start!INFO[Thread-9]SemaphoreRunner-Thread10end!INFO[Thread-0]SemaphoreRunner-Thread1start!INFO[Thread-3]SemaphoreRunner-Thread4end!INFO[Thread-14]SemaphoreRunner-Thread15start!INFO[Thread-12]SemaphoreRunner-Thread13end!INFO[Thread-0]SemaphoreRunner-Thread1end!INFO[Thread-13]SemaphoreRunner-Thread14start!INFO[Thread-11]SemaphoreRunner-Thread12start!INFO[Thread-13]SemaphoreRunner-Thread14end!INFO[Thread-4]SemaphoreRunner-Thread5start!INFO[Thread-4]SemaphoreRunner-Thread5end!INFO[Thread-8]SemaphoreRunner-Thread9start!INFO[Thread-11]SemaphoreRunner-Thread12end!INFO[Thread-14]SemaphoreRunner-Thread15end!INFO[Thread-8]SemaphoreRunner-Thread9end!publicclassTestLock{privateLoggerlogger=Logger.getLogger(getClass());@TestpublicvoidtestSemaphore()throwsException{新线程(newSemaphoreRunner(1)).start();newThread(newSemaphoreRunner(2)).start();newThread(newSemaphoreRunner(3)).start();newThread(newSemaphoreRunner(4)).start();newThread(newSemaphoreRunner(5)).开始();newThread(newSemaphoreRunner(6)).start();newThread(newSemaphoreRunner(7)).start();newThread(newSemaphoreRunner(8)).start();newThread(newSemaphoreRunner(9)).start();newThread(newSemaphoreRunner(10)).start();Thread.sleep(1000000L);}}publicclassSemaphoreRunnerimplementsRunnable{privateLoggerlogger=Logger.getLogger(getClass());privateintflag;publicSemaphoreRunner(intflag){this.flag=flag;}@Overridepublicvoidrun(){Semaphoresemaphore=newSemaphore(newConsulClient(),3,"mg-init");try{if(semaphore.acquired(真)){//获取信号量并执行业务逻辑logger.info("Thread"+flag+"start!");Thread.sleep(newRandom().nextInt(10000));logger.info("Thread"+flag+"end!");}}catch(Exceptione){e.printStackTrace();}finally{try{//Semaphorerelease,Sessionlockrelease,Sessiondeletesemaphore.release();}catch(IOExceptione){e.printStackTrace();}}}}从测试结果我们可以发现,当信号量持有者数量达到信号量上限3时,其他竞争者开始等待,只有当一个持有者释放信号量后,新的线程才会成为持有者并开始执行自己的业务逻辑。因此,分布式信号量可以帮助我们有效地控制对共享资源的并发操作数。优化建议同上一个,这里只是简单实现一下。在线应用程序还必须添加TTL会话清理和清理.lock资源中无效持有者的机制。参考文档:https://www.consul.io/docs/guides/semaphore.html实现代码GitHub:https://github.com/dyc87112/consul-distributed-lock开源中国:http://git.oschina.net/didispace/consul-distributed-lock【本文为专栏作者“翟永超”原创稿件,转载请联系作者获得授权】点此查看该作者更多好文