当前位置: 首页 > 后端技术 > PHP

使用分布式锁解决并发问题

时间:2023-03-30 01:20:48 PHP

在系统中,当有多个进程和线程可以更改一个共享数据时,并发问题很容易导致共享数据不一致。即多个进程同时获取对数据的操作权限并更新数据。一个典型的场景是,在线销售系统在销售热销产品时,遇到多个并发请求同时提交订单的情况是很有可能的。造成商品超卖现象。只要访问流量好,系统就可能会遇到并发请求导致数据库中的数据被重复写入的情况。解决多个进程并发执行程序块问题的方法是保证同一时刻只有一个进程可以执行同一个程序块。这个类比把并发执行变成串行顺序执行。为了防止获得执行权的进程被其他进程打扰,需要设置一个所有进程都可以读取的标志。当标志不存在时,可以设置标志,其他后续进程如果发现已经有标志就会等待有标志的进程结束程序块的执行取消标志,然后尝试设置标记。这个标记可以理解为一把锁,设置标记的过程就是我们通常所说的加锁。使用redis的setnx和expire方法做分布式锁setnx()setnx的意思是SETifNotExists,主要有两个参数setnx(key,value)。这个方法是原子的。如果key不存在,则当前key设置成功,返回1;如果当前键已经存在,则当前键设置失败并返回0。expire()expire设置过期时间。需要注意的是,setnx命令不能设置key的超时时间,只能通过expire()为key设置。具体步骤1、如果setnx(lockKey,1)返回0,说明占用失败;如果返回1,则表示入住成功。2.expire()命令设置lockKey的超时时间,避免死锁。3、执行完业务代码后,可以通过delete命令删除key。这个方案其实可以满足日常工作的需要,但是从技术方案来看,可能还有一些可以改进的地方。比如如果在第一步setnx执行成功后,expire()命令执行成功前发生宕机,那么还是会出现死锁问题,所以如果想改进的话,可以使用redis的setnx()、get()和getset()方法实现分布式锁。使用redis的setnx()、get()、getset()方法作为分布式锁的后台主要是基于setnx()和expire()解决可能出现的死锁问题,并做了一些优化。getset()命令主要有两个参数getset(key,newValue)。此方法是原子的,将newValue设置为key并返回key的原始旧值。假设key不存在,多次执行这条命令,会出现如下效果:getset(key,"value1")返回null,key的值会被设置为value1getset(key,"value2")返回value1,此时key的值会被设置成value2等等!使用stepsetnx(lockKey,当前时间+过期超时时间),若返回1,则获取锁成功;如果返回0,则未获取到锁,转步骤2。get(lockKey)获取值,该值是oldExpireTime所代表的当前lockKey的过期时间,将这个oldExpireTime与当前系统时间进行比较,如果是早于当前系统时间,则认为锁超时,可以允许其他请求重试获取,转步骤3,否则等待指定时间,返回步骤2重新开始判断。计算newExpireTime=当前时间+过期时间,则getset(lockKey,newExpireTime)会返回当前lockKey之前设置的旧值currentExpireTime。判断currentExpireTime是否等于oldExpireTime。如果相等,说明当前进程getset已经成功设置锁并获取到锁。如果不相等,说明锁已经被另外一个进程获取到,那么当前请求可以根据具体的需求逻辑直接返回failure,或者返回步骤2继续重试。获取到锁后,当前进程就可以开始自己的业务处理了。当处理完成后,将当前处理时间与为锁设置的超时时间进行比较。如果小于为锁设置的超时时间,直接执行delete释放锁;如果大于为锁设置的超时时间如果设置了超时时间,则锁可能已经被其他进程获取了。这时候执行delete释放锁会导致其他进程获取的锁被释放。下面是用PHP代码实现的Redis分布式锁。伪代码用于Redis部分。请根据自己的情况将伪代码替换为Redis连接对象。/***获取Redis分布式锁**@param$lockKey*@returnbool*/functiongetRedisDistributedLock(string$lockKey):bool{$lockTimeout=2000;//锁超时时间为2000毫秒$now=intval(microtime(真)*1000);$lockExpireTime=$now+$lockTimeout;$lockResult=Redis::setnx($lockKey,$lockExpireTime);if($lockResult){//当前进程设置锁成功returntrue;}else{$oldLockExpireTime=Redis::get($lockKey);如果($now>$oldLockExpireTime&&$oldLockExpireTime==Redis::getset($lockKey,$lockExpireTime)){returntrue;}}returnfalse;}/***串行执行程序**@paramstring$lockKey锁的key*@paramClosure$closure进程获得锁后要执行的闭包*@returnmixed*/functionserialProcessing(string$lockKey,闭包$closure){if(getRedisDistributedLock($lockKey)){$result=$closure();$now=intval(microtime(true)*1000);如果($now