准备本文将使用三台独立的服务器,可以自行提前搭建。如果不知道如何搭建,可以看我之前的ZooKeeper集群搭建:Zookeeper集群部署那些事。ZooKeeper的一些基本命令可以看这篇文章:Zookeeper入门。对于单个服务,我们可以使用Java自带的一些锁来实现对资源的顺序访问。但是,随着业务的发展,现在基本上公司都有多种服务。简单的Lock或者Synchronize只能解决单个JVM线程的问题。问题,那么单个服务的Java锁是不能满足我们业务需求的。为了解决多个服务跨服务访问共享资源的问题,出现了分布式锁。分布式锁的原因是集群。文本中分布式锁的实现方式有哪些?分布式锁的实现方式主要有3种(ZooKeeper、Reids、Mysql)。今天我们主要讲解使用ZooKeeper实现分布式锁。ZooKeeper的应用场景主要包括这几个方面:服务注册与订阅(共享节点)分布式通知(监控ZNode)服务命令(ZNode特性)数据订阅与发布(Watcher)分布式锁(临时节点)ZooKeeper实现分布式锁,主要获取由于ZooKeeper保证了数据的强一致性,所以锁服务可以分为两类:保持所有试图获取当前锁的客户端独占,最后拥有唯一一个能成功获取当前锁的key。通常我们会把ZooKeeper上的节点(ZNode)看成是一把锁,通过创建一个临时节点来实现。当多个客户端创建锁时,只有创建成功的客户端才能拥有锁的控制权时序所有尝试获取锁的客户端都是顺序执行的,但是会有一个序号(zxid),我们会有一个节点,比如:/testLock,所有的临时节点都创建在这个下面,ZK的父节点(/testLock)维护一个序列号,这是ZK的一个属性。保证子节点创建的时序性,从而为每个客户端形成一个全局时序的ZK锁机制。在实现ZooKeeper分布式锁之前,我们必须了解一下ZooKeeper的分布式锁机制的实现过程和原理。不然各位小伙伴,出去面试怎么跟面试官说话呢~临时时序节点是基于ZooKeeper的临时时序节点。ZooKeeper更适合实现分布式锁:序号生成器:ZooKeeper的每个节点都有自己的序号生成器:在每个节点下创建一个临时节点,新的子节点后会加上一个序号。这个生成的数字会在最后一个数字+1,操作顺序递增:ZooKeeper节点递增,以保证锁的公平性。我们只需要在持久化父节点下创建对应的临时时序节点即可。每个线程在抢占锁之前,都会调用watch,判断你当前序号是否是当前父节点中最小的,如果是,则获取锁Znode监控:每个线程抢占之前,都会创建一个ZNode节点属于当前线程,当释放锁时,会删除创建的ZNode,当我们创建的序号不是最小的时候,会等待watch通知,即上一个ZNode的状态通知。当上一个ZNode被删除时,会触发回调机制,告诉下一个ZNode,你可以获取到锁,可以开始工作了。临时节点自动删除:ZooKeeper还有一个好处,当我们客户端断开连接后,我们创建的临时节点会自动删除,所以我们在使用分布式锁的时候,一般都会创建临时节点,这样可以避免因网络而死异常等原因。Lockherdeffect:ZooKeeper节点的顺序访问,监控前面在后面的方法,可以有效避免羊群效应,什么是羊群效应:当一个节点挂了,所有节点都要监控,然后做If有一个临时顺序节点,当一个节点挂掉时,只有它后面的节点才会响应。我们来看下图:上图中,ZooKeeper中有一个锁节点testLock。这把锁是ZooKeeper的一个节点。当两个客户端获得这个锁时,他们将锁定ZooKeeper。Request,也就是我们所说的临时时序节点。当我们在/testLock目录下创建一个顺序临时节点时,ZK会自动为这个临时节点维护一个节点序号,这个节点是增量的。比如clientA创建一个临时序号,ZK内部会生成一个序号:/lock0000000001,然后clientB也生成一个临时序号,ZK会生成/lock0000000002序号,序号依次递增,开始从1开始,ZK内部会维护这个顺序。如下图所示:此时ClientA会监控判断我是不是父节点下的最小的。如果是,那我就可以锁了,因为我最小,别人都比我大。我可以锁住自己,你已经是成熟的临时节点了,你要学会锁住自己。咳咳,那么ZK是怎么判断的呢?宝贝你往下看:这是cleintA已经加锁了,clientB这个时候也会来加锁,所以他也需要在/testLock节点创建一个属于自己的临时锁,那么它的序号会变成/lock0000000002这个时候,如下图所示:这时候就会出现我们前面说的,clientB在加锁的时候会判断是否是最小的。父节点不是最小的啊~我挺大的,还有更小的!!!锁失败了,咳咳,这时候clientB会去窥探clientA,气氛渐渐的暧昧起来,啊不,是为了查看之前的节点(clientA)是否完成工作。如果完成,clientB就可以进行加锁工作了。宝贝,你可以往下看图:clientA锁定成功后,会进行自己的业务处理,clientA做完了就说我做完了,下一个,那么clientA是怎么做完的,它花了多少时间?不是,具体流程是什么?你不对,小农,你在说什么!!!好尴尬我们不是说了,当clientB加锁失败时,会在之前的节点(clientA)上加一个monitor。当clientA被删除时,说明有人释放了锁,会通知clientB重新获取锁。这时clientB重新获取锁时,发现是当前父节点下最小的,于是clientB开始加锁,开始工作,一系列操作。当clientB完成后,释放锁,然后说,下一步。如下图所示:当然,除了clientA、clientB和C\D\E等,这个字母看着陌生又熟悉,原理是一样的,都是要解锁的最小节点。如果没有,监控上一个节点是否释放,如果释放,则重新尝试加锁。如果释放上一节中的节点,则它是最小的,它会走到最前面,有点类似于“取银行号”的操作。代码实现使用ZooKeeper创建临时顺序节点实现分布式锁。大体流程是先创建持久化父节点,在当前节点下创建临时顺序节点,找到最小序号,获取分布式锁,程序业务完成,然后释放锁,通知下一个节点操作,使用watch监听节点的变化,然后依次对下一个最小的序列节点进行操作。首先我们需要创建一个持久化父节点:我在这里/mxnWatchCallBackimportorg.apache.zookeeper.*;importorg.apache.zookeeper.data.Stat;importjava.util.Collections;importjava.util.List;importjava.util.concurrent。CountDownLatch;/***@program:mxnzookeeper*@ClassNameWatchCallBack*@description:*@author:微信搜索:木晓农*@create:2021-10-2310:48*@Version1.0**/publicclassWatchCallBackimplementsWatcher,AsyncCallback.StringCallback,AsyncCallback.Children2Callback,AsyncCallback.StatCallback{ZooKeeperzk;StringthreadName;CountDownLatchcc=newCountDownLatch(1);StringpathName;publicStringgetPathName(){returnpathName;}publicvoidsetPathName(StringpathName){this.pathName=pathName;}publicStringgetThreadThreadNamen()读(NameurpublicThreadThreadNamen()读(NameurpublicThreadThreadNamen();retStringthreadName){this.threadName=threadName;}publicZooKeepergetZk(){returnzk;}publicvoidsetZk(ZooKeeperzk){this.zk=zk;}/**@Author猫小农*@Description//TODO尝试锁定方法*@Date16:142021/10/24*@Param*@return**/publicvoidtryLock(){try{System.out.println(threadName+"start创造。..");//创建顺序临时节点zk.create("/lock",threadName.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");//阻塞当前一个,监听对上一个节点是否释放锁?param*@return**/publicvoidunLock(){try{//释放锁,删除临时节点zk.delete(pathName,-1);//结束工作System.out.println(threadName+"结束工作....");}catch(InterruptedExceptione){e.printStackTrace();}catch(KeeperExceptione){e.printStackTrace();}}@Overridepublicvoidprocess(WatchedEventevent){//如果第一个节点释放锁,然后第二个你会收到一个回调//告诉它前一个节点已经释放了,你可以开始尝试获取锁switch(event.getType()){caseNone:break;caseNodeCreated:break;caseNodeDeleted://当前节点重新获取锁zk.getChildren("/",false,this,"sdf");break;caseNodeDataChanged:break;caseNodeChildrenChanged:break;}}@OverridepublicvoidprocessResult(intrc,Stringpath,对象ctx,Stringname){if(name!=null){System.out.println(threadName+"线程创建一个节点为:"+name);pathName=name;//监听上一个节点zk.getChildren("/",false,this,"sdf");}}//getChildrencallback@OverridepublicvoidprocessResult(intrc,Stringpath,Objectctx,List
