大家好,我是北君。今天就带大家深入剖析Redis分布式锁,彻底了解。场景既然要了解Redis分布式锁,那肯定有场景需要。高并行发票的问题就是一个经典案例。搭建环境准备redis服务,设置redis的键值对:setticket10准备postman、JMeter等模拟高并发请求的工具核心代码@ServicepublicclassTicketServiceImplimplementsTicketService{@AutowiredprivateStringRedisTemplatestringRedisTemplate;privateLoggerlogger=LoggerFactory.getLogger(TicketServiceImpl.class);@OverridepublicStringsellTicket(){StringticketStr=stringRedisTemplate.opsForValue().get("ticket");国际票=0;if(null!=ticketStr){ticket=Integer.parseInt(ticketStr);}if(ticket>0){intticketNew=ticket-1;stringRedisTemplate.opsForValue().set("ticket",String.valueOf(ticketNew));logger.info("当前门票库存为:"+ticketNew);}else{logger.info("手速不够,票已售罄...");}return"抢票成功...";}}分析解决上面的代码没有做任何加锁操作,在高并发的情况下,门票超卖的情况非常严重,根本无法正常使用。分析1由于我们需要加分布式锁,所以可以使用Redis中的setnx命令来模拟一把锁。redis>EXISTSjob#job不存在(整数)0redis>SETNXjob"programmer"#job设置成功(integer)1redis>SETNXjob"code-farmer"#尝试覆盖job,failure(integer)0当有线程进入当前方法中,使用setnx设置一个key。如果设置成功,则允许继续访问。如果设置失败,则无法访问该方法。当方法运行结束后,删除key,下次有线程访问时重新执行该操作。publicStringsellTicket(){Stringlock="lock";//如果成功设置值,证明该方法至今没有运行,可以进行售票booleantag=stringRedisTemplate.opsForValue().setIfAbsent(lock,"");if(!tag){//如果设置失败,证明当前方法正在执行,不允许再次执行//实际开发环境应该使用队列来完成访问操作。这里主要探讨分布式锁的问题,所以只模拟Scene//这里使用spin方法防止访问信息丢失sellTicket();return"访客过多,请稍后访问...";}StringticketStr=stringRedisTemplate.opsForValue().get("ticket");国际票=0;如果(空!=ticketStr){ticket=Integer.parseInt(ticketStr);}if(ticket>0){intticketNew=ticket-1;stringRedisTemplate.opsForValue().set("ticket",String.valueOf(ticketNew));logger.info("当前门票库存为:"+ticketNew);}else{logger.info("不够快,票已经卖完了...");}stringRedisTemplate.delete(锁);return"成功抢票...";}分析2上面的代码在程序正常运行的情况下不会造成抢票的问题,但是我们需要考虑:如果程序运行过程中系统出现Exception,导致无法删除锁,会造成死锁问题。也许有人马上会想到用try{}finally{}来删除finally中的锁。但是,如果是分布式架构,第一个服务器接收到请求并加锁。这时,第二台服务器也收到了请求。setnx命令失败,需要执行返回操作。根据finally的特点,在执行return之前,需要先执行finally中的代码,所以第二台服务器删除了锁,程序中的锁失效,肯定会出现超订票等一系列问题.如果程序在运行中完全死掉(比如程序员闲着无事,来了一个kill-9;或者断电),即使加上finally,finally也执行不了,死锁问题仍然会发生。解决方法:给锁加个标识符,只允许自己操作锁。其他接入程序不能操作锁,给锁加一个过期时间,这样即使程序死了,时间到时,仍然可以继续执行publicStringsellTicket(){Stringlock="lock";//锁定密钥StringlockId=UUID.randomUUID().toString();//锁定值:唯一标识try{//如果这个值设置成功,证明该方法当前没有使用操作,可以卖票//加一个过期时间,暂定为30秒,这里的操作是atomic,如果过期时间设置失败,key也会设置失败Booleantag=stringRedisTemplate.opsForValue().setIfAbsent(lock,lockId,30,TimeUnit.SECONDS);if(!tag){//如果设置失败,证明当前方法正在执行,不允许再次执行//实际开发环境应该使用队列来完成访问操作。这里主要探讨分配类型锁的问题,所以我只模拟了场景//如果不设置回调,访问信息会丢失sellTicket();return"访客过多,请稍后访问...";}字符串ticketStr=stringRedisTemplate.opsForValue().get("ticket");国际票=0;如果(空!=ticketStr){ticket=Integer.parseInt(ticketStr);}if(ticket>0){intticketNew=ticket-1;stringRedisTemplate.opsForValue().set("ticket",String.valueOf(ticketNew));logger.info("当前门票库存为:"+ticketNew);}else{logger.info("手速不够,票已售罄...");}}finally{//如果redis中的值与当前值一致,则允许删除锁if(lockId.equals(stringRedisTemplate.opsForValue().get(lock))){stringRedisTemplate.delete(lock);}}return"Successfulticketgrassing...";}分析3已经写到这里解决了大部分问题,但是还有一个问题需要考虑:如果程序运行速度极慢(硬件处理Slow或者GC),导致30秒到了,锁已经过期,程序还没有运行完,这时候就会有另外一个线程一直想钻空子,导致票超卖。这里我们可以用sleep来模拟一下...if(ticket>0){try{//为了测试方便,将过期时间和线程挂起时间改为3秒Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}intticketNew=ticket-1;stringRedisTemplate.opsForValue().set("ticket",String.valueOf(ticketNew));......这个操作会出现极其严重的超卖问题那么这个到期时间怎么设置呢?继续增加?这显然是不合适的,因为无论问题有多大,总是有可能出错。解决方案我们可以使用守护线程来保证这个时间永不过期publicStringsellTicket(){Stringlock="lock";//锁定密钥StringlockId=UUID.randomUUID().toString();//锁值:唯一标识MyThreadmyThread=null;//锁的守护线程try{//如果这个值设置成功,证明该方法目前还没有操作,可以进行售票操作//加一个过期时间,暂定为3秒,这里的操作是原子的。如果过期时间设置失败,则密钥也将设置失败。布尔标记=stringRedisTemplate.opsForValue().setIfAbsent(lock,lockId,3,TimeUnit.SECONDS);if(!tag){//如果设置失败,证明当前方法正在执行,不允许再次执行//实际开发环境应该使用队列来完成访问操作。这里主要探讨分布式锁的问题,所以只模拟场景//如果不设置回调,访问信息会丢失sellTicket();return"当前访问人数过多,请稍后访问...";}//打开守护线程,每隔三分之一的时间更新一次锁myThread=newMyThread(lock);myThread.setDaemon(true);我的线程.start();StringticketStr=stringRedisTemplate.opsForValue().get("ticket");国际票=0;如果(空!=ticketStr){ticket=Integer.parseInt(ticketStr);}如果(票>0){尝试{Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}intticketNew=ticket-1;stringRedisTemplate.opsForValue().set("ticket",String.valueOf(ticketNew));logger.info("当前门票库存为:"+ticketNew);}else{logger.info("手速不够,票已售罄...");}}finally{//如果redis中的值,与当前值一致,允许删除锁if(lockId.equals(stringRedisTemplate.opsForValue().get(lock))){//程序结束,需要关闭守护线程myThread.stop();stringRedisTemplate.delete(锁);logger.info("锁释放成功...");}}return"抢票成功...";}/**使用后台线程续命*守护线程*如果主线程下有守护线程,则守护线程的生命周期与主线程相同主线程同生同死*/classMyThreadextendsThread{Stringlock;MyThread(Stringlock){this.lock=lock;}@Overridepublicvoidrun(){while(true){try{//三分之一的时间Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}//假设线程还活着,需要更新锁logger.info("Threadrenewing...");stringRedisTemplate.expire(lock,3,TimeUnit.SECONDS);}}}至此,我们基本实现了redis分布式锁,在高并发场景下可以正常运行。需要注意的是,实现分布式锁的代码肯定不是最好的。理解分布式锁的实现原理,以及发现问题和解决问题的思路很重要。
