前段时间我们发布了MixPHPV2实例:协程池异步邮件发送守护实例,这次我们通过SwooleHook提供了一个大厂SDK,用于执行短信发送任务并行。本文是一个代码简单,IO性能强的例子。请先升级到mix-framework>=v2.0.5。本例仍然使用消息队列接收短信发送任务。消息中间件的使用:redis生产者通常在框架中使用Redis,并安装一个类库来使用。本示例使用本机代码以便于理解。//connect$redis=new\Redis();if(!$redis->connect('127.0.0.1',6379)){thrownew\Exception('Redisconnectfailed.');}$redis->auth('');$redis->select(0);//投递任务for($i=0;$i<3;$i++){$data=['phone'=>'***','templateCode'=>'SMS_***','templateParam'=>['code'=>123456],];$redis->lpush('queue:sms',serialize($data));}消费者使用的是阿里云的短信服务。查看官方PHPSDK文档。使用的库为:composerrequirealibabacloud/client通过查看库的composer依赖文件得知该库是基于guzzlehttp开发的,因为MixPHP提供了无需修改代码的HookGuzzle库可以在协程工具中使用混合PHPV2生态:让Guzzle支持Swoole的Hook协程,所以基本可以确定该库可以在Swoole协程中使用。首先我们安装https://github.com/mix-php/guzzle-hook以便在协程中可以使用alibabacloud/client:composerrequiremix/guzzle-hook然后在composer.json文件中添加额外的配置项项目的,如下:"extra":{"include_files":["vendor/mix/guzzle-hook/src/functions_include.php"]}更新自动加载:composerdump-autoload下面我们使用MixPHPV2守护进程和协程池完成一个超高性能的短信发送程序。首先,我们在配置applications/console/config/main.php中注册一个命令://command'commands'=>['smser'=>['Smser','description'=>"SMSsenddaemondemo.",'options'=>[[['d','daemon'],'description'=>'后台运行'],],],],注册命令中指定的smser命令类,接下来我们写SmserCommand类:applications/console/src/Commands/SmserCommand.php*/classSmserCommand{constACCESS_KEY='***';constACCESS_SECRET='***';/***退出*@varbool*/public$quit=false;/***Mainfunction*/publicfunctionmain(){//守护进程处理$daemon=Flag::bool(['d','daemon'],false);如果($daemon){ProcessHelper::daemon();}//捕捉信号ProcessHelper::signal([SIGHUP,SIGINT,SIGTERM,SIGQUIT],function($signal){$this->quit=true;ProcessHelper::signal([SIGHUP,SIGINT,SIGTERM,SIGQUIT],null);});//设置阿里云全局参数AlibabaCloud::accessKeyClient(static::ACCESS_KEY,static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient();//手动关闭Swoole文件Hook,因为阿里云依赖的uuid库存在filehook协程兼容问题,Swoole4.4已经适配了这个问题Coroutine::enableHook(SWOOLE_HOOK_ALL^SWOOLE_HOOK_FILE);//协程池执行任务xgo(function(){$maxWorkers=20;$maxQueue=20;$jobQueue=newChannel($maxQueue);$dispatch=newDispatcher(['jobQueue'=>$jobQueue,'maxWorkers'=>;$maxWorkers,]);$dispatch->start(SmserWorker::class);//调度任务$redis=app()->redisPool->getConnection();while(true){if($this->quit){$dispatch->stop();返回;}尝试{$data=$redis->brPop(['queue:sms'],3);}catch(\Throwable$e){$dispatch->stop();返回;}如果(!$数据){继续;}$data=array_pop($data);//brPop命令的最后一个键是值$jobQueue->push($data);}});//等待事件Event::wait();}}从$data=$redis->brPop(['queue:sms'],3);外部异常捕获,我们可以知道,当Redis连接失败,比如Redis重启,当连接异常时,协程池会安全退出,也就是说当进程异常退出时,用户需要使用supervisor,pm2等工具重启守护进程以上是一段使用MixPHP协程池的代码,基本可以直接复制使用。框架默认包含协程池的demo。本例只修改协程池的Worker。这条命令主要是完成从Redis队列中获取消息然后推送到jobQueue中,jobQueue中的数据会被20个Worker实例中的一个实例抢占并并行执行。本例的发送代码逻辑在SmserWorker类中:applications/console/src/Libraries/SmserWorker.php*/classSmserWorkerextendsAbstractWorkerimplementsWorkerInterface{/***mailer*@varSmser*/public$smser;/***初始化事件*/publicfunctiononInitialize(){parent::onInitialize();//TODO:更改自动生成的存根//实例化一些需要重用的对象$this->smser=newSmser();}/***handle*@param$data*/publicfunctionhandle($data){//TODO:实现handle()方法。$数据=反序列化($数据);如果(空($data)){返回;}try{$result=$this->smser->send($data['phone'],$data['templateCode'],$data['templateParam']);app()->log->info("短信发送成功:电话{phone}templateCode{templateCode}结果{result}",array_merge($data,['result'=>json_encode($result,JSON_UNESCAPED_UNICODE)]));}catch(\Throwable$e){app()->log->error("短信发送失败:电话{phone}templateCode{templateCode}error{error}",array_merge($data,['error'=>$e->getMessage()]));}}}从上面的代码可以看出,在初始化Worker的时候,新增了一个Smser类的属性。当jobQueue消息被传递时,消息将传递给handle方法。方法中使用了Mailer类的实例来完成邮件发送任务,所以我们要写一个Smser发送程序:applications/console/src/Libraries/Smser.php*/classSmser{/***配置信息*/constSIGN_NAME='***';/***Smser构造函数。*/publicfunction__construct(){//启动协程钩子Coroutine::enableHook();}/***发送*@param$phone*@param$templateCode*@param$templateParam*@returnarray*@throwsClientException*@throwsServerException*/publicfunctionsend($phone,$templateCode,$templateParam){$result=AlibabaCloud::rpc()->product('Dysmsapi')//->scheme('https')//https|http->version('2017-05-25')->action('SendSms')->method('POST')->options(['query'=>['PhoneNumbers'=>$phone,'SignName'=>static::SIGN_NAME,'TemplateCode'=>$templateCode,'TemplateParam'=>json_encode($templateParam),],])->request();返回$result->toArray();}}以上就完成了所有的代码逻辑,现在我们开始测试,首先启动consumerdaemon:[root@localhostbin]#./mix-consolesmser将上面的producer脚本命名为push.php,在CLI中执行(打开一个新终端):[root@localhostbin]#php/tmp/push.phpconsumerdaemonresult:[root@localhostbin]#./mix-consolesmser[info]2019-05-2412:03:32<101014>[message]短信发送成功:phone***templateCodeSMS_***result{"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"}[info]2019-05-2412:03:32<101014>[message]短信发送成功:phone***templateCodeSMS_***result{"Message":"触发分钟级流控Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"}[info]2019-05-2412:03:32<101014>[message]短信发送成功:phone***templateCodeSMS_***result{"Message":"Triggerminute-leveltrafficcontrolPermits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"}命令行终端打印发送成功日志,发送完成
