当前位置: 首页 > 科技观察

又长又瘦,万字长文带你解读Redisson分布式锁源码_0

时间:2023-03-17 16:14:35 科技观察

前言上一篇写了Redis分布式锁的原理和缺陷,具体原理我还没说呢。过年后暂时无事可做。反正我是闲的。还是学习Redisson的源码比较好。虽然是心血来潮,但仔细研究后发现,Redisson的源码解释工作量还是挺大的,它使用了大量的Java并发类,使用Netty作为通信工具,实现与Redis组件的远程调用。如果把这些知识点全部讲解完是不太现实的。这篇文章的重点主要是Redisson分布式锁的实现原理,所以网络通信和并发原理的代码解读就不会太细心了。不足之处敬请见谅!在Redis发布和订阅之前说过,分布式锁其实有3个核心功能:加锁、解锁和设置锁超时时间。这三个函数也是我们研究Redisson分布式锁原理的方向。在学习之前,我们需要了解一个知识点,就是Redis的发布订阅功能。Redis发布-订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息,发布者可以向指定的通道(channel)发送消息,如果subscriber订阅了一个channel,就可以接收消息,从而实现多客户端的通讯效果。订阅命令为SUBSCRIBEchannel[channel...],可以订阅一个或多个频道。当通过PUBLISH命令向通道发送新消息时,订阅者可以接收到消息,就像这样:打开两个客户端,一个订阅通道channel1,另一个通过PUBLISH发送消息,订阅的那个可以收到。这种模式可以实现不同客户端之间的通信。这种通信方式的神奇应用场景我们就不展开了。大家可以自己去网上查一下,学习一下。我们的主角依然是Redisson。热身后,就该上主菜了。在使用Redisson锁定Redisson源码前,需要先获取一个RLock实例对象。有了这个对象,就可以调用lock和tryLock方法来完成加锁功能。Configconfig=newConfig();config.useSingleServer().setPassword("").setAddress("redis://127.0.0.1:6379");RedissonClientredisson=Redisson.create(config);//RLock对象RLocklock=redisson.getLock("我的锁");配置对应的host,然后就可以创建RLock对象了。RLock是一个接口,具体的synchronizer需要实现这个接口。当我们调用redisson.getLock()时,程序会初始化一个默认的同步执行器RedissonLock,它会初始化几个参数,commandExecutor:异步Executor执行器,Redisson中的所有命令都是通过...Executor执行的;id:唯一ID,初始化时用UUID创建;internalLockLeaseTime:等待获取锁的时间,这里读取的是配置类中的默认定义,时间为30秒;同时,我在图中也标注了一个方法getEntryName,返回的是一串“ID:锁名”,表示当前持有对应锁的线程的标识。这些参数必须留下印象。它经常出现在源代码分析中。说完初始化,我们就可以开始学习加锁和解锁的源码了。Redisson有两种加锁方式,tryLock和lock。使用上的区别是tryLock可以设置锁的过期时间leaseTime和waitTime。核心处理逻辑类似。让我们从tryLock开始。tryLock代码有点长。..不方便补图,直接贴上去,/***@paramwaitTime等待锁的时间*@paramleaseTime锁持有时间*@paramunit时间单位*@return*@throwsInterruptedException*/publicbooleantryLock(longwaitTime,longleaseTime,TimeUnitunit)throwsInterruptedException{//等待锁的剩余时间longtime=unit.toMillis(waitTime);longcurrent=System.currentTimeMillis();finallongthreadId=Thread.currentThread().getId();//尝试获取锁,如果没有加锁,返回锁的剩余超时时间longttl=tryAcquire(leaseTime,unit,threadId);//ttl为null,表示可以抢到锁,returntrueif(ttl==null){returntrue;}//如果waitTime已经超时,则返回false,表示申请锁失败time-=(System.currentTimeMillis()-current);if(time<=0){acquireFailed(threadId);returnfalse;}current=System.currentTimeMillis();//订阅分布式锁,解锁时通知。看,这里我们使用了我们上面提到的发布订阅。finalRFuturesubscribeFuture=subscribe(threadId);//阻塞等待锁释放,await()返回false,表示等待超时if(!await(subscribeFuture,time,TimeUnit.MILLISECONDS)){if(!subscribeFuture.cancel(false)){subscribeFuture.addListener(newFutureListener(){@OverridepublicvoidoperationComplete(Futurefuture)throwsException{if(subscribeFuture.isSuccess()){//等待超时,取消订阅直接unsubscribe(subscribeFuture,threadId);}}});}acquireFailed(threadId);returnfalse;}try{time-=(System.currentTimeMillis()-current);if(time<=0){acquireFailed(threadId);returnfalse;}//进入死循环,反复调用tryAcquire尝试获取锁,和上段获取锁的逻辑一样while(true){longcurrentTime=System.currentTimeMillis();ttl=tryAcquire(leaseTime,unit,threadId);//lockacquiredif(ttl==null){returntrue;}time-=(System.currentTimeMillis()-currentTime);if(time<=0){acquireFailed(threadId);returnfalse;}//waitingformessagecurrentTime=System.currentTimeMillis();if(ttl>=0&&ttlttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg())){return;}LongttlRemaining=future.getNow();//lockacquiredif(ttlRemaining==null){scheduleExpirationRenewal(threadId);}}});返回ttlRemainingFuture;}下面我们继续关注并查看tryLockInnerAsync方法的源码:.evalWriteAsync(getName(),LongCodec.INSTANCE,command,"if(redis.call('exists',KEYS[1])==0)then"+"redis.call('hset',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnil;"+"end;"+"returnredis.call('pttl',KEYS[1]);",Collections.singletonList(getName()),internalLockLeaseTime,getLockName(threadId));}StringgetLockName(longthreadId){returnid+":"+threadId;}这里是底层调用栈,直接操作命令,集成成lua脚本,调用netty与redis通信的工具类,从而实现的功能获取锁这个脚本命令比较有意思,简单说明一下:先用existskey命令判断锁是否被占用,如果没有,用hset命令写入,key是锁的名字,字段是"客户端唯一ID:线程ID”,值为1;锁被占用,判断是否被当前线程占用,如果是,则对该值加1;如果锁没有被当前线程占用,则返回锁的剩余过期时间;命令的逻辑并不复杂,但是不得不说作者的设计还是很用心的。Redis的Hash结构是用来存储数据的。如果发现当前线程已经持有锁,则使用hincrby命令将该值加1。该值的值将决定何时释放锁。调用unlock命令的次数,实现锁的重入效果。我在下图中标注了每一步命令对应的逻辑。你可以阅读它:让我们继续代码。根据上面的命令,我们可以看到如果线程拿到了锁,tryLock方法会直接返回true,就万事大吉了。.如果拿不到,会返回锁的剩余过期时间。这个持续时间有什么影响?我们回到tryLock方法中的死循环:这里是比较waitTime和key的剩余过期时间,得到两者中较小的值,然后使用Java的Semaphore信号量的tryAcquire方法来阻塞线。那么Semaphore信号量由谁控制,什么时候可以释放。这里我们需要再次回到上面。读者应该还记得我们上面贴出的tryLock代码中还有这么一段:current=System.currentTimeMillis();//订阅分布式锁,解锁时通知finalRFuturesubscribeFuture=subscribe(threadId);订阅逻辑显然是在subscribe方法中,顺着方法的调用链,会进入PublishSubscribe.Java:这段代码的作用是将当前线程的threadId添加到一个AsyncSemaphore中,并设置一个redis监听器,这是通过redis的发布和订阅功能实现的。监听器一旦收到来自redis的消息,就会从中获取当前线程的相关信息。如果是释放锁的消息,会立即通过操作Semaphore(即调用release方法)释放阻塞的地方。释放后线程继续执行,还是判断是否超时。如果还没有超时,进入下一个循环再次获取锁,获取到就返回true,没有获取到就继续这个过程。这里解释一下,循环的原因是锁可能同时被多个client打乱。线程被阻塞释放后的那一瞬间,锁可能仍然不可用,但线程的等待时间还没有过去。这时候就需要再次运行循环获取锁。这就是tryLock获取锁的全过程。画个流程图的话,大致是这样的:除了tryLock,我们经常直接调用lock来获取锁。获取锁的过程与tryLock基本相同。不同的是lock不会手动设置锁过期时间这个参数。该方法的调用链同样运行到tryAcquire方法获取锁。不同的是会跑到这部分逻辑:这段代码做了两件事:1.设置一个30秒的过期时间,然后获取锁。2.打开监听器。如果发现已经拿到锁,则启动定时任务,不断刷新锁的过期时间。刷新过期时间的方式是scheduleExpirationRenewal。贴出源码:privatevoidscheduleExpirationRenewal(finallongthreadId){//expirationRenewalMap是一个ConcurrentMap,存储标记为“当前线程ID:键名”的任务if(expirationRenewalMap.containsKey(getEntryName())){return;}Timeouttask=commandExecutor.getConnectionManager().newTimeout(newTimerTask(){@Overridepublicvoidrun(Timeouttimeout)throwsException{//Lua脚本检测锁是否存在,如果存在则使用pexpire命令刷新过期时间RFuturefuture=commandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"return1;"+"end;"+"return0;",Collections.singletonList(getName()),internalLockLeaseTime,getLockName(threadId));future.addListener(newFutureListener(){@OverridepublicvoidoperationComplete(Futurefuture)throwsException{expirationRenewalMap.remove(getEntryName());if(!future.isSuccess()){log.error("Can'tupdatelock"+getName()+"expiration",future.cause());return;}if(future.getNow()){//rescheduleitselfscheduleExpirationRenewal(threadId);}}});}},internalLockLeaseTime/3,TimeUnit。MILLISECONDS);if(expirationRenewalMap.putIfAbsent(getEntryName(),task)!=null){task.cancel();}}代码的过程比较简单,大概是开启一个定时任务,每次internalLockLeaseTime/3(这个时间是10秒)检查当前线程是否还持有锁,如果是,则重新设置过期时间internalLockLeaseTime,为30秒,这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,存储的key为“线程ID:键名”。如果发现expirationRenewalMap中不存在当前线程key,则定时任务不会运行。这也是后续解锁的重要一步。上面的代码就是Redisson中所谓的“看门狗”程序,它使用异步线程定时检测并执行,以防在手动解锁前过期。其他逻辑与tryLock()基本相同。可以看到有解锁的方法,自然有unlock。Redisson分布式锁解锁的上层调用方法是unlock(),默认不需要参数@Overridepublicvoidunlock(){//发起释放锁的命令请求BooleanopStatus=get(unlockInnerAsync(Thread.currentThread().获取ID()));if(opStatus==null){thrownewIllegalMonitorStateException("attempttounlock,notlockedbycurrentthreadbynodeid:"+id+"thread-id:"+Thread.currentThread().getId());}if(opStatus){//成功释放锁,取消"看门狗的延续线程cancelExpirationRenewal();}}解锁相关的命令操作定义在unlockInnerAsync方法中,里面有一大串lua脚本,比之前的锁脚本命令稍微复杂一点,但是没关系,简单梳理一下,命令逻辑大致如下:1.判断锁是否存在,如果不存在,使用publish命令发布释放锁的消息,收到后it,订阅者可以做下一步的锁处理;2.锁存在但当前线程没有持有,返回空nil;3.当前线程持有锁,使用hincrby命令设置重入次数的锁-1,然后判断是否是num重入次数的ber大于0,如果是,则刷新锁过期时间,返回0,否则删除锁,释放锁释放消息,返回1;当线程完全释放锁时,会调用cancelExpirationRenewal()方法取消“看门狗”延续线程executedTimeouttask=expirationRenewalMap.remove(getEntryName());if(task!=null){task.cancel();}}这就是释放锁的过程。这个怎么样?是不是比较简单?阅读起来比锁定它的代码要舒服得多。当然,我还是不厌其烦的给大家画了一张流程图,给大家展示加锁的过程(就这点,要不要给我来个三合一,哈哈):上面的RedLock就是Redisson分布式的原理锁。总的来说就是简单的使用lua脚本集成基本的set命令来实现锁的功能。这也是很多Redis分布式锁工具的设计原则。此外,Redisson还支持“RedLock算法”来实现锁的效果。这个工具类就是RedissonRedLock。用法也很简单,创建多个RedissonNode,这些不相关的Node可以组成一个完整的分布式锁RLocklock1=Redisson.create(config1).getLock(lockKey);RLocklock2=Redisson.create(config2).getLock(lockKey);RLocklock3=Redisson.create(config3).getLock(lockKey);RedissonRedLockredLock=newRedissonRedLock(lock1,lock2,lock3);try{redLock.lock();}finally{redLock.unlock();}RedLock算法原理我不会详细了解它。有兴趣的可以看看我之前的文章,或者上网搜索一下。简单来说,在一定程度上可以有效防止Redis实例的单点故障,但也不是完全可靠。无论设计如何,仅靠Redis是无法保证锁的强一致性的。同样,鱼和熊掌不可兼得,性能和安全性往往是一样的。Redis强大的性能和易用性足以满足日常的分布式锁需求。如果业务场景无法承受锁的安全风险,最有保障的方式就是在业务层做幂等处理。综上所述,看完本文的源码分析,相信你对Redisson分布式锁的设计有了足够的了解。当然,虽然我们是在讲解源码,但是我们主要的关注点还是在分布式锁的原理上,一些无关的过程,有兴趣的可以自行阅读。源码中很多地方展示了一些基本的并发工具和网络通信的神奇之处。学习是很有收获的。最后还是要吐槽一下,Redisson的评论真的很少。.....