当前位置: 首页 > 后端技术 > PHP

think-queue分析对

时间:2023-03-29 23:22:42 PHP

前言分析前,请务必了解消息队列的实现。不懂的请先阅读:有赞消息队列设计去哪儿网消息队列设计tp5的消息队列是基于数据库redis和tp官方自己的本章Topthink是围绕redis实现的,用于分析和存储key:key类型descriptionqueues:queueNamelist待执行的任务think:queue:restartstringrestartqueuetimestampqueues:queueName:delayedz设置延迟任务queues:queueName:reservedz设置执行失败,等待重新执行执行命令work和listen的区别会在后面解释以下命令说明phpthinkqueue:worklisteningqueuephpthinkqueue:listenlisteningqueuephpthinkqueue:restart重启队列phpthinkqueue:subscribe尚未可用,可能官方保留了whatelseideabutyetyetimplementedBehaviorlabel标签描述worker_daemon_startdaemonprocessenableworker_memory_exceededmemoryexceedsworker_queue_restartrestartdaemonprocessworker_before_process任务开始执行worker_before_sleep任务延迟执行queue_failed任务执行失败命令参数默认值可用模式说明queuenullwork,监听任务执行名字daemonnullwork使用daemon进程执行任务delay0work,监听失败后重新执行的时间,forcenullwork失败后重新执行的时间memory128Mwork,listen限制最大内存sleep3work,listen没有任务时的等待时间tries0work,listen任务失败后的最大尝试次数模式区别1:不同的执行原理工作:单进程处理模式;nodaemonparameterwork进程在处理完下一条消息后直接结束当前进程。当没有新消息时,它会休眠一会然后退出;带有daemon参数的work进程会循环处理队列中的消息,直到内存超过参数配置的时候进程才会结束。当没有新消息时,它会在每个周期休眠一段时间;listen:父进程+子进程的处理方式;在其所在的父进程中会创建一个单执行模式的工作子进程,并通过工作子进程处理队列中的下一条消息。当工作子进程退出时,父进程会监听子进程的退出信号,重新创建一个新的单次执行工作子进程;2:exitwork的时机不同:看上面listen:父进程会继续正常运行,除非遇到下面两种情况01:一个创建的work子进程的执行时间超过了--timeout参数配置中的监听命令行;此时work子进程会被强制终止,listen所在的父进程也会抛出ProcessTimeoutException退出;开发者可以选择捕获异常,让父进程继续执行;02:父进程由于某种原因存在内存泄漏,当父进程自身占用的内存超过命令行中--memory参数配置时,父子进程都会退出。正常情况下,listen进程本身占用的内存是稳定的。3:性能不同。work:在脚本内部循环,在命令执行开始时已经加载了框架脚本;listen:处理完一个task后会启动一个新的work进程,此时会重新加载frame脚本;所以work模式的性能会比listen模式高。注意:当代码更新时,需要在work模式下手动执行phpthinkqueue:restart命令重启队列才能使修改生效;在监听模式下,无需其他操作,自动生效。4:超时控制能力工作:本质上既不能控制进程本身的运行时间,也不能限制执行中任务的执行时间;listen:可以限制其创建的工作子进程的超时时间;工作子进程可以通过超时参数限制进程允许运行的最长时间。超过这个时间限制还没有结束的子流程会被强制终止;expire和timeexpire的区别是在配置文件中设置的,指的是任务的过期时间。这个时间是全局的,影响所有的工作进程超时,在命令行参数中设置,指的是工作子进程的超时时间。该时间只对当前执行的监听命令有效。timeout的对象是work子进程;5:使用场景不同work的适用场景有:01:任务数量多02:性能要求高03:任务执行时间短04:consumer类没有死循环,sleep(),exit(),die()和其他容易引起bug的逻辑监听适用场景有:01:任务数量少02:任务执行时间长03:需要严格限制任务执行时间公共操作自我们基于redis来分析,我们只需要分析src/queue/connector/redis.php01:首先在src/Queue.php中调用魔术方法__callStatic02:在__callStatic方法中调用buildConnector03:首先加载buildConnector中的配置文件,如果没有,会同步执行04:根据配置文件创建连接,在redis.php中传入配置类的构造方法中的操作:01:检测是否安装了redis扩展02:合并configuration03:检测是redisextension还是pRedis04:创建连接对象发布流程发布参数参数名称默认值说明可用方法$jobNone执行Task类推送,后面$data空任务数据推送,后面$queuedefault任务namepush,later$delaynulldelaytimelaterexecuteimmediatelypush($job,$data,$queue)Queue::push(Test::class,['id'=>1],'test');show操作后返回一个数组,序列化后rPush到redis,关键是queue:queueName数组结构结构:['job'=>$job,//执行任务的类'data'=>$data,//任务数据'id'=>'xxxxx'//任务id]写入redis并返回queueid至于中间的操作,实在是太长了,所以没写后面的延迟释放($delay,$job,$data,$queue)Queue::later(100,Test::class,['id'=>1],'测试');和上面的操作差不多,返回一个数组,序列化后zAdd到redis。关键是queue:queueName:delayedscore是当前时间戳+$delay执行过程执行过程有work模式和listen两种模式的区别上面已经说了,因为代码逻辑太多,下一章再说坏了;最后说说标签的使用//Daemon进程启动'worker_daemon_start'=>[\app\index\behavior\WorkerDaemonStart::class],//内存超出'worker_memory_exceeded'=>[\app\index\behavior\WorkerMemoryExceeded::class],//重启守护进程'worker_queue_restart'=>[\app\index\behavior\WorkerQueueRestart::class],//任务开始执行Before'worker_before_process'=>[\app\index\behavior\WorkerBeforeProcess::class],//任务延迟执行'worker_before_sleep'=>[\app\index\behavior\WorkerBeforeSleep::class],//任务执行失败'queue_failed'=>[\app\index\behavior\QueueFailed::class]publicfunctionrun(Output$output){$output->write('任务执行失败',true);}控制台执行phpthinkqueue:work--queuetest--daemon会在控制台输出一次守护进程,开启任务延迟执行失败的处理。如果有任务执行失败或者执行次数达到最大值,就会触发queue_failed。把失败逻辑写在app\index\behavior@run方法中,比如邮件通知写到日志等。最后说说在其他框架或项目中如何将消息队列推送到tp项目。比如两个项目是分开的,一个用的不是tp5的框架。在其他项目中推送任务php版本redis=newRedis();$this->redis->connect('127.0.0.1',6379);$this->redis->select(10);}publicfunctionpush($job,$data,$queue){$payload=$this->createPayload($job,$data);$this->redis->rPush('queues:'.$queue,$payload);}publicfunctionlater($delay,$job,$data,$queue){$payload=$this->createPayload($job,$data);$this->redis->zAdd('queues:'.$queue.':delayed',time()+$延迟,$有效载荷);}privatefunctioncreatePayload($job,$data){$payload=$this->setMeta(json_encode(['job'=>$job,'data'=>$data]),'id',$this->随机(32));返回$this->setMeta($payload,'attempts',1);}私有函数setMeta($payload,$key,$value){$payload=json_decode($payload,true);$payload[$key]=$value;$payload=json_encode($payload);if(JSON_ERROR_NONE!==json_last_error()){thrownewInvalidArgumentException('无法创建负载:'.json_last_error_msg());}返回$有效载荷;}privatefunctionrandom(int$length=16):string{$str='0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';$randomString='';对于($i=0;$i<$length;$i++){$randomString.=$str[rand(0,strlen($str)-1)];}返回$randomString;}}(新索引())->later(10,'app\index\jobs\Test',['id'=>1],'test');go版本包mainimport("encoding/json""github.com/garyburd/redigo/redis""math/rand""time")typePayloadstruct{Idstring`json:"id"`作业字符串`json:"job"`数据接口{}`json:"data"`尝试int`json:"attempts"`}varRedisClient*redis.Poolfuncinit(){RedisClient=&redis.Pool{MaxIdle:20,MaxActive:500,IdleTimeout:time.Second*100,Dial:func()(connredis.Conn,eerror){c,err:=redis.Dial("tcp","127.0.0.1:6379")iferr!=nil{returnnil,err}_,_=c.Do("SELECT",10)返回c,nil},}}funcmain(){vardata=make(map[string]interface{})data["id"]="1"later(10,"app\\index\\jobs\\Test",data,"test")}funcpush(jobstring,datainterface{},queuestring){payload:=createPayload(job,data)queueName:="queues:"+queue_,_=RedisClient.Get().Do("rPush",queueName,payload)}funclater(delayint,jobstring,数据接口{},队列字符串){m,_:=time.ParseDuration("+1s")currentTime:=time.Now()op:=currentTime.Add(time.Duration(time.Duration(delay)*m)).Unix()createPayload(job,data)payload:=createPayload(job,data)queueName:="queues:"+queue+":delayed"_,_=RedisClient.Get().Do("zAdd",queueName,op,payload)}//创建指定格式的数据函数createPayload(jobstring,datainterface{})(payloadstring){payload1:=&Payload{Job:job,Data:data,Id:random(32),Attempts:1}jsonStr,_:=json.Marshal(payload1)returnstring(jsonStr)}//创建随机字符串funcrandom(nint)string{varstr=[]rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")b:=make([]rune([]rune,n)fori:=rangeb{b[i]=str[rand.Intn(len(str))]}returnstring(b)}