1。Zookeeper概述Zookeeper(以下简称ZK)是一种分布式、开源的分布式应用协调服务,通常运行在集群模式下,其协调能力可以理解为是基于观察者设计模式实现的;ZK服务会使用Znode来存储用户数据,将这些数据以树形目录的形式组织管理,支持用户以观察者的角色指定自己关注哪些节点/数据,当这些变化发生时,ZK会通知其观察者;为了满足本文的需要,突出了以下几个关键特性:数据组织:数据节点以树状目录(类似文件系统)的方式组织管理,每个节点的数据信息和节点信息都存储在一个节点。ZooKeeper的HierarchicalNamespace集群模式:通常,一个集群由3或5个基数实例组成。当一半以上的服务实例正常工作时,就可以对外提供服务了。它可以避免单点故障并尽可能高。每个服务实例都有一个数据备份,以实现数据的全局一致性。ZooKeeperService顺序更新:更新请求会被传递给leader,来自同一个client的更新会按照发送的顺序写入ZK。当处理写入请求创建一个Znode时,Znode名称会被分配一个全局唯一的增量编号,请求的顺序可以通过序列号推断出来,利用这个特性可以实现高级协调服务监听机制:为注册一个监听器一个节点,一旦节点发生变化(比如update或delete),listener会收到一个WatchEvent,可以感知到node\data的变化临时节点:session链接断开,临时节点没有了,并且您不能创建子节点(非常关键)。ZK的分布式锁就是基于以上特点实现的。简单来说:临时节点:用于支持异常情况下自动释放锁的能力序列节点:用于支持公平获取锁和排队的能力监控机制:用于支持抢锁能力集群模式:用于支持锁的高可用ofservices2.添加和解锁过程说明Createapermanentnodeasalocknode(/lock2)客户端试图在指定的锁名节点(/lock2)下加锁,创建临时顺序子节点获取锁node(/lock2)将获取到的子节点按照节点自增数从小到大排序得到的所有子节点,判断是否为第一个子节点,如果是则获取锁;如果不是,则监听比本节点小的节点的删除事件(这种只监听前一个节点的方法避免了惊群效应)如果申请锁被阻塞,申请锁的操作可以增加阻塞等待.如果监听事件生效(说明之前的节点被释放,可以尝试获取锁),则返回到第3步重新判断,直到获取到锁并解锁,删除第一个子节点并释放3.ZK分布式锁的能力可能是单个读者看不懂,这里引用上一篇文章《分布式锁上-初探》的一些内容,一个分布式锁应该具备以下功能特点:互斥:同时只有一个客户端可以holdthelock安全性:为避免死锁,如果一个client获取了锁并且处理时间超过了约定的最大时间,或者在锁持有期间发生故障,锁不能主动释放,持有的锁可以被其他机制正确释放,后面也可以锁定其他客户端,整个处理过程继续正常执行可用性:又称容错分布式锁需要具备高可用性,避免sing故障点。当提供锁的服务节点发生故障(宕机)时,不会影响服务运行。这里有两种模式:一种是分布式锁服务本身有集群模式,可以在出现故障时自动切换和恢复工作;另一种是客户端向多个独立的锁服务发起请求,当一个锁服务失效时,仍然可以从其他锁服务读取锁信息(Redlock)重入:对于同一个锁,加锁和解锁必须是同一个线程,即其他线程持有的锁无法释放高效灵活:加锁和解锁要快;支持阻塞和非阻塞;支持公平锁和非公平锁会话关闭后,锁会自动释放。实用性比较好。可重入线程可以是可重入的。解锁速度居中。阻塞和非阻塞都支持公平和不公平。只有公平锁是一种说法,性能不是很高,因为每次创建锁时都会创建锁。在释放锁的过程中,必须动态创建和销毁临时节点,才能实现锁功能。ZK中节点的创建和删除只能由Leader服务器来执行,然后Leader服务器还需要将数据同步到所有Follower机器上,如此频繁的网络通信中,性能的短板非常突出。在高性能、高并发场景下,不推荐使用ZooKeeper的分布式锁。由于ZooKeeper的高可用,在并发不是太高的场景下也推荐使用ZK分布式锁。4、InterProcessMutex使用示例Zookeeper客户端框架Curator提供的InterProcessMutex是分布式锁的一种实现。acquire方法是阻塞|非阻塞获取锁,release方法是释放锁。此外,它还提供了可撤销和可重入的功能。4.1接口介绍//获取互斥锁publicvoidacquire()throwsException;//获取给定时间内的互斥锁publicbooleanacquire(longtime,TimeUnitunit)throwsException;//释放锁处理publicvoidrelease()throwsException;//如果当前线程获取了互斥量,则返回truebooleanisAcquiredInThisProcess();4.2pom依赖org.apache.logging.log4jlog4j-core2.8.2org.apache.zookeeperzookeeper<版本>3.5.7org.apache.curator策展人框架4.3.0org.apache.curatorcurator-recipes4.3.0org.apache.curatorcurator-client4.3.04.3示例packagecom.atguigu.case3;导入org.apache.curator.framework.CuratorFramework;导入org.apache.curator.framework.CuratorFrameworkFactory;导入org.apache.curator.framework.recipes.locks.InterProcessMutex;导入org.apache.curator.retry.ExponentialBackoffRetry;公共类CuratorLockTest{publicstaticvoidmain(String[]args){//创建分布式锁1InterProcessMutexlock1=newInterProcessMutex(getCuratorFramework(),"/locks");//创建分布式锁2InterProcessMutexlock2=newInterProcessMutex(getCuratorFramework(),"/locks");newThread(newRunnable(){@Overridepublicvoidrun(){try{lock1.acquire();System.out.println("线路1获取到锁");lock1.获取();System.out.println("线程1再次获取锁");线程.sleep(5*1000);lock1.release();System.out.println("线程1释放锁");lock1.release();System.out.println("线程1再次释放锁");}catch(Exceptione){e.printStackTrace();}}})。开始();newThread(newRunnable(){@Overridepublicvoidrun(){try{lock2.acquire();System.out.println("Thread2acquiredthelock");lock2.acquire();System.out.println("线程2再次获取到锁");Thread.sleep(5*1000);lock2.release();System.out.println("线程2释放锁");lock2.release();System.out.println("线程2再次释放锁");}catch(Exceptione){e.printStackTrace();}}})。开始();}privatestaticCuratorFrameworkgetCuratorFramework(){ExponentialBackoffRetrypolicy=newExponentialBackoffRetry(3000,3);CuratorFrameworkclient=CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();//启动客户端client.start();System.out.println("zookeeper启动成功");回头客;}}5.DIY一个阉割版的分布式锁通过这个例子,对比第2节的内容,了解添加和解锁的过程,以及如何避免雷众效应包com.rock.case2;导入org.apache.zookeeper.*;导入org.apache.zookeeper.data.Stat;导入java.io.IOException;导入java.util.List;导入java.util.concurrent.CountDownLatch;/***zk分布式锁v1版本:*完成功能:*1.避免雷霆万钧效应*缺失功能:*1.超时控制*2.读写锁*3.重入控制*/publicclassDistributedLock{私有字符串连接字符串;私有intsessionTimeout;私有ZooKeeperzk;私有CountDownLatchconnectLatch=newCountDownLatch(1);私人CountDownLatchwaitLatch=newCountDownLatch(1);私有字符串等待路径;私有字符串当前节点;私有字符串LOCK_ROOT_PATH;privatestaticStringNODE_PREFIX="w";publicDistributedLock(StringconnectString,intsessionTimeout,StringlockName){//TODO:数据验证this.connectString=connectString;this.sessionTimeout=sessionTimeout;this.LOCK_ROOT_PATH=锁名;}publicvoidinit()抛出IOException,KeeperException,InterruptedException{//建联zk=newZooKeeper(connectString,sessionTimeout,watchedEvent->{//连接到zk后释放connectLatchif(watchedEvent.getState()==Watcher.Event.KeeperState.SyncConnected){connectLatch.countDown();}});connectLatch.await();//等待zk正常连接//判断锁名节点是否存在Statstat=zk.exists(LOCK_ROOT_PATH,false);if(stat==null){//创建锁名节点}catch(KeeperExceptione){//忽略并发创建冲突if(!e.code().name().equals("NODEEXISTS")){throwe;}}}}/***新增功能:*1.超时设置*2.读写区分*3.重入控制*/publicvoidzklock()throwsKeeperException,InterruptedException{if(!tryLock()){等待锁();zklock();}}/****/privatevoidwaitLock()throwsKeeperException,InterruptedException{try{zk.getData(waitPath,newWatcher(){@Overridepublicvoidprocess(WatchedEventwatchedEvent){//如果(watchedEvent.getType()==Watcher.Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}},newStat());//等待监听waitLatch.await();}赶上(KeeperException.NoNodeExceptione){//如果等待节点已经被清除,不等待,尝试抢锁return;}}privatebooleantryLock()throwsKeeperException,InterruptedException{currentNode=zk.create(LOCK_ROOT_PATH+"/"+NODE_PREFIX,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是序号最小的节点,如果获取到锁;如果不是,则监听上一个节点Listchildren=zk.getChildren(LOCK_ROOT_PATH,false);//如果children只有一个值,则直接获取锁;如果有多个节点,需要判断谁最小if(children.size()==1){returntrue;}else{StringthisNode=currentNode.substring(LOCK_ROOT_PATH.length()+1);//通过w00000000获取节点在children集合中的位置intindex=children.indexOf(thisNode);if(index==0){//我是第一个节点returntrue;}//需要监听他前一个节点的变化waitPath=LOCK_ROOT_PATH+"/"+children.get(index-1);}回复是假的;}//解锁publicvoidunZkLock(){//删除节点try{zk.delete(this.currentNode,-1);}catch(InterruptedExceptione){e.printStackTrace();}赶上(KeeperExceptione){e。打印堆栈跟踪();}}}