1.前言Curator是一个用Java编写的操作Zookeeper的客户端工具,里面封装了分布式锁、选举等高级功能。今天主要分析实现分布式锁的主要原理。关于分布式锁的一些介绍或者其他实现,有兴趣的同学可以阅读以下文章:本人花费数万字,走过风风雨雨的Redis实现分布式锁之路,从单机到主从再到多机instance,就会出现这么多问题_阳阳的博客-CSDN博客在使用锁的时候,Curator会在指定路径下创建一个有序的临时节点。如果该节点是最小的,则说明获取锁成功。接下来,在准备阶段,我们可以观察是否会创建一个临时节点。2、准备工作首先,我们需要搭建一个zookeeper集群。当然你也可以单机使用。在这篇采访者:你能给我画一张Zookeeper选举的图吗?,介绍了一种使用docker-compose快速搭建zk集群的方法。在pom中引入依赖:org.apache.curatorcurator-recipes2.12.0Curator客户端配置项:/***@authorqcy*@create2022/01/0122:59:34*/@ConfigurationpublicclassCuratorFrameworkConfig{//zk各节点地址privatestaticfinalStringCONNECT_STRING="localhost:2181,localhost:2182,localhost:2183";//连接超时时间(单位:millisecond)privatestaticfinalintCONNECTION_TIME_OUT_MS=10*1000;//会话超时时间(单位:毫秒)privatestaticfinalintSESSION_TIME_OUT_MS=30*1000;//初始等待重试时间(单位:毫秒)privatestaticfinalintBASE_SLEEP_TIME_MS=2*1000;//最大重试次数ESTRIstaticfinalfinalMAX_REVATE3;@BeanpublicCuratorFrameworkgetCuratorFramework(){CuratorFrameworkcuratorFramework=CuratorFrameworkFactory.builder().connectString(CONNECT_STRING).connectionTimeoutMs(CONNECTION_TIME_OUT_MS).sessionTimeoutMs(SESSION_TIME_OUT_MS).retryPolicy(newExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES)).build();curatorFramework。的tart();returncuratorFramework;}}SESSION_TIME_OUT_MS参数将保证当客户端获取锁后突然崩溃时,zk可以在这段时间内删除当前客户端创建的临时有序节点。测试代码如下://临时节点路径,qcy是博主的缩写;try{/read.simulationsleep(30*1000);}catch(Exceptione){e.printStackTrace();}finally{interProcessMutex.release();}}使用接口调用该方法时,设置断点在Thread.sleep并进入zk观察容器中创建的节点。使用dockerexec-itzk容器名/bin/bash以交互方式进入容器,然后使用./bin/zkCli.sh连接zk服务器。然后使用ls路径查看节点。这三个节点是持久节点。可以使用getpath查看节点的数据结构信息。如果一个节点的ephemeralOwner值为0,即该节点的临时所有者的sessionid为0,则说明该节点是持久节点。当我们到达断点Thread.sleep时,确实发现在lockqcy下创建了一个临时节点。至此,准备工作已经完成。接下来分析interProcessMutex.acquire和release的过程。3.源码分析Curator支持多种类型的Lock,比如InterProcessMutex,可重入锁和独占锁InterProcessReadWriteLock,读写锁InterProcessSemaphoreMutex,不可重入独占锁今天主要是分析InterProcessMutex的添加和解锁的过程,先看看lockingprocessandlockpublicvoidacquire()throwsException{if(!internalLock(-1,null)){thrownewIOException("Lostconnectionwhiletryingtoacquirelock:"+basePath);}}这里是阻塞获取锁。如果无法获取到锁,它将一直阻塞。所以对于internalLock方法,超时设置为-1,时间单位设置为null。privatebooleaninternalLock(longtime,TimeUnitunit)throwsException{ThreadcurrentThread=Thread.currentThread();//通过map中是否可以获取线程的LockData信息来判断线程是否已经持有锁LockDatalockData=threadData.get(currentThread);if(lockData!=null){//可重入,直接返回加锁成功lockData.lockCount.incrementAndGet();returntrue;}//加锁StringlockPath=internals.attemptLock(time,unit,getLockNodeBytes());if(lockPath!=null){//加锁成功,保存到map中绑定到该线程的数据。锁定线程owningThread、重入计数lockCount和锁定路径lockPath保存在LockData中,例如/lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005privatefinalConcurrentMapthreadData=Maps.newConcurrentMap();privatestaticclassLock{finalThreadowningThread;finalStringlockPath;finalAtomicIntegerlockCount=newAtomicInteger(1);privateLockData(ThreadowningThread,StringlockPath){this.owningThread=owningThread;this.lockPath=lockPath;}}进入internals.attemptLock方法StringattemptLock(longtime,TimeUnitunit,byte[]lockNodeBytes)throwsException{//开始时间finallongstartMillis=System.currentTimeMillis();//将超时时间统一转换为毫秒finalLongmillisToWait=(unit!=null)?unit.toMillis(time):null;//节点数据,这里是nullfinalbyte[]localLockNodeBytes=(revocable.get()!=null)?newbyte[0]:lockNodeBytes;//重试次数intretryCount=0;//锁路径StringourPath=null;//是否获取锁booleanhasTheLock=false;//是否完成booleanisDone=false;while(!isDone){isDone=true;try{//创建临时有序节点并返回节点路径//内部调用client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);ourPath=driver.createsTheLock(client,path,localLockNodeBytes);//根据返回的节点路径,判断是否已经抢到锁hasTheLock=internalLockLoop(startMillis,millisToWait,ourPath);}catch(KeeperException.NoNodeExceptione){//当session过期时,driver可能找不到临时排序的节点,从而抛出NoNodeException//Retryhereif(client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,System.currentTimeMillis()-startMillis,RetryLoop.getDefaultRetrySleeper())){isDone=false;}else{throwe;}}}//获取锁,然后返回节点路径为调用者记录在map中if(hasTheLock){returnourPath;}returnull;},它会使用internalLockLoop中刚刚创建的临时有序节点来判断是否已经获取到锁privatebooleaninternalLockLoop(longstartMillis,LongmillisToWait,StringourPath)throwsException{//是否获取锁booleanhaveTheLock=false;booleandoDelete=false;try{if(revocable.get()!=null){//目前不会进入这里client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}//一直尝试获取锁while((client.getState()==CuratorFrameworkState.STARTED)&&!haveTheLock){//返回basePath下的所有临时(这里islockqcy)有序节点,按照后缀从小到大排列Listchildren=getSortedChildren();//取出当前线程创建的临时有序节点名称,这里是/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005StringsequenceNodeName=ourPath.substring(basePath.length()+1);//判断当前节点排序后是否排在第一位,如果排在第一位则说明已获取锁PredicateResultspredicateResults=driver.getsTheLock(client,children,sequenceNodeName,maxLeases);if(predicateResults.getsTheLock()){//获取到锁后,终止循环haveTheLock=true;}else{//这表示没有获取到锁//获取索引为的前一个节点小于当前节点StringpreviousSequencePath=basePath+"/"+predicateResults.getPathToWatch();synchronized(this){try{//如果前面的节点不存在,直接抛出NoNodeException,catch不处理,下一轮继续获取锁//如果前一个节点存在,为其设置监听器,监听其释放事件client.getData().usingWatcher(watcher).forPath(previousSequencePath);if(millisToWait!=null){millisToWait-=(System.currentTimeMillis()-startMillis);startMillis=System.currentTimeMillis();//判断是否超时if(millisToWait<=0){//获取锁超时,删除刚才创建的临时有序节点doDelete=true;break;}//如果没有超时,则在millisToWait等待检测到被删除触发唤醒操作wait();}}catch(KeeperException.NoNodeExceptione){//如果之前的节点不存在,直接抛出NoNodeException,在catch中不处理,继续获取锁下一轮}}}}}catch(Exceptione){ThreadUtils.checkInterrupted(e);doDelete=true;throwe;}finally{if(doDelete){//删除刚刚创建的临时有序节点deleteOurPath(ourPath);}}returnhaveTheLock;}判断是否获取锁的核心逻辑位于getsTheLockpublicPredicateResultsgetsTheLock(CuratorFrameworkclient,Listchildren,StringsequenceNodeName,intmaxLeases)throwsException{//获取所有子节点排序后当前节点的索引位置ourIndex=children.indexOf(sequenceNodeName);//判断当前节点是否在子节点中validateOurIndex(sequenceNodeName,ourIndex);//进程间Mutex的构造方法会将maxLeases初始化为1//ourIndex必须为0才能使getsTheLock为真,即当前节点必须是basePath下的最小节点才代表获得锁booleangetsTheLock=ourIndex0){return;}if(newLockCount<0){thrownewIllegalMonitorStateException("Lockcounthasgonenegativeforlock:"+basePath);}//这里表示重入次数为0try{//释放锁revocable.set(null);//内部使用保证,会在后台不断尝试删除节点deleteOurPath(lockPath);}如果重入次数大于0,则减少重入次数。当减为0时,调用zk删除节点,与Redisson的可重入锁释放一致。4.羊群效应这里说说在分布式锁场景下使用Zookeeper实现羊群效应。什么是羊群效应?帮助主人控制羊群。某个时候,当其中一只羊看到前方有更好吃的草而移动时,就会引起其余的羊不顾周围的情况,一拥而上。因此,羊群效应是指一个人理性行动后,其他人直接盲目跟从,造成非理性的羊群行为。Zookeeper中的羊群效应是指一个znode发生变化后,会触发大量本可以避免的watch通知,造成集群资源的浪费。Waitingevolution在获取不到锁的时候sleep一段时间如果线程获取锁失败,可以sleep一段时间再尝试获取锁。但这种方式效率极低。如果休眠时间短,会频繁轮询,浪费资源。如果sleep的时间长了,就会出现释放了锁但是还是没有获取到锁的尴尬情况。所以这里的优化点就是如何把主动轮询改成异步通知。当watch被锁定的节点的所有客户端都想获取锁时,只需要创建一个同名的节点即可。这些客户端在znode存在时在其上设置侦听器。当znode被删除时,所有等待锁的客户端都会收到通知,然后这些客户端会再次尝试获取锁。这里虽然使用了watch机制来进行异步通知,但是当client数量特别多的时候,会出现性能低下的情况。当一个znode被删除时,此时需要通知大量的客户端。在此期间,其他正常提交给zk的请求可能会被延迟或阻塞。这会产生羊群效应,其中某一点的更改(一个znode被删除)会产生全面影响(通知大量客户端)。所以这里的优化点是如何减少一个znode的monitor数量,最好的情况是只有一个。如果watch之前的orderednode先指定了一个basePath,那么想要获取锁的client会直接在该path下创建一个临时的orderednode。当创建的节点是最小节点时,说明已经获取到了锁。如果不是最小节点,则只设置监听到前一个节点,只监听前一个节点的删除行为。这样,当上一个节点被删除时,只会向下一个节点代表的客户端发送通知,而不会向所有客户端发送通知,从而避免了羊群效应。在避免羊群效应的同时,让当前锁成为公平锁。即按照加锁的先后顺序获取锁,避免线程过度饥饿。五、后记本文从源码的角度讲解了使用Curator获取分布式锁的过程,然后从等待锁的演化过程角度分析了Zookeeper在分布式锁场景下避免羊群效应的解决方案。这是Zookeeper系列的第二篇文章。其watch原理分析和zab协议的文章也在路上。