本文将继续讨论基于Consul的分布式锁实现。信号量是我们在实现并发控制时经常使用的一种方法。主要用于限制线程或进程的并发数。比如Zuul默认使用信号量来限制每条路由的并发数,实现不同的路由。之间的资源隔离。信号量(Semaphore),有时也称为信号量,是一种在多线程环境中使用的设施,可用于确保两个或多个关键代码段不会被并发调用。在进入代码的临界区之前,线程必须获得一个信号量;一旦代码的关键部分完成,线程必须释放信号量。其他想要进入关键代码段的线程必须等到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将AcquireSemaphoreVI和ReleaseSemaphoreVI放置在每个关键代码段的开头和结尾,并确认这些信号量VI引用了最初创建的信号量。比如在这个停车场系统中,车位是公共资源,每辆车就像一个线程,看门人充当信号量。实现idea信号量存储:semaphore/key获取操作:创建sessionlockkey竞争对手:semaphore/key/session查询信号量:semaphore/key/.lock,可以得到如下内容(如果是第一次创建信号量,不会获取到,所以此时直接创建){"limit":3,"holders":["90c0772a-4bd3-3a3c-8215-3b8937e36027","93e5611d-5365-a374-8190-f80c4a7280ab"]}如果holder已经达到上限,返回false,如果是阻塞模式,继续尝试获取的操作如果holder没有达到上限,更新semaphore/key/.lock的内容,并将当前线程的sessionId添加到holders中。注意:更新时需要设置ca。它的值是“查询信号量”步骤中得到的“ModifyIndex”值。该值用于确保更新操作的基础不被其他竞争者更新。如果更新成功,开始执行具体逻辑。如果更新不成功,说明其他竞争者抢占了资源,返回false,继续尝试阻塞方式获取的操作释放操作:从semaphore/key/.lock的持有者中移除当前sessionId删除信号量/key/session删除当前Session流程图代码实现/****@paramconsulClientconsul客户端实例*@paramlimitsemaphore上限值*@paramkeyPathconsul中存储的信号量参数路径*/publicSemaphore(ConsulClientconsulClient,intlimit,StringkeyPath){this.consulClient=consulClient;this.limit=limit;this.keyPath=prefix+keyPath;}/***获取的信号量**@paramblock被阻塞。如果为真,则继续尝试直到获得资源。*@return*@throwsIOException*/publicBooleanacquired(booleanblock)throwsIOException{if(acquired){logger.error(sessionId+"-Alreadyacquired");thrownewRuntimeException(sessionId+"-Alreadyacquired");}//createsessionclearSession();this.sessionId=createSessionId("semaphore");logger.debug("Createsession:"+sessionId);//addcontenderentryStringcontenderKey=keyPath+"/"+sessionId;logger.debug("contenderKey:"+contenderKey);PutParamsputParams=newPutParams();putParams.setAcquireSession(sessionId);Booleanb=consulClient.setKVValue(contenderKey,"",putParams).getValue();if(!b){logger.error("Failedtoaddcontenderentry:"+contenderKey+","+sessionId);thrownewRuntimeException("Failedtoaddcontenderentry:"+contenderKey+","+sessionId);}while(true){//trytotakethesemaphoreStringlockKey=keyPath+"/.lock";StringlockKeyValue;GetValuelockKeyContent=consulClient.getKVValue(lockKey).getValue();if(lockKeyContent!=null){//锁值转换lockKeyValue=lockKeyContent.getValue();BASE64Decoderdecoder=newBASE64Decoder();byte[]v=decoder.decodeBuffer(lockKeyValue);StringlockKeyValueDecode=newString(v);logger.debug("lockKey="+lockKey+",lockKeyValueDecode="+lockKeyValueDecode);Gsongson=newGson();ContenderValuecontenderValue=gson.fromJson(lockKeyValueDecode,ContenderValue.class);//当前信号量已满if(contenderValue.getLimit()==contenderValue.getHolders().size()){logger.debug("Semaphorelimited"+contenderValue.getLimit()+",waiting...");if(block){//如果是阻塞模式,再试try{Thread.sleep(100L);}catch(InterruptedExceptione){}continue;}//非阻塞模式,没有获取到信号量直接返回returnfalse;}//信号量增加contenderValue.getHolders().add(sessionId);putParams=newPutParams();putParams.setCas(lockKeyContent.getModifyIndex());booleanc=consulClient.setKVValue(lockKey,contenderValue.toString(),putParams)。getValue();if(c){acquired=true;returntrue;}elsecontinue;}else{//当前信号量还不存在,所以创建一个并立即抢占资源();putParams.setCas(0L);booleanc=consulClient.setKVValue(lockKey,contenderValue.toString(),putParams).getValue();if(c){acquired=true;returntrue;}continue;}}}/***创建sessionId*@paramsessionName*@return*/publicStringcreateSessionId(StringsessionName){NewSessionnewnewSession=newNewSession();newSession.setName(sessionName);returnconsulClient.sessionCreate(newSession,null).getValue();}/***发布session,并从锁中移除当前sessionId*@throwsIOException*/publicvoidrelease()throwsIOException{if(this.acquired){//removesessionfromlockwhile(true){StringcontenderKey=keyPath+"/"+sessionId;StringlockKey=keyPath+"/.lock";StringlockKeyValue;GetValuelockKeyContent=consulClient.getKVValue(lockKey).getValue();if(lockKeyContent!=null){//锁值转换lockKeyValue=lockKeyContent.getValue();BASE64Decoderdecoder=newBASE64Decoder();byte[]v=decoder.decodeBuffer(lockKeyValue);StringlockKeyValueDecode=newString(v);Gsongson=newGson();ContenderValuecontenderValue=gson.fromJson(lockKeyValueDecode,ContenderValue.class);contenderValue.getHolders().remove(sessionId);PutParamsputParams=newPutParams();putParams.setCas(lockKeyContent.getModifyIndex());consulClient.deleteKVValue(contenderKey);booleanc=consulClient.setKVValue(lockKey,contenderValue.toString(),putParams).getValue();if(c){break;}}}//removesessionkey}this.acquired=false;clearSession();}publicvoidclearSession(){if(sessionId!=null){consulClient.sessionDestroy(sessionId,null);sessionId=null;}}classContenderValueimplementsSerializable{privateIntegerlimit;privateList
