当前位置: 首页 > 后端技术 > PHP

基于Swoole和Redis的并发队列处理系统

时间:2023-03-29 15:05:01 PHP

背景由于PHP不支持多线程,作为一个完整的系统,很多操作需要异步完成。为了完成这些异步操作,我们做了一个基于Redis队列的任务系统。大家知道,一个消息队列处理系统主要分为消费者和生产者两部分。在我们的系统中,主系统充当生产者,任务系统充当消费者。具体工作流程如下:1.主系统将待处理的任务名称+任务参数推入队列。2、任务系统实时弹出任务队列。当弹出一个任务时,会fork出一个子流程,由子流程完成具体的任务逻辑。具体代码如下:/***启动守护进程*/publicfunctionrunAction(){Tools::log_message('ERROR','daemon/run'.'|action:restart','daemon-');while(true){$this->fork_process();}exit;}/***创建子进程*/privatefunctionfork_process(){$ppid=getmypid();$pid=pcntl_fork();if($pid==0){//子进程$pid=posix_getpid();//echo"*进程{$pid}已创建\n\n";$this->mq_process();出口;}else{//主进程$pid=pcntl_wait($status,WUNTRACED);//获取子进程的结束状态if(pcntl_wifexited($status)){//echo"\n\n*子进程:{$pid}exitedwith{$status}";//Tools::log_message('INFO','daemon/runsucc'.'|status:'.$status.'|pid:'.$ppid.'|childpid:'.$pid);}else{Tools::log_message('ERROR','daemon/runfail'.'|status:'.$status.'|pid:'.$ppid.'|childpid:'.$pid,'daemon-');}}}/***业务任务队列处理*/私有函数mq_process(){$data_pop=$this->masterRedis->rPop($this->redis_list_key);$data=json_decode($data_pop,1);如果(!$数据){返回假;}$worker='_task_'.$data['worker'];$class_name=isset($data['class'])?$data['class']:'TaskproModel';$params=$data['params'];$class=new$class_name();$class->$worker($params);returnTRUE;}这是一个简单的任务处理系统,通过这个任务系统帮助我们实现了异步,目前已经稳定运行了将近一年。但不幸的是,它是一个单进程系统。它不断地fork,有任务就处理,没有任务就跳过。这是非常稳定的。但是有两个问题:一是不断的fork和pop会浪费服务器资源,二是不支持并发!第一个问题还好,第二个问题就严重了。当主系统同时抛出大量任务时,任务的处理时间会无限延长。新设计为了解决并发问题,我们计划做一个更高效、更强大的团队处理系统。因为PHP7之前不支持多线程,所以我们使用多进程。我从网上找了很多资料。大多数所谓的多进程,就是N个进程同时在后台运行。显然这是不合适的。我的期望是:每弹出一个task,就会fork一个task,执行完task后子流程结束。遇到的问题1.如何控制最大进程数这个问题很简单,就是每次fork一个子进程,都会自动递增。并且子进程执行的时候,会减1次。自增是没有问题的,我们在主进程中操作一下就完事了。那么如何减少呢?也许你会说,当然是在子进程中了。但是这里需要注意:fork的时候,一个资源是从主进程copy到子进程的,也就是说不能在子进程中操作主进程中的计数器!因此,这里有一个知识点需要了解:signal。具体可以自行google,这里直接看代码。//为死孩子安装信号处理器spcntl_signal(SIGCHLD,array($this,"sig_handler"));这将安装一个信号处理程序。当然,还缺少一件事。声明(刻度=1);declare是控制结构语句,具体用法请自行谷歌。这段代码的意思是每次执行低级语句时调用信号处理器。这样子进程每次结束都会调用signalhandler,我们可以在signalhandler中进行自减。2、多进程开发中进程残留如何解决,处理不当会导致进程残留。为了解决进程残留,必须回收子进程。那么如何回收子进程是一个技术点。在pcntl的demo中,包括很多博文,都说在主进程中回收子进程。但是我们是基于Redis的brpop,brpop是阻塞的。这就导致了一个问题:执行完N个任务后,任务系统空闲时主进程阻塞,发生阻塞时子进程还在执行,导致最后几个子进程的进程恢复无法完成。..这里一直很挣扎,但是一旦我把信号处理器拿开,就很容易了。进程回收也放在信号处理器中。新系统评价pcntl是一个进程处理扩展,但不幸的是它对多进程的支持非常薄弱。所以这里使用了Swoole扩展中的Process。具体代码如下:declare(ticks=1);类JobDaemonController扩展Yaf_Controller_Abstract{使用Trait_Redis;私人$maxProcesses=800;私人$孩子;私人$masterRedis;私人$redis_task_wing='任务:翼';//挂起队列publicfunctioninit(){//为死孩子安装信号处理器pcntl_signal(SIGCHLD,array($this,"sig_handler"));设置时间限制(0);ini_set('default_socket_timeout',-1);//队列处理不超时,解决Redis报错:readerroronconnection}privatefunctionredis_client(){$rds=newRedis();$rds->connect('redis.master.host',6379);返回$rds;}publicfunctionprocess(swoole_process$worker){//第一次处理$GLOBALS['worker']=$worker;swoole_event_add($worker->pipe,function($pipe){$worker=$GLOBALS['worker'];$recv=$worker->read();//发送数据给mastersleep(rand(1,3));echo"来自大师:$recv\n";$worker->exit(0);});exit;}publicfunctiontestAction(){for($i=0;$i<10000;$i++){$data=['abc'=>$i,'timestamp'=>time().rand(100,999)];$this->masterRedis->lpush($this->redis_task_wing,json_encode($data));}exit;}publicfunctionrunAction(){while(1){//echo"\t现在我们有了$this->child子进程\n";if($this->child<$this->maxProcesses){$rds=$this->redis_client();$data_pop=$rds->brpop($this->redis_task_wing,3);//无任务时,阻塞等候if(!$data_pop){continue;}echo"\tStartingnewchild|现在我们有了$this->child子进程\n";$this->child++;$process=newswoole_process([$t他的,“过程”]);$process->write(json_encode($data_pop));$pid=$process->start();}}}privatefunctionsig_handler($signo){//echo"Recive:$signo\r\n";switch($signo){caseSIGCHLD:while($ret=swoole_process::wait(false)){//echo"PID={$ret['pid']}\n";$this->child--;}}}}最后经过测试,单核1G服务器可以做到800个并发任务1到3秒