当前位置: 首页 > 科技观察

面试官问怎么用Zookeeper实现分布式锁,你知道吗???

时间:2023-03-19 13:19:03 科技观察

概述说到锁,大家可能首先想到的就是JavaJUC中的synchronized关键字或者说可重入锁ReentrantLock。它可以保证只有一个线程同时执行我们的代码,保证数据的一致性和完整性。但它仅限于单体项目,这意味着它们只能保证单个JVM应用程序中线程的顺序执行。如果部署多个节点,即在分布式场景下,如何保证同一时刻只有一个线程在不同节点上执行?秒杀、抢券等场景的业务场景,介绍一下我们的分布式锁。在这篇文章中,我们主要讲解如何利用Zookeeper的特性来实现我们的分布式锁。Zookeeper分布式锁实现原理利用Zookeeper的临时时序节点和监控机制两大特点,可以帮助我们实现分布式锁。首先要有一个持久化的节点/锁,路径服务于一定的使用场景。如果有多个使用场景,建议路径不同。当有请求进来的时候,先在/locks中创建一个临时有序节点,在/locks下会看到seq-000000000、seq-00000001等节点。然后判断当前创建的节点是否是/locks路径下的最小节点,如果是则获取锁,如果不是则阻塞线程,并设置一个监听器监听上一个节点。获取到锁后,开始处理业务逻辑,最后删除当前节点,也就是释放锁。后面的节点会收到通知,唤醒线程,重复上面的判断。你有没有想过为什么要设置对前一个节点的监控?主要是为了避免羊群效应。所谓羊群效应就是当一个节点挂掉后,所有节点都会监听并响应,这会给服务器带来巨大的压力,所以会出现临时顺序节点。当一个节点挂掉时,只有它后面的节点才反应过来。原生的Zookeeper客户端通过原生的zookeeperAPI实现分布式锁,可以加强我们对ZK实现分布式锁原理的理解。公共类DistributedLock{privateStringconnectString="10.100.1.176:2281";私人intsessionTimeout=2000;私有ZooKeeperzk;privateStringrootNode="lock";privateStringsubNode="seq-";客户端创建的子节点privateStringcurrentNode;私人CountDownLatchcountDownLatch=newCountDownLatch(1);私人CountDownLatchwaitDownLatch=newCountDownLatch(1);publicDistributedLock()throwsIOException,InterruptedException,KeeperException{Timezk=newZooKeeper,Watcher(){@Overridepublicvoidprocess(WatchedEventevent){//如果连接建立,唤醒等待闩锁的线程if(event.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}//waitPath的删除事件发生了if(event.getType()==Event.EventType.NodeDeleted&&event.getPath().equals(waitPath)){waitDownLatch.countDown();}}});//等待连接建立,因为连接建立时异步处理countDownLatch.await();//获取根节点Statstat=zk.exists("/"+rootNode,false);//如果根节点不存在,则创建根节点if(stat==null){System.out.println("创建根节点");zk.create("/"+rootNode,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}publicvoidzkLock(){try{//在根节点创建临时顺序节点currentNode=zk.create("/"+rootNode+"/"+subNode,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//获取子节点ListchildrenNodes=zk.getChildren("/"+rootNode,false);//如果只有一个子节点,则表示是当前节点,直接获取锁if(childrenNodes.size()==1){return;}else{//将根节点下的所有临时顺序节点从小到大排序Collec动作排序(子节点);//当前节点名StringthisNode=currentNode.substring(("/"+rootNode+"/").length());//获取当前节点的位置intindex=childrenNodes.indexOf(thisNode);if(index==-1){System.out.println("数据异常");}elseif(index==0){//index==0,说明thisNode是链表中最小的,当前client获取锁返回;}else{//获取比currentNode排名高1的节点this.waitPath="/"+rootNode+"/"+childrenNodes.get(index-1);//在waitPath节点上注册一个监听器,当waitPath被删除时,zookeeper会回调监听器的处理方法zk.getData(waitPath,true,newStat());//进入等待锁状态waitDownLatch.await();}}}catch(KeeperExceptione){e.printStackTrace();}赶上(InterruptedExceptione){e.printStackTrace();}}publicvoidzkUnlock(){try{zk.delete(this.currentNode,-1);}catch(InterruptedExceptione){e.printStackTrace();}赶上(KeeperExceptione){e。打印堆栈跟踪();}}}测试代码如下:DistributedLocklock2=newDistributed;newLock(read)(()->{//获取锁对象try{lock1.zkLock();System.out.println("线程1获取锁");Thread.sleep(5*1000);System.out.println("Thread1releaselock");}catch(Exceptione){e.printStackTrace();}finally{lock1.zkUnlock();}}).start();新线程(()->{//获取锁对象try{lock2.zkLock();System.out.println("线程2获得锁");线程.sleep(5*1000);System.out.println("线程2释放锁");}catch(Exceptione){e.printStackTrace();}最后{lock2.zkUnlock();}})。开始();}}测试结果:线程2获取锁线程2释放锁线程1获取锁线程1释放锁获取锁和释放锁成对出现,说明分布式锁生效了。Curator框架实现了分布式锁。在实际开发中,我们会直接使用成熟的框架Curator客户端,它封装了分布式锁的实现,避免我们重新发明轮子。pom.xml添加如下依赖org.apache.curatorcurator-recipes5.2.1通过InterProcessLock实现分布式锁定公共类CuratorLockTest{privateStringconnectString="10.100.1.14:2181";privateStringrootNode="/locks";publicstaticvoidmain(String[]args){newCuratorLockTest().testLock();}publicvoidtestLock(){//分布式锁1InterProcessLocklock1=newInterProcessMutex(getCuratorFramework(),rootNode);//分布式锁2InterProcessLocklock2=newInterProcessMutex(getCuratorFramework(),rootNode);//第一个线程newThread(()->{//获取锁对象try{lock1.acquire();System.out.println("Thread1acquireslock");//测试锁重入lock1.acquire();System.out.println("线程1再次获取锁");Thread.sleep(5*1000);lock1.release();System.out.println("线程1释放锁");lock1.release();System.out.println("Thread1Releasethelockagain");}catch(Exceptione){e.printStackTrace();}}).start();//第二个线程newThread(()->{//获取锁对象try{lock2.acquire();System.out.println("线程2获取锁");//测试锁重入lock2.acquire();System.out.println("线程2再次获取锁");Thread.sleep(5*1000);lock2.release();System.out.println("线程2释放锁");lock2.release();System.out.println("线程2再次释放锁");}catch(Exceptione){e.printStackTrace();}}).start();}publicCuratorFrameworkgetCuratorFramework(){CuratorFrameworkclient=CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(newExponentialBackoffRetry(3000,3)).build();//连接client.start();系统。out.println("zookeeper初始化完成...");回头客;}}结果显示线程1释放锁线程1再次释放锁线程2获取锁线程2再次获取锁线程2释放锁线程2再次释放锁有兴趣看源码,它实现了阻塞代码通过等待和通知:https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo/zookeeper-lock总结ZooKeeper分布式锁(如InterProcessMutex)可以有效解决问题分布式锁,但是性能不高。因为每次在创建和释放锁的过程中,都必须动态创建和销毁瞬时节点,才能实现锁功能。众所周知,ZK中节点的创建和删除只能由Leader服务器来完成,然后Leader服务器需要将数据共享给所有的Follower机器。这种频繁的网络通信有一个非常突出的性能缺点。在高性能高并发场景下,不推荐使用ZooKeeper分布式锁,可以使用Redis分布式锁。由于ZooKeeper的高可用,建议在并发量不是太高的场景下使用ZooKeeper的分布式锁。