在一般的Server程序中都会有一些耗时的任务,比如:发送邮件,聊天服务器发送广播等,如果我们使用同步阻塞防水来执行这些任务,那么这肯定会很慢。Swoole的TaskWorker进程池可以用来执行一些异步任务而不影响接下来的任务,非常适合处理上面的场景。那么什么是异步任务呢?从下图可以很容易理解。(来自网络,侵删)我们上一篇Swoole文章介绍了如何搭建一个简单的服务器,知道了几个核心回调函数的使用方法。要实现上述异步处理,只需要添加两个事件回调:onTask和onFinish,这两个回调函数分别用于执行Task任务和处理Task任务的返回结果。另外需要在set方法中设置任务进程数。使用示例:classServer{private$serv;publicfunction__construct(){$this->serv=newswoole_server("0.0.0.0",9501);$this->serv->set(array('worker_num'=>4,'daemonize'=>false,'task_worker_num'=>8));$this->serv->on('开始',array($this,'onStart'));$this->serv->on('Connect',array($this,'onConnect'));$this->serv->on('Receive',array($this,'onReceive'));$this->serv->on('Close',array($this,'onClose'));$this->serv->on('Task',array($this,'onTask'));$this->serv->on('Finish',array($this,'onFinish'));$this->serv->start();}publicfunctiononReceive(swoole_server$serv,$fd,$from_id,$data){echo"GetMessageFromClient{$fd}:{$data}\n";//发送任务到任务进程$param=array('fd'=>$fd);$serv->task(json_encode($参数));echo"继续处理逻辑\n";}publicfunctiononTask($serv,$task_id,$from_id,$data){echo"ThisTask{$task_id}fromWorker{$from_id}\n";echo"数据:{$data}\n";for($i=0;$i<5;$i++){睡眠(1);echo"任务{$task_id}处理{$i}次...\n";}$fd=json_decode($data,true)['fd'];$serv->send($fd,"任务{$task_id}中的数据");返回“任务{$task_id}的结果”;}publicfunctiononFinish($serv,$task_id,$data){echo"任务{$task_id}完成\n";echo"结果:{$data}\n";}publicfunctiononStart($serv){echo"ServerStart\n";}publicfunctiononConnect($serv,$fd,$from_id){echo"Client{$fd}connect\n";}publicfunctiononClose($serv,$fd,$from_id){echo"Client{$fd}关闭连接\n";}}$server=newServer();从上面的例子我们可以看出,发起一个异步任务只需要调用swoole_server的task方法可以发送后,会触发onTask回调,通过$task_id和$from_id可以处理不同进程的不同任务。最后可以通过返回字符串的方式将执行结果返回给Worker进程,Worker进程通过onFinish回调对结果进行处理。那么基于上面的代码,就可以实现异步操作mysql了。异步操作mysql比较适合以下场景:并发读写操作在时序上没有严格的关系,不影响主线程逻辑好处:增加并发,降低IO消耗对数据库的压力主要在于数量由mysql维护的连接数。需要建立相应数量的连接。采用长连接方式,进程中一直保持mysql连接,减少了创建连接的损失。可以通过swoole启动多个task进程,每个进程维护一个mysql长连接,这样也可以扩展到mysql连接池技术。还需要注意的是,如果mysql服务器检测到长时间没有查询,就会断开并回收资源,所以必须要有断开重连的机制。下面是一个简单的异步操作mysql的例子:还是上面的代码,我们只需要修改onReceive、onTask、onFinish这三个函数即可。类服务器{私人$serv;publicfunction__construct(){$this->serv=newswoole_server("0.0.0.0",9501);$this->serv->set(array('worker_num'=>4,'daemonize'=>false,'task_worker_num'=>8//任务进程数即为维护的MySQL连接数));$this->serv->on('开始',array($this,'onStart'));$this->serv->on('Connect',array($this,'onConnect'));$this->serv->on('Receive',array($this,'onReceive'));$this->serv->on('Close',array($this,'onClose'));$this->serv->on('Task',array($this,'onTask'));$this->serv->on('Finish',array($this,'onFinish'));$this->serv->start();}publicfunctiononReceive(swoole_server$serv,$fd,$from_id,$data){echo“收到数据”。$数据。PHP_EOL;//将任务发送给Task进程$param=array('sql'=>$data,//接收客户端发送的sql'fd'=>$fd);$serv->task(json_encode($param));//发送任务到任务echo"继续处理后的逻辑\n";}publicfunctiononTask($serv,$task_id,$from_id,$data){echo"ThisTask{$task_id}fromWorker{$from_id}\n";echo"recvSQL:{$data['sql']}\n";静态$链接=空;$sql=$data['sql'];$fd=$data['fd'];地狱:如果($link==null){$link=@mysqli_connect(“127.0.0.1”,“root”,“root”,“test”);}$result=$link->query($sql);if(!$result){//如果查询失败if(in_array(mysqli_errno($link),[2013,2006])){//如果错误码为2013或2006,则重新连接数据库,重新执行sql$链接=空;去死吧;}}if(preg_match("/^select/i",$sql)){//如果是select操作,直接返回关联数组$data=array();while($fetchResult=mysqli_fetch_assoc($result)){$data['data'][]=$fetch结果;}}else{//否则直接返回结果$data['data']=$result;}$data['status']="确定";$数据['fd']=$fd;$serv->finish(json_encode($data));}publicfunctiononFinish($serv,$task_id,$data){echo"任务{$task_id}完成\n";$result=json_decode($result,true);if($result['status']=='OK'){$this->serv->send($result['fd'],json_encode($result['data'])."\n");}else{$this->serv->send($result['fd'],$result);}}publicfunctiononStart($serv){echo"ServerStart\n";}publicfunctiononConnect($serv,$fd,$from_id){echo"Client{$fd}connect\n";}publicfunctiononClose($serv,$fd,$from_id){echo"Client{$fd}关闭连接\n";}}$server=newServer();上面的代码直接在onReceive中接收一个sql,然后直接发送给Task任务。这时候流程的下一步是跟着输出,也体现了异步性。然后onTask和onFinish分别用于向数据库发送sql和处理任务执行结果。参考链接:https://wiki.swoole.comhttp://rango.swoole.com/archi...
