前言分析前,请务必了解消息队列的实现。不懂的请先阅读:有赞消息队列设计去哪儿网消息队列设计tp5的消息队列是基于数据库redis和tp官方自己的本章Topthink是围绕redis实现的,用于分析和存储key:key类型descriptionqueues:queueNamelist待执行的任务think:queue:restartstringrestartqueuetimestampqueues:queueName:delayedz设置延迟任务queues:queueName:reservedz设置执行失败,等待重新执行执行命令work和listen的区别会在后面解释以下命令说明phpthinkqueue:worklisteningqueuephpthinkqueue:listenlisteningqueuephpthinkqueue:restart重启队列phpthinkqueue:subscribe尚未可用,可能官方保留了whatelseideabutyetyetimplementedBehaviorlabel标签描述worker_daemon_startdaemonprocessenableworker_memory_exceededmemoryexceedsworker_queue_restartrestartdaemonprocessworker_before_process任务开始执行worker_before_sleep任务延迟执行queue_failed任务执行失败命令参数默认值可用模式说明queuenullwork,监听任务执行名字daemonnullwork使用daemon进程执行任务delay0work,监听失败后重新执行的时间,forcenullwork失败后重新执行的时间memory128Mwork,listen限制最大内存sleep3work,listen没有任务时的等待时间tries0work,listen任务失败后的最大尝试次数模式区别1:不同的执行原理工作:单进程处理模式;nodaemonparameterwork进程在处理完下一条消息后直接结束当前进程。当没有新消息时,它会休眠一会然后退出;带有daemon参数的work进程会循环处理队列中的消息,直到内存超过参数配置的时候进程才会结束。当没有新消息时,它会在每个周期休眠一段时间;listen:父进程+子进程的处理方式;在其所在的父进程中会创建一个单执行模式的工作子进程,并通过工作子进程处理队列中的下一条消息。当工作子进程退出时,父进程会监听子进程的退出信号,重新创建一个新的单次执行工作子进程;2:exitwork的时机不同:看上面listen:父进程会继续正常运行,除非遇到下面两种情况01:一个创建的work子进程的执行时间超过了--timeout参数配置中的监听命令行;此时work子进程会被强制终止,listen所在的父进程也会抛出ProcessTimeoutException退出;开发者可以选择捕获异常,让父进程继续执行;02:父进程由于某种原因存在内存泄漏,当父进程自身占用的内存超过命令行中--memory参数配置时,父子进程都会退出。正常情况下,listen进程本身占用的内存是稳定的。3:性能不同。work:在脚本内部循环,在命令执行开始时已经加载了框架脚本;listen:处理完一个task后会启动一个新的work进程,此时会重新加载frame脚本;所以work模式的性能会比listen模式高。注意:当代码更新时,需要在work模式下手动执行phpthinkqueue:restart命令重启队列才能使修改生效;在监听模式下,无需其他操作,自动生效。4:超时控制能力工作:本质上既不能控制进程本身的运行时间,也不能限制执行中任务的执行时间;listen:可以限制其创建的工作子进程的超时时间;工作子进程可以通过超时参数限制进程允许运行的最长时间。超过这个时间限制还没有结束的子流程会被强制终止;expire和timeexpire的区别是在配置文件中设置的,指的是任务的过期时间。这个时间是全局的,影响所有的工作进程超时,在命令行参数中设置,指的是工作子进程的超时时间。该时间只对当前执行的监听命令有效。timeout的对象是work子进程;5:使用场景不同work的适用场景有:01:任务数量多02:性能要求高03:任务执行时间短04:consumer类没有死循环,sleep(),exit(),die()和其他容易引起bug的逻辑监听适用场景有:01:任务数量少02:任务执行时间长03:需要严格限制任务执行时间公共操作自我们基于redis来分析,我们只需要分析src/queue/connector/redis.php01:首先在src/Queue.php中调用魔术方法__callStatic02:在__callStatic方法中调用buildConnector03:首先加载buildConnector中的配置文件,如果没有,会同步执行04:根据配置文件创建连接,在redis.php中传入配置类的构造方法中的操作:01:检测是否安装了redis扩展02:合并configuration03:检测是redisextension还是pRedis04:创建连接对象发布流程发布参数参数名称默认值说明可用方法$jobNone执行Task类推送,后面$data空任务数据推送,后面$queuedefault任务namepush,later$delaynulldelaytimelaterexecuteimmediatelypush($job,$data,$queue)Queue::push(Test::class,['id'=>1],'test');show操作后返回一个数组,序列化后rPush到redis,关键是queue:queueName数组结构结构:['job'=>$job,//执行任务的类'data'=>$data,//任务数据'id'=>'xxxxx'//任务id]写入redis并返回queueid至于中间的操作,实在是太长了,所以没写后面的延迟释放($delay,$job,$data,$queue)Queue::later(100,Test::class,['id'=>1],'测试');和上面的操作差不多,返回一个数组,序列化后zAdd到redis。关键是queue:queueName:delayedscore是当前时间戳+$delay执行过程执行过程有work模式和listen两种模式的区别上面已经说了,因为代码逻辑太多,下一章再说坏了;最后说说标签的使用//Daemon进程启动'worker_daemon_start'=>[\app\index\behavior\WorkerDaemonStart::class],//内存超出'worker_memory_exceeded'=>[\app\index\behavior\WorkerMemoryExceeded::class],//重启守护进程'worker_queue_restart'=>[\app\index\behavior\WorkerQueueRestart::class],//任务开始执行Before'worker_before_process'=>[\app\index\behavior\WorkerBeforeProcess::class],//任务延迟执行'worker_before_sleep'=>[\app\index\behavior\WorkerBeforeSleep::class],//任务执行失败'queue_failed'=>[\app\index\behavior\QueueFailed::class]publicfunctionrun(Output$output){$output->write('
