当前位置: 首页 > 后端技术 > PHP

MixPHPV2实例:协程池异步邮件发送守护进程

时间:2023-03-29 23:09:49 PHP

去年MixPHPV1发布时,我写了一个多进程邮件发送实例:使用mixphp创建多进程异步邮件发送,今年MixPHPV2是发布了,全面的协程支持让我们可以用一个进程实现以前多个进程无法实现的更高的IO性能,所以今天我们重写了一个协程池版本的邮件发送实例。发送电子邮件是一个非常常见的需求。由于发送邮件的操作一般都是比较耗时的,所以我们一般会采用异步处理来提升用户体验,而异步通常是使用消息队列来实现的。下面演示一个异步邮件发送系统的开发过程,涉及知识点:如何使用消息队列守护进程协程池实现异步PHP使用消息队列通常是使用中间件实现的,常用的消息中间件有:redisrabbitmqkafka这个这次我们选择Redis实现异步邮件发送。Redis数据类型中有一个列表类型,可以实现消息队列。使用以下命令://Enqueue$redis->lpush($key,$data);//出列$data=$redis->rpop($key);//块出列$data=$redis->brpop($key,10);架构设计本例采用传统的MVC框架来满足邮件发送需求(生产者),由MixPHP编写的守护进程执行发送任务(消费者)。邮件发送库的选择以往我们一般使用框架提供的邮件发送库,或者下载网上其他用户分享的库。composer出现后,https://packagist.org/上有大量高质量的库,我们只需要选择一个最好的就可以了。在此示例中,选择了swiftmailer。由于发送任务是由MixPHP执行的,所以在MixPHP项目中安装了swiftmailer。在安装的项目根目录下执行如下命令:composerrequireswiftmailer/swiftmailerproducerdevelopment在邮件发送的需求中,producer指的是delivery发送任务这部分一般是界面或者网页。这部分不一定需要MixPHP开发。TP、CI、YII都可用。您只需要将任务信息发布到界面或网页中的消息队列中即可。在传统的MVC框架的controller中添加如下代码:通常会在框架中安装一个类库来使用Redis。本示例使用本机代码,易于理解。//connect$redis=new\Redis();if(!$redis->connect('127.0.0.1',6379)){thrownew\Exception('Redisconnectfailed.');}$redis->auth('');$redis->select(0);//投递任务$data=['to'=>'***@qq.com','body'=>'消息内容','主题'=>'标题内容',];$redis->lpush('queue:email',serialize($data));通常在异步开发中,发送完成后会立即发送消息给用户。当然,这时候这个任务并不是在生产者中执行的,而是在消费者获取到消息后执行的。消费者在开发使用本示例时,请确保您使用的Swoole是启用openssl编译的。本例中我们使用MixPHPV2的守护进程和协程池来完成一个超高性能的邮件发送程序。因为我们开发的是daemon,所以我们在applications/daemon模块中开发,首先我们在配置applications/daemon/config/main.php中注册一个命令://command'commands'=>['mailer'=>['Mailer','description'=>'Mailerdaemon.'],],注册命令中指定的Mailer命令类,接下来我们写一个MailerCommand类:applications/daemon/src/Commands/MailerCommand.php*/classMailerCommand{/***exit*@varbool*/public$quit=false;/***mainfunction*/publicfunctionmain(){//捕获信号ProcessHelper::signal([SIGHUP,SIGINT,SIGTERM,SIGQUIT],function($signal){$this->quit=true;ProcessHelper::信号([SIGHUP,SIGINT,SIGTERM,SIGQUIT],空);});//协程池执行任务xgo(function(){$maxWorkers=20;$maxQueue=20;$jobQueue=newChannel($maxQueue);$dispatch=newDispatcher(['jobQueue'=>$jobQueue,'maxWorkers'=>$maxWorkers,]);$dispatch->start(MailerWorker::class);//投递任务$redis=app()->redisPool->getConnection();while(true){if($this->quit){$dispatch->stop();return;}try{$data=$redis->brPop(['queue:email'],3);}catch(\Throwable$e){$dispatch->stop();return;}if(!$data){continue;}$data=array_pop($data);//brPop命令的最后一个键是值$jobQueue->push($data);}});}}从$data=$redis->brPop(['queue:email'],3);外部异常捕获,我们可以知道当Redis连接出现错误,比如Redis重启,连接异常,协程池会安全退出,也就是说当进程异常退出时,用户需要使用supervisor,pm2等重启守护进程的工具以上是一段使用MixPHP协程池的代码,基本可以直接复制使用。框架默认包含协程池的demo。本例只修改协程池的Worker。这条命令主要是完成从Redis队列中获取消息,然后推送到jobQueue中,jobQueue中的数据会被20个Worker实例中的一个抢占,然后并行执行。本例中邮件发送代码的逻辑在MailerWorker类中:applications/daemon/src/Libraries/MailerWorker.php*/classMailerWorkerextendsAbstractWorkerimplementsWorkerInterface{/***Mailer*@varMailer*/public$邮件程序;/***初始化事件*/publicfunctiononInitialize(){parent::onInitialize();//TODO:更改自动生成的存根//实例化一些可重用对象$this->mailer=newMailer();}/***handle*@param$data*/publicfunctionhandle($data){//TODO:实现handle()方法。$数据=反序列化($d阿塔);如果(空($data)){返回;}try{$this->mailer->send($data['to'],$data['subject'],$data['body']);app()->log->info("邮件发送成功:给{to}主题{subject}",$data);}catch(\Throwable$e){app()->log->error("邮件发送失败:给{to}subject{subject}error{error}",array_merge($data,['error'=>$e->getMessage()]));}}}从上面的代码可以看出,Worker在初始化的时候,加入了Mailer类的一个属性。当jobQueue消息被传递时,消息将传递给handle方法。在这个方法中,使用了Mailer类的实例来完成邮件发送任务,所以我们要写一个Mailer发送程序:applications/daemon/src/Libraries/Mailer.php*/classMailer{/***配置信息*/constHOST='smtpdm.aliyun.com';常量端口=465;constSECURITY='ssl';const用户名='***';常量密码='***';/***邮件构造函数。*/publicfunction__construct(){//启用协程钩子Coroutine::enableHook();}/***Send*@param$to*@param$subject*@param$body*@returnint*/publicfunctionsend($to,$subject,$body){//创建传输$transport=(new\Swift_SmtpTransport(self::HOST,self::PORT,self::SECURITY))->setUsername(self::USERNAME)->setPassword(self::PASSWORD);//使用你创建的Transport创建Mailer$mailer=new\Swift_Mailer($transport);//创建消息$message=(new\Swift_Message($subject))->setFrom([self::USERNAME=>'**net'])->setTo($to)->setBody($body);//发送消息return$mailer->send($message);}}在Mailer发送程序中,我们使用之前composer安装的swiftmailer库来发送邮件。以上完成了所有的代码逻辑,现在我们开始测试先启动消费者守护进程:[root@localhostbin]#./mix-daemonmailer将上面的生产者脚本命名为push.php,在CLI中执行(新开一个终端):[root@localhostbin]#php/tmp/push.phpconsumerdaemonresult:[root@localhostbin]#./mix-daemonmailer[info]2019-04-1511:48:36[message]邮件发送成功:to***@qq.comsubject标题内容命令行终端打印发送成功的日志,发送完成。