在并发场景下,很多时候,我们的业务开发都会有加锁操作来保证执行和业务逻辑的互斥。比如Java中有很多基于AQS的组件,使用起来很方便。在创建锁的时候,也可以具体指定当前锁是否需要公平。/***使用给定的公平策略创建{@codeReentrantLock}实例。**@paramfair{@codetrue}如果这个锁应该使用公平排序策略*/publicReentrantLock(booleanfair){sync=fair?newFairSync():newNonfairSync();对于需要公平性的场景,就像我们的现实生活一样,这里的FairSync会通过一个CLH队列对请求线程进行排队。在单实例应用中,本地CLH排队就够了,我们现在切换到分布式场景。在分布式场景下,为了实现锁的功能,出现了各种分布式锁。与单实例场景下的锁相比,锁只能锁定自己的实例。由于统一外部中间件的介入,分布式锁将锁信息提取到一个独立的外部,因此多个应用实例可以互斥。在分布式场景下,如何保证公平性?和我们从单实例到分布式锁的思路是一样的。讲道理,在统一的第三方排队排队。在第三方这里排队也有一些需要注意的地方。比如判断当前队列是否有等待,如果没有,则获取锁成功,执行,如果有等待,则加入队列,判断的逻辑可能还是并发操作,也需要加锁。就好像你在某家餐厅看了一眼,没有人,兴高采烈地去买了一杯奶茶,回来的时候已经满了。对于分布式锁,使用较多的是基于Redis的Redission。如果换成Redis分布式公平锁,那基本上就只有Redission了。下面我通过两段代码和一些文字来描述Redission的公平锁的实现。简单总结一下:Redission的公平锁是通过“Redis+Lua脚本”实现的。Redission获得Redis连接后,可以通过“eval()”执行一个Lua脚本,同时传入一些key和args。因为不管同一个连接有多少个Lua逻辑,都不会出现买奶茶就吃饱的情况。这里应用了Redis的pub/sub功能,等待线程轮到它的时候会收到Redis的提醒,前提是它需要订阅相应的通知。看加锁的Lua逻辑,代码写的比较清楚,我也加了一些对应的Redis操作和参数注释。排队信息通过链表存储,每个等待线程都有一个超时时间,超时后退出队列。因此,当eval执行此操作时,它会返回一个ttlLong类型。表示过期时间。[[用于锁操作。KEYS[1]=lockNameKEYS[2]=waitQueueNameKEYS[3]=timeoutNameARGV[1]=waitTimeARGV[2]=lockNameARGV[3]=leaseTimeARGV[4]=currentTime--]]whiletruedolocalfirstThreadId2=redis.call("lindex",KEYS[2],0)iffirstThreadId2==falsethenbreakendlocaltimeout=tonumber(redis.call("zscore",KEYS[3],firstThreadId2))iftimeout<=tonumber(ARGV[4])然后redis.call("zrem",KEYS[3],firstThreadId2)redis.call("lpop",KEYS[2])elsebreakendendif(redis.call("exists",KEYS[1])==0)和((redis.call("exists",KEYS[2])==0)或(redis.call("lindex",KEYS[2],0)==ARGV[2]))thenredis.call("lpop",KEYS[2])redis.call("zrem",KEYS[3],ARGV[2])--从有序集合中移除一个或多个redis成员。call("hset",KEYS[1],ARGV[2],1)--将哈希表key中字段field的值设置为value。redis.call("pexpire",KEYS[1],ARGV[1])returnnilendif(redis.call("hexists",KEYS[1],ARGV[2])==1)then--HEXISTS键字段视图哈希表键中是否存在指定字段。redis.call("hincrby",KEYS[1],ARGV[2],1)--HINCRBYkeyfieldincrement将增量添加到哈希表键中指定字段的整数值。redis.call("pexpire",KEYS[1],ARGV[1])--RedisPEXPIRE命令类似于EXPIRE命令,但它以毫秒而不是秒为单位设置密钥的生命周期。返回nilendlocalfirstThreadId=redis.call("lindex",KEYS[2],0)localttliffirstThreadId~=falseandfirstThreadId~=ARGV[2]然后ttl=tonumber(redis.call("zscore",KEYS[3],firstThreadId))-tonumber(ARGV[4])elsettl=redis.call("pttl",KEYS[1])--RedisPttl命令以毫秒为单位返回key的剩余过期时间。endlocaltimeout=ttl+tonumber(ARGV[3])+tonumber(ARGV[4])ifredis.call("zadd",KEYS[3],timeout,ARGV[2])==1thenredis.call("rpush",KEYS[2],ARGV[2])endreturnttl我们再来看一下解锁的逻辑。看看我之前锁定的一些内容。重点是“发布”。当轮到线程时,通道nextThreadId将收到通知。这里的threadId就是我们加锁和解锁时需要传入的。如果你关注Java的线程Id,你会发现不同实例之间有很高的重复率。为了避免这种情况,每个client在传入ThradId的时候,除了真实的id之外,还需要加上每个client对应的信息来区分。--[[用于解锁操作。KEYS[1]=lockNameKEYS[2]=waitQueueNameKEYS[3]=timeoutNameKEYS[4]=channelNameARGV[1]=messageARGV[2]=leaseTimeARGV[3]=lockNameARGV[3]=currentTime--]]whiletruedolocalfirstThreadId2=redis.call("lindex",KEYS[2],0)iffirstThreadId2==falsethenbreakendlocaltimeout=tonumber(redis.call("zscore",KEYS[3],firstThreadId2))如果超时<=tonumber(ARGV[4])那么redis.call("zrem",KEYS[3],firstThreadId2)redis.call("lpop",KEYS[2])elsebreakendendif(redis.call(“存在”,KEYS[1])==0)thenlocalnextThreadId=redis.call("lindex",KEYS[2],0)ifnextThreadId~=falsethenredis.call("publish",KEYS[4]..":"..nextThreadId,ARGV[1])endreturn1endif(redis.call("hexists",KEYS[1],ARGV[3])==0)thenreturnnilendlocalcounter=redis.call("hincrby",KEYS[1],ARGV[3],-1)if(counter>0)thenredis.call("pexpire",KEYS[1],ARGV[2])返回0endredis.call("del",KEYS[1])localnextThreadId=redis.call("lindex",KEYS[2],0)ifnextThreadId~=falsethenredis.call("publish",KEYS[4]..":"..nextThreadId,ARGV[1])endreturn1看完解锁逻辑,eval在外面进行加锁的??时候需要有对应的sub你这里会收到解锁的pub信息,否则会卡在这里。注意sub函数是阻塞操作,需要在单独的线程中执行。一个Future用于实现sub函数一定的等待时间,以及然后超时后unsub,这个redission包里有很多blocklogic,有兴趣的可以去看源码。
