swoole多进程数据处理在业务变化的发展中,不可避免的需要做数据处理。当数据超过100,000+时,耗时会变长。这时候就需要进行多任务处理。.Swoole文档:https://wiki.swoole.com/#/mem...https://wiki.swoole.com/#/mem...环境:php7.4swoole4.5mysql5.7laravel框架或基于PDO需要要修改options,需要修改config配置[//\PDO::ATTR_PERSISTENT=>true,//关闭持久连接\PDO::ATTR_EMULATE_PREPARES=>true//启用预仿真预处理语句];laravel框架不能使用协程,所以只能使用multiprocessing来处理数据。limit=500;$this->workerNum=24;//worker进程数$questionSql=$this->questionStl();$questionCount=$questionSql->count();$cycleIndex=ceil($questionCount/$this->limit);$this->worker($cycleIndex);$endtime=explode('',microtime());$thistime=$endtime[0]+$endtime[1]-($starttime[0]+$starttime[1]);$thistime=round($thistime,3);echo"执行时间:".$这次。“秒。".PHP_EOL;}privatefunctionworker($cycleIndex){$table=newTable(100);$table->column('index',Table::TYPE_INT,8);//数据页数$table->column('task_sum',Table::TYPE_INT,4);$table->column('worker_status',Table::TYPE_INT,4);$table->create();$table->set("队列",["index"=>0,"task_sum"=>$this->workerNum]);if($cycleIndex<$this->workerNum){$this->workerNum=$cycleIndex;}$pool=newPool($this->workerNum);$pool->on("WorkerStart",function(Pool$pool,$workerId)use($table,$cycleIndex){//页数原子递增$table->incr("queue","index",$incrby=1);$queue=$table->get("queue");if($queue['index']>$cycleIndex){sleep(100);//超过了数据处理页数,让进程直接休眠。返回;}//取出worker进程atom并自减$table->decr("queue","task_sum",$incrby=1);$table->set($workerId,['worker_status'=>1]);\数据库::断开连接();//laravel框架断开DB链接,其他框架查看文档框架。基于swoole框架,echo"Worker#{$workerId}isstarted"。PHP_EOL;回声“总计”。$周期索引。“页面,开始:”。$队列['索引']。//业务数据逻辑处理开始$errorQuestion=[];$questionSql=$this->questionStl();$questionData=$questionSql->select(['question.question_id','question.question_no','question_analysis','question_content'])->forPage($queue['index'],$this->limit)->groupBy('question_no')->get();如果($questionData->isNotEmpty()){$errorQuestion=$this->existsImage($questionData,$errorQuestion);}if(count($errorQuestion)){//这是laravel5.5的日志处理,其他版本框架请参考相应文档。$monolog=\Log::getMonolog();$monolog->popHandler();//将错误标题写入日志\Log::useFiles(storage_path('logs/error_invalid_path.log'));\Log::info(“需要手动修复:”);\Log::info(array_values($errorQuestion));$monolog=\Log::getMonolog();$monolog->popHandler();\Log::useFiles(storage_path('logs/error_path.log'));}//业务数据逻辑处理结束});$pool->on("WorkerStop",function(Pool$pool,$workerId)使用($table,$cycleIndex){$worker_status=$table->get($workerId,"worker_status");if($worker_status==1){//放回worker进程原子自增$table->incr("queue","task_sum",$incrby=1);$table->set($workerId,['worker_status'=>2]);}$queue=$table->get("queue");if($queue['index']>=$cycleIndex){//等待最后一个进程完成处理if($queue['task_sum']>=$this->workerNum){$pool->shutdown();//关闭进程池,所有子进程结束运行}返回;}echo"Worker#{$workerId}已停止"。PHP_EOL;});$池->开始();}/***数据查询*@returnmixed*/privatefunctionquestionStl(){//题目查询returnQuestion::query()->where('question_status',1)->when($this->subject_id,函数($query){$query->where('subject_id',$this->subject_id);})->when($this->updated_at,function($query){$query->where('updated_at','>=',$this->updated_at);});}/***@param$questionData*@param$errorQuestion*@returnmixed*/privatefunctionexistsImage($questionData,$errorQuestion){//处理问题目标图片数据return$errorQuestion;}}
