图片来自Pexels。但是目前越来越多的互联网项目都是集群部署,也就是分布式的情况。这两种锁是不够的。这里举两个例子来说明,在本地锁的情况下:在分布式锁的情况下:从它的思维上来说,是一种“我都要”的思维,所有的服务都去一个统一的地方去获取锁,只有获得锁的人才可以继续执行。说完思路,再说说具体的实现。Redis是用来实现分布式锁的。Redis中有一个SETNXkeyvalue命令,意思是setifnotexists(如果key不存在,则设置value)。比如张三去厕所,看了看厕所门锁,然后就没有进去,等厕所门开着他才进去。可以看到,第一次set返回1,表示成功,而第二次返回0,表示set失败,因为key已经存在。当然,是否可以仅依赖setnx命令?当然不是。想象一个场景,张三在厕所里,但他在里面一直没有放出来,一直蹲在里面。然后所有外人都不能上厕所。想揍死他。Redis也是一样,假设已经加锁了,但是由于宕机或者异常导致锁没有释放,导致所谓的“死锁”。聪明人一定早就想到了。给它设置过期时间不好。您可以使用SETEXkeysecondsvalue命令来设置指定密钥的过期时间(以秒为单位)。但是这种方式还有一个问题。刚刚加锁成功,还没有设置过期时间。如果Redis宕机,它会再次死锁。所以要保证原子性,要么一起成功,要么一起失败。当然,我们能想到的Redis肯定早就帮你实现过了。在Redis2.8版本之后,Redis为我们提供了组合命令SETkeyvalueexsecondsnx,在加锁的同时设置过期时间。就好比公司规定不管放行有没有放行,每个人最多只能在厕所呆2分钟,这样就解决了“僵局”的问题。但是这样就没有问题了吗?这怎么可能。想象另一种情况,厕所门只能从里面打开。张三上完厕所,张四进去把门锁上,外面的人却以为张三还在里面,而且已经三分钟了,就直接开了门。撬开一看,里面是张四,很是尴尬。切换到Redis是指比如一个业务执行时间很长,锁自己过期了,别人又设置了新的锁,但是业务执行完直接释放锁,别人加的锁可能会被删除。它搞砸了吗?所以,在加锁的时候,设置一个随机值,在删除锁的时候进行比较。如果是您自己的锁,请将其删除。多说没用,烦人,直接上代码://基于jedis和lua脚本实现{try获取锁的超时时间,超过这个时间会放弃获取锁longend=System.currentTimeMillis()+acquireTimeout;//随机生成一个valueStringrequireToken=UUID.randomUUID().toString();while(System.currentTimeMillis()org.redissonredisson3.13.4先来一段Redisson的加锁代码:privatevoidtest(){//分布式锁名lock的粒度越细,性能越好RLocklock=redissonClient.getLock("test_lock");lock.lock();try{//具体商业。.....}finally{lock.unlock();}}就是这么简单,jdk的ReentrantLock的使用方法类似,同样支持ReadWriteLock(读写锁),ReentrantLock(可重入锁),FairLock(fairlock)、RedLock(红锁)等锁,具体可以参考redisson官方文档。那么Redisson有哪些优势呢?锁的自动更新(默认为30秒)。如果业务时间过长,会在运行过程中自动续锁新的30s。不用担心业务执行时间长,锁会自动删除丢失。只要锁定的业务完成,就不会续订。即使不手动解锁,默认也会在30秒后删除锁,不会造成死锁问题。前面也提到了锁的自动续订。我们来看看Redisson是如何实现的。先说明一下,这里主要说一下Redisson中的RLock,即可重入锁,实现方式有两种://最常用的使用方法lock.lock();//加锁10秒后自动解锁//无需调用unlock方法手动解锁lock.lock(10,TimeUnit.SECONDS);只有不带参数的方法才提供了锁的自动更新操作。“看门狗”机制在内部使用。我们来看看源代码。不管是空参数还是带参数的方法,都是调用同一个锁方法。如果不传参数,时间传-1,有参数的方法传实际传的时间。继续点击scheduleExpirationRenewal方法:点击renewExpiration方法:综上所述,当我们指定了锁的过期时间,时间到了就会自动释放锁。如果不指定锁过期时间,看门狗默认时间为30s。只要锁被成功占用,就会启动一个定时任务,每隔10s会为锁设置一个新的过期时间。该时间是看门狗的默认时间,直到锁被释放。总结:虽然lock()有自动续锁机制,但是在开发中还是推荐使用lock(time,timeUnit),因为这样可以省去整个续锁带来的性能损失,并且可以设置过期时间长一些,并使用unlock()。如果业务执行完成,会手动释放锁。如果业务执行超时,那么一般我们的服务也会设置业务超时时间,直接报错。报错后,经过设定的过期时间,锁会被释放。publicvoidtest(){RLocklock=redissonClient.getLock("test_lock");lock.lock(30,TimeUnit.SECONDS);try{//...具体业务}finally{//手动释放锁lock.unlock();}}基于Zookeeper实现分布式锁很多朋友都知道,在分布式系统中,ZK可以作为注册中心,但其实ZK除了作为祖先注册中心外,作为一个注册中心也是非常有用的分布式锁。一个常见的方案。我们先来看看ZK中如何创建节点?ZK中有一个create[-s][-e]path[data]命令,-s是创建有序节点,-e是创建临时节点。这样就为父节点创建了一个父节点和一个子节点,组合命令的意思是创建一个临时有序的节点。ZK中的分布式锁主要是通过创建临时时序节点来实现的。至于为什么使用顺序节点,为什么使用临时节点而不是持久节点?先想一想,下面会有说明。同时,如何查看ZK中的节点?ZK中的ls[-w]path是查看节点的命令,-w是添加watch(监视器),/是查看根节点的所有节点,可以看到我们刚刚创建的节点,如果跟指定节点名,查看指定节点下的子节点。下面的00000000是ZK对顺序节点添加的顺序。注册监听器也是ZK实现分布式锁的一个重要的事情。先看一下ZK实现分布式锁的主要流程:当第一个线程进来的时候,会去父节点创建一个临时的顺序节点。当第二个线程进来发现已经持有锁时,它会为当前持有锁的节点注册一个watcher监听器。第三个线程进来,发现锁已经被持有了。因为是顺序节点,所以会为前一个节点创建一个watcher监听器。当第一个线程释放锁时,删除该节点,其下一个节点将占用锁。看到这里,聪明的朋友已经看出了时序节点的好处。对于非顺序节点,每次有线程进来,都会在持有锁的节点上注册一个监听器,容易造成“羊群效应”。这么一大群羊朝你扑来,不管你能不能忍受,反正ZK服务器会增加宕机的风险。但是对于顺序节点,它不会。当一个顺序节点发现一个线程已经持有锁时,它会向它的前一个节点注册一个监听器,这样当持有锁的节点被释放时,只有下一个持有锁的线程节点才能抢到锁,相当于排队执行,减少服务器停机的风险。至于为什么要使用临时节点,跟Redis的过期时间是一样的。即使ZK服务器宕机,临时节点也会随着服务器宕机而消失,避免出现死锁情况。下面来上一段代码的实现:publicclassZooKeeperDistributedLockimplementsWatcher{privateZooKeeperzk;privateStringlocksRoot="/locks";privateStringproductId;privateStringwaitNode;privateStringlockNode;privateCountDownLatchlatch;privateCountDownLatchconnectedLatch=newCountDownLatch(1);privateintsessionTimeout=30000;publicZooKeeperDistributedLock(StringproductId){this.productId=productId;try{Stringaddress="192.168.189.131:2181,192.168.189.132:2181";zk=newZooKeeper(address,sessionTimeout,this);connectedLatch.await();}catch(IOExceptione){thrownewLockException(e);}catch(KeeperExceptione){thrownewLockException(e);}catch(InterruptedExceptione){thrownewLockException(e);}}publicvoidprocess(WatchedEventevent){if(event.getState()==KeeperState.SyncConnected){connectedLatch.countDown();return;}if(this.latch!=null){this.latch.countDown();}}publicvoidacquireDistributedLock(){try{if(this.tryLock()){return;}else{waitForLock(waitNode,sessionTimeout);}}catch(保持erExceptione){thrownewLockException(e);}catch(InterruptedExceptione){thrownewLockException(e);}}//获取锁publicbooleantryLock(){try{//传入locksRoot+"/"+productId//假设productId代表一个Productid,例如1//locksRoot=locks///locks/10000000000,/locks/10000000001,/locks/10000000002lockNode=zk.create(locksRoot+"/"+productId,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//检查新创建的节点是否为最小节点//locks:10000000000,10000000001,10000000002Listlocks=zk.getChildren(locksRoot,false);Collections.sort(locks);if(lockNode.equals(locksRoot+"/"+locks.get(0))){//如果是最小节点,表示拿到锁returntrue;}//如果不是最小节点,找比自己小的节点通过1intpreviousLockIndex=-1;for(inti=0;i