当前位置: 首页 > 后端技术 > PHP

Laravel基于redis队列分析

时间:2023-03-29 22:17:50 PHP

Last-Modified:2019-05-1015:04:22参考链接UseLaravelQueue你要了解的LaravelQueueDocumentationRedis中文文档本文环境Laravel5.5QueueRedisWhyUseQueues使用队列的目的一般是:异步执行错误重试解释:异步执行:有些代码执行很耗时。为了提高响应速度,避免占用过多的连接资源,可以将这部分代码放入队列中异步执行。例如。网站新用户注册后,需要发送欢迎信息涉及网络IO,无法控制时间消耗的Email,非常适合在队列中执行。错误重试:为了保证某些任务的正常执行,可以将任务放入队列中执行。如果执行失败,可以延迟一段时间后重试,直到任务处理成功或者错误次数超过N次后取消执行。例如。用户需要绑定手机号码。这个时候发送短信的接口就要依赖第三方了。一个是不确定和耗时的,另一个是不确定调用是否成功。为了保证调用成功,Laravel中需要在队列出错后重试。下面对默认队列及其配置的分析如下默认队列引擎:redis使用redis-cli中的monitor命令查看具体执行命令语句默认队列名称:default分发任务这里我们以分发异步通知(类XxxNotification实现ShouldQueue)为例。当在Laravel中发起一个异步通知时,Laravel会在redis中的任务队列中添加一个新的任务。Redis执行语句redis>RPUSHqueues:default{"displayName":"App\\Listeners\\RebateEventListener","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"timeout":null,"timeoutAt":null,"data":{"commandName":"Illuminate\\Events\\CallQueuedListener","command":"O:36:\"Illuminate\\Events\\CallQueuedListener\":7:{s:5:\"class\";s:33:\"App\\Listeners\\RebateEventListener\";s:6:\"method\";s:15:\"onRebateCreated\";s:4:\"data\";a:1:{i:0;O:29:\"App\\Events\\RebateCreatedEvent\":4:{s:11:\"\u0000*\u0000tbkOrder\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":3:{s:5:\"class\";s:19:\"App\\Models\\TbkOrder\";s:2:\"id\";i:416;s:10:\"connection\";s:5:\"mysql\";}s:15:\"\u0000*\u0000notifyAdmins\";b:1;s:13:\"\u0000*\u0000manualBind\";b:0;s:6:\"socket\";N;}}s:5:\"tries\";N;s:9:\"timeoutAt\";N;s:7:\"超时\";N;s:6:\"\u0000*\u0000job\";N;}"},"id":"iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB","attempts":0}上面的redis语句是将任务信息(json格式)rpush到redis队列queues:default的末尾。任务队列WorkerLaravel处理任务队列进程开启方式:phpartisanqueue:work,为了更好的观察,这里使用--once选项指定队列中的单个任务进行处理。更具体的参数可以参考文档phpartisanqueue:work--once--delay=1--tries=3上面执行语句参数的含义:--once只执行任务一次,默认就是那个常驻进程已执行--tries=3任务出错最多重试3次,默认不限次数重试--delay=1任务出错后,每次延迟1秒后再次执行,默认是0秒的延迟。当Worker启动时,会依次执行以下步骤:这里仍然以默认队列default为例,只说明redis的相关操作从queues:default:delayed的有序集合中获取可以处理的“延迟任务”,并rpush到queue的尾部:defaultqueue具体执行语句:redis>eval"Luascript"2queues:default:delayedqueues:default当前时间戳Lua脚本内容如下:--获取所有过期\"score\"的作业...localval=redis.call('zrangebyscore',KEYS[1],'-inf',ARGV[1])--如果我们在数组中有值,我们将从第一个队列中移除它们--并将它们添加到目标队列以100个为一组,非常安全地将所有适当的作业移动到目标队列中。如果(下一个(val)?=nil)然后redis。call('zremrangebyrank',KEYS[1],0,#val-1)fori=1,#val,100做redis.call('rpush',KEYS[2],unpack(val,i,math.min(i+99,#val)))endendreturnval从queue:default:reserved有序集合中获取过期的“保留任务”,并rpush到queue:defaultqueue的尾部具体执行语句:redis>eval“Lua脚本”2queues:default:reservedqueues:default当前时间戳使用的Lua脚本同步骤1从queue:default从队列中获取(lpop)一个任务,增加尝试次数,将任务保存到qu在eu:default:reserved有序集合中,任务的score值为当前时间+90(任务执行超时时间)具体执行语句:redis>eval"Luascript"2queues:defaultqueues:default:reservedtasktimeouttimePokeLua脚本--从队列中弹出第一个作业...localjob=redis.call('lpop',KEYS[1])localreserved=falseif(job~=false)then--增加尝试次数并将作业放在保留队列上...reserved=cjson.decode(job)reserved['attempts']=reserved['attempts']+1reserved=cjson.encode(reserved)redis.call('zadd',KEYS[2],ARGV[1],reserved)endreturn{job,reserved}这里的90是根据配置:config('queue.connections.redis.retry_after')如果预计任务耗时过长,该值应该增加,以防止任务在仍在执行时被重置。成功执行上面获取的任务后,将任务从queues:default:reserved队列中移除。具体执行语句:ZREMqueues:default:reserved"特定任务"if任务执行失败,有两种情况:任务失败次数没有达到指定的重试阈值,从queues:default:reserved中移除任务,并将任务添加到queue:default:delayed的有序集合中,其中score为任务下次执行的时间戳。执行语句:redis>EVAL"Luascript"2queues:default:delayedqueues:default:reserved"Failedtask"任务延迟执行Lua脚本的时间戳--Removethejobfromthecurrentqueue...redis.call('zrem',KEYS[2],ARGV[1])--添加作业到\"delayed\"队列中...redis.call('zadd',KEYS[1],ARGV[2],ARGV[1])returntrue如果任务失败次数超过指定的重试阈值,则从队列中移除任务:default:reserved执行语句:redis>ZREMqueue:default:reserved注意,上面使用Lua脚本的目的是操作的原子性。Redis是单进程单线程模式。以Lua脚本形式执行命令时,可以保证脚本执行的原子性,不存在并发问题。关于Redis的原子操作上面Laravel使用redis作为队列存储引擎时,操作redis时使用exec执行Lua脚本,保证原子性。这里为不熟悉redis的同学简单介绍一下。以上面Worker启动时的第1步为例:从queues:default:delayed的有序集合中获取可以处理的“延迟任务”,并rpush到queue:defaultqueue的尾部具体执行语句:redis>eval"Luascript"2queues:default:delayedqueues:defaultcurrenttimestampLuascript内容如下:--获取所有过期\"score\"的作业...localval=redis.call('zrangebyscore',KEYS[1],'-inf',ARGV[1])--如果我们在数组中有值,我们将从第一个队列中移除它们--并将它们添加到目的地以100个为一组进行排队,这会非常安全地将所有适当的作业移动到目标队列中。如果(下一个t(val)~=nil)然后redis.call('zremrangebyrank',KEYS[1],0,#val-1)fori=1,#val,100doredis.call('rpush',KEYS[2],unpack(val,i,math.min(i+99,#val)))endendreturnval上述步骤先从queues:default:delayed的有序集合中获取可以处理的“延迟任务”,并rpush它们toqueue:default队列的尾部。所以如果不使用Lua脚本,一般的做法是:$jobs=$redis->zRangeByScore("queues:default:delayed","-inf",time())if(!empty($作业)){$redis->zRem("queues:default:delayed",...$jobs);$redis->rPush("queues:default",...$jobs);如果是单个Worker,上面的脚本是没有问题的,但是如果是多个Worker呢?在php层面执行以上操作会导致并发问题。Worker_1和Worker_2从queues:default:delayed队列中获取多个任务后,执行rPush语句,会导致任务执行两次,如果有多个Worker,则执行更多次。只要是可能导致并发问题的情况,就一定会发生。以分布式锁为例。锁的两个基本操作:LockUnlockLock操作//生成一个唯一的锁id$identifier=uniqid(php_uname("n")."_",true);//只有当key不存在时才设置,过期时间为5秒$result=$redis->set("lock_key",$identifier,["NX","EX"=>5]);解锁操作$script=<<evaluate($script,["lock_key",$标识符],1);至于为什么Unlock操作这么麻烦,可以看看下面两个有问题的解决办法,再想想。有问题的解决方案是$redis->del("lock_key");有问题解决方案2if($redis->get("lock_key")==$identifier){$redis->del("lock_key");}