1.Zookeeper概述Zookeeper(以下简称ZK)是一种分布式、开源的分布式应用协调服务,通常以集群模式运行,其协调能力可以理解为是基于观察者设计模式实现的;ZK服务会使用Znode来存储用户数据,将这些数据以树形目录的形式组织管理,支持用户以观察者的角色指定自己关注哪些节点/数据,当这些变化发生时,ZK会通知其观察者;为了满足本文的需要,突出了以下几个关键特性:数据组织:数据节点以树状目录(类似文件系统)的方式组织管理,每个节点的数据信息和节点信息都存储在一个节点。集群模式:通常一个集群由3或5个cardinality实例组成。当一半以上的服务实例正常工作时,才可以对外提供服务。这样既可以避免单点故障,也可以尽可能的高。每个服务实例都有一个数据备份。更新以实现数据的全局一致性:更新请求将由领导者执行,来自同一客户端的更新将按照发送的顺序写入ZK。当处理写请求创建一个Znode时,Znode名称会被分配一个全局唯一的incrementNumber,请求的顺序可以通过sequencenumber推断出来,利用这个特性可以实现高级协调服务监听机制:注册一个listenerfor一个节点,一旦节点发生变化(如更新或删除),监听器会收到一个WatchEvent,可以感知到node\data的变化临时节点:session链接断开,临时节点消失,子节点无法创建(非常关键)。ZK的分布式锁就是基于以上特点实现的。简单的说:临时节点:用于支持异常情况下自动释放锁顺序节点:用于支持公平获取锁和队列等待能力监控机制:用于支持抢锁能力集群模式:用于支持锁服务的高可用2.添加和解锁的过程说明Createapermanentnodeasalocknode(/lock2)客户端试图在指定的锁名节点(/lock2)下加锁,创建一个临时的顺序子节点获取该锁节点下的所有子节点(/lock2)将获取到的子节点按照节点自增数从小到大排序,判断是否为第一个子节点,如果是则获取锁;如果不是,则监听比该节点小的节点的删除事件(前一个节点的方法避免了震群效应)如果申请锁被阻塞,申请锁的操作会增加阻塞等待。如果监听事件生效(说明之前的节点被释放,可以尝试获取锁),回到第3步重新开始判断,直到锁解锁,删除第一个子节点,释放3。ZK分布式锁的能力可以一篇文章看完。这里引用上篇文章《分布式锁上-初探》的一些内容,一个分布式锁应该具备以下功能特性:互斥:同一时间,只有一个客户端可以持有锁安全:避免死锁,如果一个客户端获取了锁且处理时间超过最大约定时间,或者在加锁过程中出现故障,无法主动释放锁,其持有的锁也可以通过其他机制正确释放,并保证其他客户端也可以稍后加锁,整个处理流程继续正常执行可用性:又称容错,分布式锁需要高可用性,避免单点故障。当提供锁的服务节点发生故障(宕机)时,不会影响服务运行。这里有两种模式:一种是分布式锁服务本身有集群模式,可以自动切换和恢复工作;另一种是客户端向多个独立的锁服务发起请求,当一个锁当服务失败时,仍然可以从其他锁服务中读取锁信息(Redlock)。可重入性:对于同一个锁,加锁和解锁必须是同一个线程,即其他线程持有的锁不能被释放。高效灵活:锁定和解锁要快速;支持阻塞和非阻塞;supportfairlocksandunfairlylocksAddedin):性能低下的说法是因为每次在创建和释放锁的过程中,都必须动态创建和销毁临时节点来实现锁功能。ZK中节点的创建和删除只能通过Leader服务器来完成。执行,然后Leader服务器还需要同步数据到所有Follower机器,如此频繁的网络通信,性能的短板非常突出。在高性能、高并发场景下,不推荐使用ZooKeeper的分布式锁。由于ZooKeeper的高可用,在并发不是太高的场景下也推荐使用ZK分布式锁。4、InterProcessMutex使用示例Zookeeper客户端框架Curator提供的InterProcessMutex是分布式锁的一种实现。acquire方法是阻塞|非阻塞获取锁,release方法是释放锁。此外,它还提供了可撤销和可重入的功能。4.1接口介绍//获取互斥锁publicvoidacquire()throwsException;//获取给定时间内的互斥锁publicbooleanacquire(longtime,TimeUnitunit)throwsException;//释放锁处理publicvoidrelease()throwsException;//如果当前线程获取了互斥量,则返回truebooleanisAcquiredInThisProcess();4.2pom依赖org.apache.logging.log4jlog4j-core2.8.2org.apache.zookeeperzookeeper<版本>3.5.7org.apache.curator策展人框架4.3.0org.apache.curatorcurator-recipes4.3.0org.apache.curatorcurator-client4.3.04.3示例packagecom.atguigu.case3;导入org.apache.curator.framework.CuratorFramework;导入org.apache.curator.framework.CuratorFrameworkFactory;导入org.apache.curator.framework.recipes.locks.InterProcessMutex;导入org.apache.curator.retry.ExponentialBackoffRetry;公共类CuratorLockTest{publicstaticvoidmain(String[]args){//创建分布式锁1InterProcessMutexlock1=newInterProcessMutex(getCuratorFramework(),"/locks");//创建分布式锁2InterProcessMutexlock2=newInterProcessMutex(getCuratorFramework(),"/locks");newThread(newRunnable(){@Overridepublicvoidrun(){try{lock1.acquire();System.out.println("线程1获取到lock");lock1.acquire();System.out.println("线程1再次获得锁");Thread.sleep(5*1000);lock1.release();System.out.println("线程1释放锁");lock1.release();System.out.println("线程1再次释放锁");}catch(Exceptione){e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{lock2.acquire();System.out.println("Thread2acquiredthelock");lock2.acquire();System.out.println("线程2再次获取锁");Thread.sleep(5*1000);lock2.release();System.out.println("线程2释放锁");lock2.release();System.out.println("线程2再次释放锁");}catch(Exceptione){e.printStackTrace();}}})。开始();}privatestaticCuratorFrameworkgetCuratorFramework(){ExponentialBackoffRetrypolicy=newExponentialBackoffRetry(3000,3);CuratorFrameworkclient=CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();//启动客户端client.start();System.out.println("zookeeper启动成功");回头客;}}5。DIY一个阉割版的分布式锁加解锁的过程,以及如何避免惊群效应包com.rock.case2;导入org.apache.zookeeper.*;导入org.apache.zookeeper.data.Stat;导入java.io.IOException;导入java.util.List;导入java.util.concurrent.CountDownLatch;/***zk分布式锁v1版本:*完成功能:*1.避免雷霆万钧效应*缺失功能:*1.超时控制*2.读写锁*3.重入控制*/publicclassDistributedLock{私有字符串连接字符串;私有intsessionTimeout;私有ZooKeeperzk;私有CountDownLatchconnectLatch=newCountDownLatch(1);私人CountDownLatchwaitLatch=newCountDownLatch(1);私有字符串等待路径;私有字符串当前节点;私有字符串LOCK_ROOT_PATH;privatestaticStringNODE_PREFIX="w";publicDistributedLock(StringconnectString,intsessionTimeout,StringlockName){//TODO:数据验证this.connectString=connectString;this.sessionTimeout=sessionTimeout;this.LOCK_ROOT_PATH=锁名;}publicvoidinit()抛出IOException,保持erException,InterruptedException{//建联zk=newZooKeeper(connectString,sessionTimeout,watchedEvent->{//连接到zk后释放connectLatchif(watchedEvent.getState()==Watcher.Event.KeeperState.SyncConnected){connectLatch.countDown();}});connectLatch.await();//等待zk正常连接//判断锁名节点是否存在Statstat=zk.exists(LOCK_ROOT_PATH,false);if(stat==null){//创建一个锁名节点}catch(KeeperExceptione){//忽略并发创建冲突if(!e.code().name().equals("NODEEXISTS")){throwe;}}}}/***新增功能:*1.超时设置*2.读写区分*3.重入控制*/publicvoidzklock()throwsKeeperException,InterruptedException{if(!tryLock()){等待锁();zklock();}}/****/privatevoidwaitLock()throwsKeeperException,InterruptedException{try{zk.getData(waitPath,newWatcher(){@Overridepublicvoidprocess(WatchedEventwatchedEvent){//waitLatch需要被释放if(watchedEvent.getType()==Watcher.Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}},newStat());//等待监听waitLatch.await();}h(KeeperException.NoNodeExceptione){//如果等待节点已经被清除,不等待,尝试抢锁return;}}privatebooleantryLock()throwsKeeperException,InterruptedException{currentNode=zk.create(LOCK_ROOT_PATH+"/"+NODE_PREFIX,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是序号最小的节点,如果获取到锁;如果不是,则听上一个节点Listchildren=zk.getChildren(LOCK_ROOT_PATH,false);//如果children只有一个值,则直接获取锁;如果有多个节点,需要判断谁最小if(children.size()==1){returntrue;}else{StringthisNode=currentNode.substring(LOCK_ROOT_PATH.length()+1);//通过w00000000获取节点在children集合中的位置intindex=children.indexOf(thisNode);if(index==0){//我是第一个节点returntrue;}//需要监听他前一个节点的变化waitPath=LOCK_ROOT_PATH+"/"+children.get(索引-1);}返回假;}//解锁publicvoidunZkLock(){//删除节点try{zk.delete(this.currentNode,-1);}catch(InterruptedExceptione){e.printStackTrace();}赶上(KeeperExceptione){e。打印堆栈跟踪();}}}