什么是分布式锁?分布式锁是一种控制分布式系统之间共享资源同步访问的方法。在分布式系统中,经常需要协调它们的动作。图片来自Pexels。如果不同系统或同一系统的不同主机共享一个或一组资源,在访问这些资源时往往需要互斥,以防止相互干扰,保证一致性。在这种情况下,你需要使用分布式锁。为什么要使用分布式锁来保证一个方法或者属性在高并发下只能同时被同一个线程执行。在传统单体应用的单机部署情况下,可以使用Java并发相关的API(如ReentrantLock或Synchronized)进行互斥控制;在单机环境下,Java提供了许多与并发相关的API。但是随着业务发展的需要,原来的单机部署系统演变为分布式集群系统后,由于分布式系统是多线程、多进程、分布在不同机器上的,这就使得原来的单机部署系统变成了分布式集群系统。单独部署情况concurrent控制锁策略失效,纯JavaAPI无法提供分发锁的能力。为了解决这个问题,需要一种跨JVM的互斥机制来控制对共享资源的访问。这就是分布式锁要解决的问题!例如:机器A和机器B是一个集群。A、B两台机器上的程序是一样的,都是高可用的。A和B机器都有一个定时任务,需要在每晚凌晨2点执行,但是这个定时任务只能执行一次,否则会报错。A、B两台机器在执行的时候,需要去抢锁。谁抢到锁谁就执行,谁抢不到就不用执行!锁处理锁处理方法如下:锁在单个应用中使用:(单进程多线程)Synchronize。分布式锁控制一种在分布式系统之间同步访问资源的方式。分布式锁是一种控制分布式系统之间共享资源同步访问的方法。分布式锁的实现分布式锁的实现如下:基于数据的乐观锁实现分布式锁基于Zookeeper的临时节点分布式锁基于Redis的分布式锁Redis分布式锁锁的获取在set命令中,有很多选项可用于修改命令的行为。以下是set命令可用选项的基本语法:redis127.0.0.1:6379>SETKEYVALUE[EXseconds][PXmilliseconds][NX|XX]-EXseconds设置指定过期时间(单位为秒)-PXmilliseconds设置指定的过期时间(毫秒)-NX:只在key不存在时设置-XX:只在key已经存在时设置方法一:推荐privatestaticfinalStringLOCK_SUCCESS="OK";privatestaticfinalStringSET_IF_NOT_EXIST="NX";privatestaticfinalStringSET_WITH_EXPIRE_TIME="PX";publicstaticbooleangetLock(JedisClusterjedisCluster,StringlockKey,StringrequestId,intexpireTime){//NX:保证互斥Stringresult=jedisCluster.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireCK_SUCC);if()){returntrue;}returnfalse;}方法二:publicstaticbooleangetLock(StringlockKey,StringrequestId,intexpireTime){Longresult=jedis.setnx(lockKey,requestId);if(result==1){jedis.expire(lockKey,expireTime);returntrue;}returnfalse;}注:Meth推荐使用od1,因为setnx和expire是方法2中的两个操作,它不是原子操作。如果setnx有问题,就是死锁情况,所以推荐方法一释放锁方法一:del命令实现publicstaticvoidreleaseLock(StringlockKey,StringrequestId){if(requestId.equals(jedis.get(lockKey))){jedis.del(lockKey);}}方法二:Redis+Lua脚本实现(推荐)publicstaticbooleanreleaseLock(StringlockKey,StringrequestId){Stringscript="ifredis.call('get',KEYS[1])==ARGV[1]然后返回redis.call('del',KEYS[1])elsereturn0end";Objectresult=jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));if(result.equals(1L)){returntrue;}returnfalse;}Zookeeper的分布式锁Zookeeper的分布式锁实现原理了解了锁的原理后,你会发现Zookeeper诞生为分布式锁的雏形。首先,Zookeeper的每个节点都是一个天然的定序器。在每个节点下创建子节点时,只要选择的创建类型是有序的(EPHEMERAL_SEQUENTIAL临时有序或PERSISTENT_SEQUENTIAL永久有序),那么在新的子节点后面会加上一个序号。这个序号是之前生成的序号加1。比如创建一个发号节点“/test/lock”,然后将其作为父节点,在这个父节点下创建一个相同前缀的子节点。假设相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指定有序类型。如果是第一个创建的子节点,则生成的子节点为/test/lock/seq-0000000000,下一个节点为/test/lock/seq-0000000001,以此类推。其次,Zookeeper节点的增量可以规定节点号最小的获取锁。一个Zookeeper分布式锁首先需要创建一个父节点,尝试做一个持久化节点(PERSISTENT类型),然后每个想要获取锁的线程都会在这个节点下创建一个临时序列节点。由于序号的递增性,可以指定序号最小的获得锁。因此,每个线程在尝试占用锁之前,先判断自己的队列号当前是否是最小的,如果是,则获取锁。第三,Zookeeper的节点监控机制可以保证锁的持有有序高效。每个线程在获取锁之前,首先创建自己的ZNode。同理,当释放锁时,需要删除抢号的Znode。抢号成功后,如果不是号码最小的节点,则进入等待通知状态。谁应该等待通知?不需要其他人,只需要等待上一个Znode的通知即可。当之前的Znode被删除后,就轮到自己拥有锁了。第一个通知第二个,第二个通知第三个,鼓声一个接一个向后传。Zookeeper的节点监控机制可以说是非常完善的实现了这种击鼓传花的信息传递。具体做法是,每个等待通知的Znode节点只需要监听linsten或者watch来监听排在自己前面且紧挨在自己前面的节点即可。只要把前面的节点删除了,再判断一下,看你是不是序号最小的节点,如果是,就获取锁。为什么说Zookeeper的节点监控机制很完善呢?一站式端到端连接,后端监控前端,不怕中间截断吗?比如在分布式环境下,由于网络原因,或者服务器挂了,或者其他原因,如果前面的节点没有被程序删除成功,后面的节点不就一直等待下去吗?其实Zookeeper的内部机制可以保证后续节点可以正常监听删除并获得锁。在创建取号节点时,尽量创建一个临时的Znode节点,而不是永久的Znode节点。一旦这个Znode的客户端与Zookeeper集群服务器失去联系,这个临时Znode也会被自动删除。它后面的节点也可以接收到删除事件,从而获得锁。据说Zookeeper的节点监控机制很完善。还有一个原因。Zookeeper端到端的方式,后面跟着前面的监听,可以避免羊群效应。所谓羊群效应就是每个节点都挂了,所有节点都监听响应,会给服务器带来巨大的压力,所以出现了临时顺序节点。当一个节点挂掉时,只有它后面的节点响应。Zookeeper分布式锁实现示例Zookeeper通过临时节点实现分布式锁:interruptedExceptione){e.printStackTrace();}System.out.println("*********业务方法结束*************\n");}//这里使用@Test会报错publicstaticvoidmain(String[]args){//定义重试端策略1000等待时间(毫秒)10重试次数RetryPolicypolicy=newExponentialBackoffRetry(1000,10);//定义zookeeper客户端CuratorFrameworkclient=CuratorFrameworkFactory.builder().connectString("10.231.128.95:2181,10.231.128.96:2181,10.231.128.97).repolicytry"y"y":218).build();//启动客户端client.start();//在zookeeper中定义一个锁finalInterProcessMutexlock=newInterProcessMutex(client,"/mylock");//启动一个线程for(inti=0;i<10;i++){newThread(newRunnable(){@Overridepublicvoidrun(){try{//请求的锁lock.acquire();printNumber();}catch(Exceptione){e.printStackTrace();}finally{//释放锁try{lock.release();}catch(除了ione){e.printStackTrace();}}}}).start();}}}基于数据的分布式锁我们在讨论分布式锁的使用在使用数据库的时候,往往先排除基于数据库的方案,直觉上觉得这个方案不够“高级”。从性能的角度来看,基于数据库的解决方案的性能确实不够好。整体性能比较:缓存>Zookeeper,etcd>数据库。也有人认为基于数据库的解决方案存在很多问题并且不是很可靠。数据库方案可能不适合频繁的写操作。我们来看一下基于数据库(MySQL)的解决方案,大体上分为三类:基于表记录的乐观锁和基于表记录的悲观锁。表中数据实现。当我们要获取锁时,可以在表中增加一条记录,当我们要释放锁时,删除这条记录。为了更好的演示,我们先创建一个数据库表,如下:CREATETABLE`database_lock`(`id`BIGINTNOTNULLAUTO_INCREMENT,`resource`intNOTNULLCOMMENT'lockedresource',`description`varchar(1024)NOTNULLDEFAULT""COMMENT'description',PRIMARYKEY(`id`),UNIQUEKEY`uiq_idx_resource`(`resource`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='数据库分布式锁表';①我们可以插入一条数据来获取锁:INSERTINTOdatabase_lock(resource,description)VALUES(1,'lock');因为表database_lock中的resource是唯一索引,所以当其他请求提交到数据库时,一个会报错,插入不成功,只能插入一个。如果插入成功,我们就获得了锁。②删除锁INSERTINTOdatabase_lock(resource,description)VALUES(1,'lock');这种实现方式很简单,但是需要注意以下几点:①这种锁是没有有效期的,一旦释放锁的操作失败,会导致锁记录一直在数据库中,其他线程不能获取锁。这个缺陷也很容易解决,比如可以做一个定时任务定时清理。②这个锁的可靠性取决于数据库。建议设置备库,避免单点,进一步提高可靠性。③这种锁是非阻塞的,因为数据插入失败后会直接报错,想要获取锁需要重新操作。如果需要阻塞,可以使用for循环、while循环等,直到INSERT成功才返回。④这种锁也是不可重入的,因为同一个线程在释放锁之前不能再次获取锁,因为同一条记录已经存在于数据库中。实现可重入锁,可以在数据库中增加一些字段,比如主机信息,线程信息等,来获取锁。那么再次获取锁的时候,可以先查询数据。如果能查到当前的主机信息和线程信息,就可以直接给它分配锁。乐观锁,顾名思义,系统认为数据更新在大多数情况下不会引起冲突,只有在提交数据库更新操作时才检测到数据冲突。如果检测结果与预期数据不一致,则返回失败信息。乐观锁大多是基于数据版本(version)的记录机制来实现的。什么是数据版本号?就是给数据加上版本标识。在基于数据库表的版本解决方案中,一般是通过在数据库表中增加一个“版本”字段来实现的。读取数据时,版本号一起读出,以后更新的时候,这个版本号加1。在更新过程中,将比较版本号。如果一致,没有变化,则操作成功;如果版本号不一致,更新将失败。为了更好的理解数据库乐观锁在实际项目中的使用,这里举一个业界常说的盘点例子。电商平台都会有商品库存,当用户购买商品时,会对库存进行操作(库存减1表示一件商品已售出)。如果只有一个用户操作数据库本身,可以保证用户操作的正确性,但是在并发的情况下,会出现一些意想不到的问题。比如两个用户同时购买一个商品,那么数据库层面的实际操作应该是将库存减2。但是由于高并发情况,第一个用户完成购买,读取当前库存并执行减1操作,因为此操作尚未完全执行。第二个用户进入购买相同的产品。这时候查询到的库存可能是还没有减1的库存,导致出现脏数据【线程不安全操作】。数据库中的乐观锁也可以保证线程安全。通常,我们在代码层面这样做:selectgoods_numfromgoodswheregoods_name="小本子";updategoodssetgoods_num=goods_num-1wheregoods_name="小本子";上面的SQL是一组,一般是先查询当前的goods_num,然后goods_num减1修改库存。在并发的情况下,这个语句可能会导致一个原库存为3的商品被两个人购买,还剩下2个库存,从而导致该商品超卖。那么数据库乐观锁是如何实现的呢?首先定义一个version字段作为版本号,每次操作都会变成这样:selectgoods_num,versionfromgoodswheregoods_name="smallbook";updategoodssetgoods_num=goods_num-1,version=查询的version值增加wheregoods_name="smallbook"andversion=查询的版本;其实乐观锁也可以借助更新时间戳(updated_at)来实现,这与version字段的使用方式类似。在执行更新操作之前,获取记录的当前更新时间,并在提交更新时检查当前更新时间是否等于更新开始时获取的更新时间戳。除了通过悲观锁对数据库表中的记录进行增删改查外,我们还可以利用数据库中的锁来实现分布式锁。在查询语句后加上FORUPDATE,数据库会在查询过程中给数据库表加一个悲观锁,又称排它锁。当一条记录被悲观锁时,其他线程不能更改以添加悲观锁。悲观锁与乐观锁相反,总是假设最坏的情况,它认为在大多数情况下数据更新会发生冲突。在使用悲观锁的时候,我们需要注意锁的级别。MySQLInnoDB导致在加锁时显式指定主键(或索引)时会执行行锁(只对选中的数据),否则MySQL会执行表锁(锁住整个数据表)。在使用悲观锁时,我们必须关闭MySQL数据库的autocommit属性(参考下例),因为MySQL默认使用autocommit模式。也就是说,当你执行更新操作时,MySQL会立即提交结果。mysql>SETAUTOCOMMIT=0;QueryOK,0rowsaffected(0.00sec)这样就可以在使用FORUPDATE获取锁后执行相应的业务逻辑,执行完再使用COMMIT释放锁。我们不妨用前面的database_lock表来详细表达一下用法。假设有一个线程A需要获取锁并执行相应的操作。那么其具体步骤如下:STEP1-获取锁:SELECT*FROMdatabase_lockWHEREid=1FORUPDATE;。STEP2-执行业务逻辑。STEP3-释放锁:COMMIT。作者:凌静简介:生活中的百搭,目前在房地产公司做DevOPS相关工作,曾在大型互联网公司担任高级运维工程师,熟悉Linux运维、Python运维和维护开发、Java开发、DevOPS常用开发Components等等,个人公众号:乱闯,欢迎来撩我!
