当前位置: 首页 > 科技观察

实现一个Redis分布式锁

时间:2023-03-21 12:28:17 科技观察

前言在我们日常的开发中,难免会遇到需要加锁的情况。比如扣除产品库存,首先要从数据库中取出库存,进行库存判断,然后减去库存。这波操作显然不符合原子性。如果代码块没有加锁,很容易因为并发导致超卖的问题。如果我们的系统是单体架构,那么我们可以使用本地锁来解决问题。如果是分布式架构,就需要使用分布式锁。使用SETNX和EXPIRE命令的场景}这个方法貌似可以解决问题,但是有一定的风险,因为SETNX和EXPIRE的操作是非原子的。如果SETNX成功后出现错误,则不会执行EXPIRE,导致锁没有超时设置的死锁。针对这种情况,我们可以使用lua脚本来维护操作的原子性,保证SETNX和EXPIRE操作要么成功,要么不成功。if(redis.call('setnx',KEYS[1],ARGV[1])<1)thenreturn0;end;redis.call('expire',KEYS[1],tonumber(ARGV[2]));return1;通过这个方法,我们初步解决了竞争锁的原子性问题。其他功能虽然没有实现,但应该不会造成死锁🤪🤪🤪。Redis2.6.12及以上版本可以灵活使用SET命令SETkeyvalueNXEX30DELkeyif(set("item_1_lock",1,"NX","EX",30)){try{...logic}catch{...}finally{del("item_1_lock");}}改进后的方法在不使用lua脚本的情况下解决了SETNX和EXPIRE的原子性问题。现在让我们仔细想想。如果A拿到锁成功进入代码块执行逻辑,但是由于各种原因超时后锁自动释放。之后B成功获取到锁,进入代码块执行逻辑,但是此时如果A执行完逻辑再释放锁,那么B刚刚获取到的锁就会被释放。这就像用自己的房门钥匙打开另一间房子的门,这是不可接受的。为了解决这个问题,我们可以尝试在SET时设置一个锁标志,然后在DEL时验证当前锁是否是我们自己的锁。Stringvalue=UUID.randomUUID().toString().replaceAll("-","");if(set("item_1_lock",value,"NX","EX",30)){try{...逻辑}catch{...}finally{...lua脚本保证原子性}}if(redis.call('get',KEYS[1])==ARGV[1])thenreturnredis.call('del',KEYS[1])elsereturn0end至此,我们终于解决了竞争锁的原子性问题和误删锁的问题。但是,锁一般还需要支持重入、循环等待、超时自动更新等功能。下面我们学习使用一个非常好用的包来解决这些问题。入门RedissonRedission的锁实现了重入和超时自动续订功能。它已经为我们打包好了。我们只要根据自己的需要调用它的API,就可以轻松实现上述功能。详细功能可以查看Redisson文档。在项目中安装Redissonorg.redissonredisson3.13.2implementation'org.redisson:redisson:3.13.2'是用Maven或Gradle构建的。最新版本是3.13.2。您还可以在此处的Redisson中找到所需的版本。简单尝试RedissonClientredissonClient=Redisson.create();RLocklock=redissonClient.getLock("lock");booleanres=lock.lock();if(res){try{...logic}finally{lock.unlock();}}Redisson封装了所有的底层逻辑📦,具体实现我们无需关心,几行代码就可以使用完美的锁。下面我们简单的把源码折腾折腾🤔🤔🤔。加锁privatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptibly)throwsInterruptedException{longthreadId=Thread.currentThread().getId();Longttl=tryAcquire(leaseTime,unit,threadId);if(ttl==null){return;}RFuturefuture=subscribe(threadId);if(interruptibly){commandExecutor.syncSubscriptionInterrupted(future);}else{commandExecutor.syncSubscription(future);}try{while(true){ttl=tryAcquire(leaseTime,unit,threadId);if(ttl==null){break;}if(ttl>=0){try{future.getNow().getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}catch(InterruptedExceptione){if(interruptibly){抛出;}future.getNow().getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}}else{if(interruptibly){future.getNow().getLatch().acquire();}else{future.getNow().getLatch().acquireUninterruptibly();}}}}finally{unsubscribe(future,threadId);}}获取锁privateRFuturetryAcquireAsync(longleaseTime,TimeUnitunit,longthreadId){if(leaseTime!=-1){returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);}RFuturettlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining,e)->{if(e!=null){return;}if(ttlRemaining==null){scheduleExpirationRenewal(threadId);}});returnttlRemainingFuture;RFuturetryLockInnerAsync(longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommandcommand){internalLockLeaseTime=unit.toMillis(leaseTime);returnevalWriteAsync(getName(),LongCodec.INSTANCE,command,"if(redis.call('exists',KEYS[1])==0)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"returnredis.call('pttl',KEYS[1]);",Collections.singletonList(getName()),internalLockLeaseTime,getLockName(threadId));}删除锁publicRFutureunlockAsync(longthreadId){RPromiseresult=newRedissonPromise();RFuturefuture=unlockInnerAsync(threadId);future.onComplete((opStatus,e)->{cancelExpirationRenewal(threadId);if(e!=null){result.tryFailure(e);return;}if(opStatus==null){IllegalMonitorStateExceptioncause=newIllegalMonitorStateException("attempttounlocklock,notlockedbycurrentthreadbynodeid:"+id+"线程-id:"+threadId);result.tryFailure(cause);return;}result.trySuccess(null);});returnresult;}protectedRFutureunlockInnerAsync(longthreadId){returnevalWriteAsync(getName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+"returnnil;"+"t;结束;"+"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+"if(counter>0)then"+"redis.call('pexpire',KEYS[1],ARGV[2]);"+"return0;"+"else"+"redis.call('del',KEYS[1]);"+"redis.call('publish',KEYS[2],ARGV[1]);"+"return1;"+"end;"+"returnnil;",Arrays.asList(getName(),getChannelName()),LockPubSub.UNLOCK_MESSAGE,internalLockLeaseTime,getLockName(threadId))));}总结Redis作为分布式锁解决并发问题还是有一些困难,需要注意的地方很多,要正确评估系统的规模,不能用它来完全解决并发问题为了使用某种技术,还是需要在数据库层面下功夫。