当前位置: 首页 > 科技观察

基于Zookeeper的分布式锁

时间:2023-03-16 19:10:46 科技观察

这篇文章只需要你10分钟。目前流行的分布式锁的实现方案有3种,分别是基于数据库的方案、基于Redis的方案和基于Zookeeper的方案。其中,前两种方案网上资料较多,本文不再展开。下面我们就来看看如何使用Zookeeper实现分布式锁。什么是动物园管理员?Zookeeper(业界简称为zk)是一种提供配置管理、分布式协作和命名的集中式服务。这些功能是分布式系统中非常底层和必不可少的基础功能,但是如果你要实现这些功能并在保持一致性和可用性的同时实现高吞吐量和低延迟实际上是非常困难的。所以zookeeper提供了这些功能,开发者在zookeeper之上构建自己的各种分布式系统。虽然zookeeper的实现比较复杂,但是它提供的模型抽象非常简单。Zookeeper提供了一个多级的节点命名空间(节点称为znodes),每个节点用斜线(/)分隔的路径表示,每个节点都有一个父节点(根节点除外),这与File很相似系统。例如/foo/doo代表一个znode,它的父节点是/foo,它的父节点是/,/是没有父节点的根节点。与文件系统不同的是,这些节点可以设置关联数据,而在文件系统中,只有文件节点可以存储数据,目录节点不能。为了保证高吞吐量和低延迟,Zookeeper在内存中维护着这种树状目录结构。此功能可防止使用Zookeeper来存储大量数据。每个节点的数据存储上限为1M。为了保证高可用,zookeeper需要以集群的形式部署,所以只要集群中的大部分机器可用(可以容忍一定的机器故障),那么zookeeper本身还是可用的。客户端在使用zookeeper时,需要知道集群机器列表,通过与集群中的某台机器建立TCP连接来使用服务。客户端使用这个TCP连接发送请求,获取结果,获取监控事件,发送心跳包。如果连接异常断开,客户端可以连接到另一台机器。架构图如下:客户端的读请求可以被集群中的任意一台机器处理。如果读请求在节点上注册了监听器,监听器也会被连接的zookeeper机器处理。对于写请求,这些请求会同时发送给其他zookeeper机器,达成共识后,请求成功返回。因此,随着zookeeper集群机器数量的增加,读请求的吞吐量会增加,而写请求的吞吐量会下降。排序是zookeeper中一个非常重要的特性。所有更新都是全局排序的,每个更新都有一个唯一的时间戳。这个时间戳称为zxid(ZookeeperTransactionId)。读请求只会相对更新进行排序,即读请求的返回结果会有zookeeper***的zxid。如何使用zookeeper实现分布式锁?在描述算法流程之前,我们先看看zookeeper中节点的几个有趣的属性:有序节点:如果当前有一个名为/lock的父节点,我们可以在这个父节点下创建子节点;zookeeper提供了一个可选的有序特性,比如我们可以创建一个子节点“/lock/node-”并指定顺序,那么zookeeper在生成子节点的时候会自动根据当前子节点的个数加上一个整数,即也就是说,如果是第一个创建的子节点,那么生成的子节点就是/lock/node-0000000000,下一个节点就是/lock/node-0000000001,以此类推。临时节点:客户端可以创建一个临时节点,zookeeper会在会话结束或会话超时后自动删除该节点。事件监听:在读取数据的时候,我们可以同时在节点上设置事件监听。当节点数据或结构发生变化时,zookeeper会通知客户端。目前zookeeper有以下四个事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变化。下面介绍使用zookeeper实现分布式锁的算法流程,假设锁空间的根节点为/lock:客户端连接zookeeper,并在/lock下创建临时有序的子节点,对应第一个客户端的子节点是/lock/lock-0000000000,第二个是/lock/lock-0000000001,依此类推。客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点。如果是,则认为已经获得锁,否则,监听/lock的子节点变化消息,获得子节点变化通知后重复此步骤,直到获得锁;执行业务代码;业务流程完成后,删除对应的子节点,释放锁。第1步创建的临时节点可以保证即使发生故障也能释放锁。考虑这样的场景:如果客户端a当前创建的子节点是序号最小的节点,客户端所在机器在获取锁后崩溃,客户端没有主动删除子节点;如果创建了最新的节点,则锁永远不会被释放,从而导致死锁;由于临时节点的创建,客户端宕机后,zookeeper不会在收到客户端的心跳包后,判断会话无效,删除临时节点释放锁。另外,细心的朋友可能会想到第2步获取子节点列表和设置monitor两步操作的原子性,考虑这样的场景:客户端a对应的子节点是/lock/lock-0000000000,而clientb对应的子节点为/lock/lock-0000000001。当客户端b获取到子节点列表时,发现自己并不是序列号最小的那个,但是在设置监听器之前,客户端a完成了业务流程,删除了子节点/lock/lock-0000000000。终端b设置的监听器不会丢失这个事件导致它永远等待吗?这个问题不存在。因为zookeeper提供的API中设置监听器的操作和读取操作是原子执行的,也就是说在读取子节点列表的时候同时设置监听器,保证不会丢失任何事件。***,这个算法有一个很大的优化点:假设当前有1000个节点在等待锁,如果获取锁的客户端释放锁,这1000个客户端都会被唤醒。羊群效应”;在这种羊群效应下,zookeeper需要通知1000个client,这会阻塞其他操作。最坏的情况下,应该只唤醒新的最小节点对应的client。怎么办?设置事件监听时,每个client都要为它之前的子节点设置事件监听,比如子节点列表为/lock/lock-0000000000,/lock/lock-0000000001,/lock/lock-0000000002,串口的client编号1监听序号0的子节点删除消息,序号2的客户端监听序号1的子节点删除消息。因此调整后的分布式锁算法流程如下:客户端连接zookeeper,在/lock下创建临时有序的子节点,第一个client对应的子节点是/lock/lock-0000000000,第二个是/lock/lock-0000000001,以此类推。客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点。如果是,则认为已经获得了锁,否则,则监听自己之前的子节点的delete消息,重复此步骤,直到获得子节点变化通知后获得锁;执行业务代码;业务流程完成后,删除对应的子节点,释放锁。Curator的源码分析虽然zookeeper原生客户端暴露的API很简单,但是实现分布式锁还是比较麻烦的……我们可以直接使用开源项目curator提供的zookeeper分布式锁实现。我们只需要引入如下包(基于maven):org.apache.curatorcurator-recipes4.0.0然后你就可以使用它了!代码如下:publicstaticvoidmain(String[]args)throwsException{//创建zookeeper客户端RetryPolicyretryPolicy=newExponentialBackoffRetry(1000,3);CuratorFrameworkclient=CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181",retryPolicy);client.start();//创建分布式锁,锁空间的根节点路径为/curator/lockInterProcessMutexmutex=newInterProcessMutex(client,"/curator/lock");mutex。acquire();//获取锁,继续业务流程System.out.println("Entermutex");//完成业务流程,释放锁mutex.release();//关闭客户端client。close();}可见关键的核心操作只有mutex.acquire()和mutex.release(),简直太方便了!下面分析一下获取锁的源码实现。acquire的方法如下:/**获取锁。当锁被占用时,会阻塞等待。该操作支持同一个线程的重入(即重复获取锁)。获取的数量需要与发布的数量相同。*@throwsExceptionZKerrors,connectioninterruptions*/@Overridepublicvoidacquire()throwsException{if(!internalLock(-1,null)){thrownewIOException("Lostconnectionwhiletryingtoacquirelock:"+basePath);}}和zookeeper通信时有一个需要注意的地方,acquire会直接抛出异常,要求用户执行重试策略。代码中调用了internalLock(-1,null),参数表示当锁被占用时,永远不会阻塞等待。internalLock的代码如下:privatebooleaninternalLock(longtime,TimeUnitunit)throwsException{//这里处理同线程的重入。如果已经获取到锁,那么只需要在相应的数据结构中添加获取次数的统计,直接返回成功即可。ThreadcurrentThread=Thread.currentThread();LockDatalockData=threadData.get(currentThread);if(lockData!=null){//重新进入lockData.lockCount.incrementAndGet();returntrue;}//这个才是真正的获取方式lockinzookeeperStringlockPath=internals.attemptLock(time,unit,getLockNodeBytes());if(lockPath!=null){//获取锁后,记录当前线程获取锁的信息,只需要增加重入时在LockData中的次数。LockDatanewLockData=newLockData(currentThread,lockPath);threadData.put(currentThread,newLockData);returntrue;}//返回阻塞时获取不到锁。这里的context处理隐含的意思是zookeeper通信异常returnfalse;}在代码中添加具体注释,不做扩展。看zookeeper获取锁的具体实现:StringattemptLock(longtime,TimeUnitunit,byte[]lockNodeBytes)throwsException{//参数初始化,此处省略//...//自旋获取锁while(!isDone){isDone=true;try{//在锁空间下创建临时有序的子节点ourPath=driver.createsTheLock(client,path,localLockNodeBytes);//判断是否获得锁(子节点序列号最小),如果直接返回获取到锁,否则阻塞等待之前的子节点删除通知hasTheLock=internalLockLoop(startMillis,millisToWait,ourPath);}catch(KeeperException.NoNodeExceptione){//对于NoNodeException,代码保证这里只会抛出NoNodeException当session过期时,所以这里根据重试策略Retry;}else{throwe;}}}//如果获取到锁,返回子节点的路径if(hasTheLock){returnourPath;}returnnull;}上面代码主要有两个步骤:一个临时有序的子节点,实现起来比较简单不展开,主要关注几个节点的模式:1)PERSISTENT(***);2)PERSISTENT_SEQUENTIAL(***并有序);3)EPHEMERAL(临时);4)EPHEMERAL_SEQUENTIAL(临时的和有序的)。internalLockLoop:阻塞并等待直到获得锁。看看internalLockLoop是如何判断锁和阻塞等待的。这里删除一些不相关的代码,只保留主流程://自旋直到获得锁while((client.getState()==CuratorFrameworkState.STARTED)&&!haveTheLock){//获取所有child的列表节点,并按照序号从小到大排序Listchildren=getSortedChildren();//根据序号判断当前子节点是否为最小子节点StringsequenceNodeName=ourPath.substring(basePath.length()+1);//+1toincludetheslashPredicateResultspredicateResults=driver.getsTheLock(client,children,sequenceNodeName,maxLeases);if(predicateResults.getsTheLock()){//如果是最小的子节点,则认为有锁haveTheLock=true;}else{//否则获取上一个子节点StringpreviousSequencePath=basePath+"/"+predicateResults.getPathToWatch();//这里使用对象监听器进行线程同步,当没有获取到锁时,监听上一个子节点nodedeletemessageandwait(),当前面的child节点被删除(即释放锁),回调会通过notifyAll唤醒线程,线程会继续自旋判断是否获得锁synchronized(this){try{//getData()这里使用接口而不是checkExists()是因为,如果之前的子节点已经被删除,会抛出异常,不会设置事件监听器。checkExists虽然也可以获取节点存在的信息,但是同时设置了一个监听器,这个监听器永远不会触发,这对zookeeperclient.getData().usingWatcher(watcher).forPath(previousSequencePath);//如果设置了阻塞等待时间if(millisToWait!=null){millisToWait-=(System.currentTimeMillis()-startMillis);startMillis=System.currentTimeMillis();if(millisToWait<=0){doDelete=true;//等待时间到达,删除对应的子节点break;}//等待对应时间wait(millisToWait);}else{//永远等待wait();}}catch(KeeperException.NoNodeExceptione){//上面使用getData设置监听时,如果之前的子节点已经被删除,会抛出NoNodeException时,只需要自旋一次,不需要额外处理}}}}具体逻辑见注释,不再重复代码中设置的事件监听。当事件回调发生时,只是notifyAll唤醒当前线程重新判断,比较简单,不再展开。多于。