MQTT是一种基于发布/订阅(publish/subscribe)模型的“轻量级”通信协议。作为一种低开销、低带宽的即时通讯协议,它已经成为物联网的重要组成部分。Swoole还提供了PHP开发IoT项目的能力,只需要设置一个open_mqtt_protocol选项即可。开启后,会解析MQTT包头。Worker进程的onReceive事件每次都会返回一个完整的MQTT包。当然还有其他的,比如Workerman提供的异步协议。mqtt客户端库等开源库在此不做介绍。Simps的第一版MQTT库参考了Workerman的实现,使其能够使用Swoole的协程能力,并修复了一些问题。还要感谢@walkor对PHP生态的贡献。第一版的实现放在框架中,限制了部分用户的使用。于是重构又开始了,让MQTT独立成为一个库,方便用户使用,丰富PHP生态,让PHP程序员不再局限于Web开发。很少有用户询问MQTT,Swoole也修复了一些相关的bug。现在用PHP+Swoole来开发物联网相关的项目应该是如虎添翼。同时第一个版本的MQTT库只支持MQTT3.x,不支持MQTT5.0,在GitHub上也没有找到相关的支持类库,所以重构3.x版本后也支持MQTT5.0。也许这是第一个支持MQTTv5.0协议的PHP库...支持MQTT协议版本3.1、3.1.1和5.0支持QoS0、QoS1、QoS2,那就到这里了,使用composer安装composerrequiresimps/mqtt安装成功后,我们来看看订阅和发布的使用,以MQTT5.0为例。订阅首先应该是订阅。订阅成功后,即可收到相应主题的发布消息。创建一个subscribe.php并写入以下内容include__DIR__。'/vendor/autoload.php';使用Simps\MQTT\Hex\ReasonCode;使用Swoole\Coroutine;useSimps\MQTT\Client;useSimps\MQTT\Types;$config=['host'=>'broker.emqx.io','port'=>1883,'time_out'=>5,'user_name'=>'user001','password'=>'hLXQ9ubnZGzkzf','client_id'=>Client::genClientID(),'keep_alive'=>10,'properties'=>['session_expiry_interval'=>60,'receive_maximum'=>200,'topic_alias_maximum'=>200,],'protocol_level'=>5,];Coroutine\run(function()use($config){$client=newClient($config,['open_mqtt_protocol'=>true,'package_max_length'=>2*1024*1024]);while(!$data=$client->connect()){Coroutine::sleep(3);$client->connect();}$topics['simps-mqtt/user001/get']=['qos'=>1,'no_local'=>true,'retain_as_published'=>true,'retain_handling'=>2,];$timeSincePing=time();$res=$client->subscribe($top集成电路);//订阅的结果var_dump($res);while(true){$buffer=$client->recv();如果($buffer&&$buffer!==true){$timeSincePing=time();//接收到的数据包var_dump($buffer);}if(isset($config['keep_alive'])&&$timeSincePing<(time()-$config['keep_alive'])){$buffer=$client->ping();if($buffer){echo'发送ping成功'.PHP_EOL;$timeSincePing=时间();}else{$client->close();休息;}}//QoS1发布回复if($buffer['type']===Types::PUBLISH&&$buffer['qos']===1){$client->send(['type'=>Types::PUBACK,'message_id'=>$buffer['message_id'],'code'=>ReasonCode::SUCCESS]);}}});执行phpsubscribe.php,你会得到这个输出array(3){["type"]=>int(9)["message_id"]=>int(1)["codes"]=>array(1){[0]=>int(1)}}表示订阅成功,代码对应相应订阅主题的QoS等级。发布订阅成功后,创建一个publish.php测试发布include__DIR__。'/vendor/autoload.php';使用Swoole\Coroutine;使用Simps\MQTT\Client;$config=['host'=>'broker.emqx.io','port'=>1883,'time_out'=>5,'user_name'=>'user002','password'=>'adIJS1D482sd','client_id'=>Client::genClientID(),'keep_alive'=>20,'properties'=>['session_expiry_interval'=>60,'receive_maximum'=>200,'topic_alias_maximum'=>200,],'protocol_level'=>5,];Coroutine\run(function()use($config){$client=newClient($config,['open_mqtt_protocol'=>true,'package_max_length'=>2*1024*1024]);while(!$client->connect()){Coroutine::sleep(3);$client->connect();}while(true){$response=$client->publish('simps-mqtt/user001/get','{"time":'.time().'}',1,0,0,['topic_alias'=>1]);var_dump($响应);协程::睡眠(3);}});代码意思是每3秒向订阅主题simps-mqtt/user001/get发布一条消息,并打开一个新的终端窗口,执行phppublish.php,你会得到输出:array(4){["type"]=>int(4)["message_id"]=>int(1)["code"]=>int(0)["message"]=>string(7)"Success"}这里,添加了消息。为了用户的可读性,不需要去寻找对应的代码含义。返回订阅窗口,会看到打印出来的发布信息数组(8){["type"]=>int(3)["topic"]=>string(0)""["message"]=>string(19)"{"time":1608017156}"["dup"]=>int(1)["qos"]=>int(1)["retain"]=>int(0)["message_id"]=>int(4)["properties"]=>array(1){["topic_alias"]=>int(1)}}这么简单的发布订阅功能就实现了。这个库还有一些未完成的部分值得优化,比如Auth不支持MQTT5Type,还有一些属性暂时还不支持。想参加的同学可以提交PR。如果您有任何问题,您也可以提交Issue。让我们携手共建PHP生态仓库地址:simps/mqtt,记得点个Star支持一下
