注:此为MixPHPV1示例。邮件发送是一个很常见的需求。由于发送邮件的操作一般比较耗时,所以我们一般采用异步处理来提高用户体验。异步,我们通常使用消息队列来实现。由于缺乏多进程开发能力,传统的MVC框架通常使用同一个脚本多次执行生成多个进程。mixphp封装了专用于多进程开发的TaskExecutor。用户可以轻松开发出功能齐全、高可用的多进程进程应用。下面演示一个异步邮件发送系统的开发过程,涉及知识点:异步消息队列多进程守护如何使用消息队列实现异步PHP使用消息队列通常是使用中间件实现的,常用的消息中间件有:redisrabbitmqkafka这个time我们选择redis来实现异步邮件发送。redis数据类型中有一个list类型,可以实现消息队列。使用以下命令://Enqueue$redis->lpush($key,$data);//出列$data=$redis->rpop($key);//块出列$data=$redis->brpop($key,10);架构设计本例采用传统MVC框架传递邮件发送需求,MixPHP多进程执行发送Task。邮件发送库的选择以往我们一般使用框架提供的邮件发送库,或者下载网上其他用户分享的库。composer出现后,https://packagist.org/上有大量高质量的库,我们只需要选择一个最好的就可以了。在此示例中,选择了swiftmailer。由于发送任务是由MixPHP执行的,所以在MixPHP项目中安装了swiftmailer。在安装的项目根目录下执行以下命令:composerrequireswiftmailer/swiftmailerproducerdevelopment在发送邮件的需求中,producer指的是发送任务的交付方,这一方通常是一个接口或者一个网页,这部分不一定需要mixphp开发,TP,CI,YII都可以,只需要在界面或者网页中将任务信息post到消息队列中即可。在传统的MVC框架的controller中添加如下代码:通常框架中会安装一个类库来使用redis。本示例使用本机代码,易于理解。//连接$redis=new\Redis();if(!$redis->connect('127.0.0.1',6379)){thrownew\Exception('RedisConnectFailure');}$redis->auth('');$redis->select(0);//投递任务$data=['to'=>['***@qq.com'=>'Aname'],'body'=>'这里是消息本身','subject'=>'标题内容',];$redis->lpush('queue:email',serialize($data));通常在异步开发中,交付后会立即响应一条消息给用户,当然此时任务并没有执行。消费者开发在本例中,我们使用MixPHP的多进程开发工具TaskExecutor来完成这个需求。通常,常驻进程用于处理队列消费,所以我们使用TaskExecutor的TYPE_DAEMON类型和MODE_PUSH模式。TaskExecutor的MODE_PUSH模式有两个进程:左进程:负责从消息队列中取出任务数据,传递给中间进程。中间进程:负责执行邮件发送任务。PushCommand.php的代码如下:任务\左进程;usemix\task\TaskExecutor;/***推送模式示例*@author刘健*/classPushCommandextendsBaseCommand{//配置信息constHOST='smtpdm.aliyun.com';常量端口=465;constSECURITY='ssl';constUSERNAME='****@email.***.com';常量密码='****';//初始化事件publicfunctiononInitialize(){parent::onInitialize();//TODO:更改自动生成的存根//获取程序名称$this->programName=Input::getCommandName();//设置pidfile$this->pidFile="/var/run/{$this->programName}.pid";}/***获取服务*@returnTaskExecutor*/publicfunctiongetTaskService(){returncreate_object([//类路径'class'=>'mix\task\TaskExecutor',//服务名称'name'=>"mix-daemon:{$this->programName}",//执行类型'type'=>\mix\task\TaskExecutor::TYPE_DAEMON,//执行模式'mode'=>\mix\task\TaskExecutor::MODE_PUSH,//左进程号'leftProcess'=>1,//中间进程号'centerProcess'=>5,//任务超时(秒)'timeout'=>5,]);}//开始publicfunctionactionStart(){//预处理if(!parent::actionStart()){returnExitCode::UNSPECIFIED_ERROR;}//启动服务$service=$this->getTaskService();$service->on('LeftStart',[$this,'onLeftStart']);$service->on('CenterStart',[$this,'onCenterStart']);$服务->开始();//返回退出码returnExitCode::OK;}//左进程启动事件回调函数publicfunctiononLeftStart(LeftProcess$worker){try{//模型中使用了数据库组件的长连接版本,这样组件会自动帮你维护连接$queueModel=Redis::getInstance();//保持任务执行状态,循环结束后,当前进程退出,主进程重新启动一个新的进程继续执行任务。这样做是为了避免长时间内存溢出。for($j=0;$j<16000;$j++){//从消息队列中间件块获取消息$data=$queueModel->brpop('queue:email',10);如果(空($data)){继续;}列表(,$数据)=$数据;//把消息Push到中进程处理,push有长度限制(https://wiki.swoole.com/wiki/page/290.html)$worker->push($data,false);}}catch(\Exception$e){//休息一下以避免100%CPU睡眠(1);//抛出一个错误throw$e;}}//中间进程启动事件回调函数publicfunctiononCenterStart(CenterProcess$worker){//保持任务执行状态,循环结束后,当前进程退出,主进程重新启动一个新进程继续执行任务。这样做是为了避免$j=0;$的长期内存溢出j<16000;$j++){//从进程消息队列中抓取一条消息$data=$worker->pop();如果(空($data)){继续;}//处理消息try{//处理消息,如:发短信,发邮件,微信推送var_dump($data);$ret=self::sendEmail($data);变量转储($ret);}catch(\Exception$e){//回滚数据到消息队列$worker->rollback($data);//休息一下,避免100%CPU睡眠(1);//抛出一个错误throw$e;}}}//发送邮件publicstaticfunctionsendEmail($data){//创建传输$transport=(new\Swift_SmtpTransport(self::HOST,self::PORT,self::SECURITY))->setUsername(self::USERNAME)->setPassword(self::PASSWORD);//使用你创建的Transport创建Mailer$mailer=new\Swift_Mailer($transport);//创建消息$message=(new\Swift_Message($data['subject']))->setFrom([self::USERNAME=>'**web'])->setTo($data['to'])->setBody($data['body']);//发送消息$result=$mailer->send($message);返回$结果;}}测试启动shell中的push驻留程序[root@localhostbin]#./mix-daemonpushstartmix-daemon'push'启动成功。调用接口发送任务到消息队列。此时shell终端会打印:成功收到测试邮件:MixPHPGitHub:https://github.com/mixstart/m...官网:http://www.mixphp.cn/