当前位置: 首页 > 后端技术 > PHP

PHP的swoole系列异步任务(task)教程(超级详细)

时间:2023-03-29 16:06:09 PHP

swoole异步任务教程先学习swoole的整体流程。1、swoole异步可以解决什么样的问题?2、如何使用swoole异步任务?3、使用时需要注意哪些细节?1、swoole异步可以解决什么样的问题?相信大家在当前或者以后的工作中都会遇到以下一些问题。问题1,测试同事说,哇,这个界面好慢,平均响应时间超过2秒。你能优化它吗?然后自己一个,原来有发送邮件或者短信验证码的操作。仔细排查,原来是第三方接口响应慢,导致自身接口响应慢。这时候swoole异步任务就很适合解决这类问题。您可以将此操作发布到TaskWorker进程池中执行。post后,程序会立即返回,程序会继续往下执行代码,不会拖慢当前进程的处理速度。这种情况只是一个例子。如果业务中有类似的操作,可以丢到异步任务中去执行。2、如何使用swoole异步任务?既然您知道可以解决什么问题,那么如何使用它呢?首先,让我们明确一下使用的一般逻辑。逻辑:我们需要创建一个swoole服务,然后通过客户端向服务端发送任务,最后服务端收到数据后,执行相应的操作,结束。根据我自己的经验,我总结了以下3个步骤。PS:首先自己安装swoole扩展1.创建swoole服务2.将任务post到任务进程池3.自己编写任务中要执行的操作2.1创建swoole服务,其实就是初始化swoole\server对象因为task任务是swoole\server对象的一个??方法,所以我们需要先新建一个swoole\server对象,设置对象的一些配置,注册一些方法,然后我们就可以调用了。初始化时必须设置的点task_worker_num任务进程数注册onTask和onFinish这两个方法以下是我封装的初始化swoole\server对象代码,供参考。自己添加类对应的命名空间。类服务器{私人$serv;/***服务器构造函数。*/publicfunction__construct(){//新建一个swoole\server对象,可以像newSwoole\Server('127.0.0.1',9501)这样写,也可以new\swoole_server("127.0.0.1",9501))这样写,只是一个别名,其实是一样的。//其中127.0.0.1是指定监听的ip地址。IPv4使用127.0.0.1监听本机,0.0.0.0表示监听所有地址,一般可以监听本机//$port监听的端口,比如95010-1024,由系统保留默认,推荐位置从5000开始,一般使用默认的9501$this->serv=new\swoole_server("127.0.0.1",9501);//配置参数$this->serv->set(array('task_worker_num'=>200,//task进程数//下面是一些常用的配置参数说明//'worker_num'=>32,//worker进程数一般设置为服务器CPU数的1-4倍1//'daemonize'=>1,//111用守护进程执行11//'max_request'=>2000,//'dispatch_mode'=>3,//抢占模式,主进程会根据Worker的忙闲状态选择投递,只会投递给空闲的Worker//"task_ipc_mode"=>1,//使用UnixSocket通信,默认方式//"log_file"=>"log/taskqueueu.log",//log));//绑定事件对应的方法。//当客户端发送数据时,会触发Receive函数。该方法可以接收客户端发送的参数$this->serv->on('Receive',array($this,'onReceive'));//在task方法中,执行相应的操作$this->serv->on('Task',array($this,'onTask'));//当任务执行完成时,会触发finish事件。$this->serv->on('Finish',array($this,'onFinish'));//启动swoole服务$this->serv->start();}/***注:接收数据时触发的回调函数*User:Swoole*@param$servswoole\serverobject*@param$fd$connectedfiledescriptor*@param$from_idTCP连接所在的Reactor线程ID*@param$data接收到的数据内容,可以是文本或二进制内容*/publicfunctiononReceive($serv,$fd,$from_id,$data){//转发数据给task$serv->task($data);}/***注意事项:执行task任务*用户:闻铃*@param$servswoole\server对象*@param$task_idtask进程id*@param$from_idTCP连接所在的Reactor线程ID*@param$data接收到的数据内容,可以是文本或二进制内容*/publicfunctiononTask($serv,$task_id,$from_id,$data){//暂时留空部分2.3会完成}/***注:回调用于任务完成*User:Smellbell*@param$servswoole\serverobject*@param$task_idaskprocessid*@param$data*/publicfunctiononFinish($serv,$task_id,$data){//echo"塔斯k{$task_id}finish\n";//echo"Result:{$data}\n";}//调用这个runStart方法创建一个swoole服务,我这里是TP,在public目录下,执行phpindex.phpapi/server/runStart命令启动swoole,通过netstat-tunlp查看是否启动成功publicfunctionrunStart(){}}2.2通过客户端类将任务下发到任务进程池send方法即可传递给swoole服务类SwooleClient{private$client;publicfunction__construct(){$this->client=newSwoole\Client(SWOOLE_SOCK_TCP);}publicfunctionconnect(){//9501要用swoole服务监听端口号相同if(!$this->client->connect("127.0.0.1",9501,-1)){thrownew\Exception(sprintf('SwooleError:%s',$this->client->errCode));}}//发送一个数据到swoole服务publicfunctionsend($data){if($this->client->isConnected()){if(!is_string($data)){$data=json_encode($data);}//拼接"\r\n"就是解决tcps循环场景下的投递任务可能出现的tickypacket问题。返回$this->client->send($data."\r\n");}else{thrownew\Exception('SwooleServer没有连接。');}}publicfunctionclose(){$this->client->close();}}当需要下发时,新建一个客户端类,将数据下发到swoole服务$client=newSwoolecli();$client->connect();if($client->send($value)){//成功,关闭链接$client->close();}else{//异常处理}如果是swoole的协程框架。可以直接发货。因为常驻内存,所以内存中有初始化好的swoole对象。不需要通过客户端传递,直接$server->task($data)即可。比如在think-swoole中,可以直接通过如下方式下发:$get=$this->request->get();$code=mt_rand(1111,9999);$phoneNum=$get['phone_num'];$taskData=['method'=>'sendSms','data'=>['code'=>$code,'phone_num'=>$phoneNum]];//下发任务$this->app->swoole->task(json_encode($taskData));在easyswoole中可以直接传递$server=ServerManager::getInstance()->getSwooleServer如下方式()->task(json_encode($task_data));2.3在任务中编写自己要执行的业务操作//执行异步任务publicfunctiononTask($serv,$task_id,$from_id,$data){//一般发送的数据是json数据。$data=json_decode($data,true);//接收任务数据,处理业务逻辑处理。我一般喜欢创建一个新的类来执行异步任务并统一管理相应的操作。在这个任务中,你只需要写3行代码。$method=$data['方法'];返回SwooleTask::getInstance()->$method($data['data'],$serv);}classSwooleTask{privatestatic$obj=null;publicstaticfunctiongetInstance(){if(is_null(self::$obj)){self::$obj=newself();}返回自我::$对象;}/***Notes:发送短信验证码*User:Smellthebell*DateTime:2021/9/94:06PM*@paramarray$data*@returnbool*/publicfunctionsendSms($data=[]){如果(empty($data)){返回false;}returnMsgCode::send($data['phone_num'],$data['code']);}/***通过任务机制向所有客户端推送数据*@param$data*@param$servswooleserverobject*/publicfunctionpushLive($data,$serv){//获取redis中的有序集合。$clients=get_redis_obj()->set();foreach($clientsas$fd){$serv->push($fd,json_encode($data,JSON_UNESCAPED_UNICODE));}}}3、使用时需要注意哪些细节?1、swoole服务配置较多,需要根据业务场景和环境设置合适的配置。具体配置可以参考官方文档,所有配置如下:https://wiki.swoole.com/#/ser...下面是我总结的常用配置1.'worker_num'=>32,//一般设置为服务器CPU数的1-4倍。lscpu命令,cpus参数,即cpu核数。说明:指定要启动的工作进程数。解释:swoole是master->n*worker模式。开启的worker进程越多,服务器的负载能力就越大,但是相应的服务器也会占用更多的内存。同时,当worker进程过多时,进程间切换带来的系统开销也会更大。因此,建议启动的worker进程数为CPU核数的1-4倍。2.'ipc_mode'=>1说明:设置进程间的通信模式。说明:一共有三种通信方式,参数如下1=>使用unixsocket通信2=>使用消息队列通信3=>使用消息队列通信,并设置为竞争模式3.'max_request'=>2000,说明:每一个worker进程允许处理的最大任务数。注意:设置该值后,每个worker进程在处理完max_request请求后会自动重启。设置这个值的主要目的是防止worker进程处理大量请求可能导致的内存溢出。4.'max_conn'=>10000说明:服务器允许的最大TCP连接数说明:设置该参数后,当服务器上已有的连接数达到该值时,新的连接将被拒绝。另外这个参数的值不能超过操作系统的ulimit-n的值,这个值也不要设置的太大,因为swoole_server会一次性申请一大块内存来存放每个连接的信息。5.'dispatch_mode'=>3说明:指定包分发策略。说明:一共有三种模式,参数如下:1=>roundrobin模式,每个worker进程会轮流分配给每个worker进程2=>fixed模式,worker根据连接的文件描述符分配。这样可以保证同一个连接发送的数据只会被同一个worker处理3=>preemptivemode,主进程会根据worker的忙状态选择投递,只投递给空闲的worker6。'task_worker_num'=>8说明:服务器启动的任务进程数。注意:设置该参数后,服务端会开启异步任务功能。此时就可以使用task方法来下发异步任务了。7.'task_max_request'=>10000设置该参数后,必须为swoole_server设置onTask/onFinish两个回调函数,否则启动服务器会报错。说明:每个任务进程允许处理的最大任务数。说明:参考max_requesttask_worker_num示例:8.'task_max_request'=>10000说明:每个任务进程允许处理的最大任务数。说明:参考max_requesttask_worker_num9.'task_ipc_mode'=>2说明:设置task进程与worker进程的通信方式。说明:参考ipc_mode10.'daemonize'=>1,//设置程序进入后台为daemon,daemon为1或0。注意:长时间运行的服务端程序必须开启该选项。如果没有启用守护进程,当ssh终端退出时,程序将被终止。启用守护程序时,标准输入和输出将重定向到log_file,如果未设置log_file,则丢弃所有输出。11.'log_file'=>'/data/log/swoole.log'指定日志文件的路径注意:swoole运行过程中出现的异常信息都会记录在这个文件中。默认情况下,它会打印到屏幕上。注意log_file不会自动拆分文件,所以这个文件需要定期清理。12.'heartbeat_check_interval'=>60设置心跳检测间隔说明:该选项表示每隔多久进行一次循环,单位为秒。每次检测到都会遍历所有连接,如果某个连接在该时间间隔内没有数据发送,则强制关闭该连接(会有onClose回调)。13.'heartbeat_idle_time'=>600设置连接允许的最大空闲时间。注意:此参数与heartbeat_check_interval一起使用。每次遍历所有连接,如果某个连接在heartbeat_idle_time时间内没有数据发送,则强行关闭该连接。默认设置为heartbeat_check_interval*2。2、下发任务时,如何在短时间内在同一个链路上多次下发数据,可能会出现数据粘连的问题。解决方法如下。客户端发送时,拼接"\r\n",例如$this->client->send($data."\r\n");新增server端配置1.自动分包$server->set(array('open_eof_split'=>true,//swoole底层实现自动分包。比较cpu资源消耗'package_eof'=>"\r\n",//设置后缀,一般为"\r\n"));or2.手动分包$server->set(array('open_eof_check'=>true,//开启EOF检测,每次以EOF结尾的包都会发送给服务器'package_eof'=>"\r\n",//设置EOF))需要在应用层代码中手动explode("\r\n",$data)拆分数据包效率更高。如果效率更重要,请使用手动分包。您可以根据场景选择合适的。4.截图中swoole异步任务的执行效率很高。在数据量很大的情况下,效率还是很高的。swoole的异步任务进程池效率如何?正好手头有一个项目,使用swoole的异步任务,每天异步处理6000w新数据,耗时6小时左右,速度瓶颈其实卡在了mysql层面。学完以后多使用swoole异步任务,提高自己的接口响应效率。如果觉得对你有帮助,请给我一个赞,鼓励我有更多的动力继续写文章。文章来源:叶宇文玲的思考文章(https://segmentfault.com/u/ye...)