当前位置: 首页 > 后端技术 > Python

PythonMQTT异步框架——HBMQTT

时间:2023-03-26 01:48:47 Python

什么是异步?CPU的速度比磁盘、网络等IO操作要快很多。在一个线程中,无论CPU执行多快,遇到IO操作都必须停下来等待读写完成,这无疑浪费了大量的时间。为了解决这个问题,Python加入了异步IO的特性。在Python3.4中,asyncio被正式纳入标准库,在Python3.5中,增加了async/await关键字。用户可以很容易地在函数前添加async关键字来使函数异步。在Python的MQTT客户端库中,HBMQTT是最早支持异步IO的PythonMQTT库。HBMQTT库HBMQTT是一个基于Python编写的开源库。它实现了MQTT3.1.1协议。其特点如下:支持QoS0、QoS1和QoS2消息客户端自动重连支持TCP和WebSocket支持SSL支持插件系统接下来我们将演示如何使用PythonMQTT异步框架-HBMQTT轻松实现一个异步demo具有MQTT发布和订阅功能。项目初始化确定Python版本本项目使用Python3.6进行开发和测试。读者可以使用以下命令确认Python版本。因为需要使用async关键字,所以需要保证Python版本不低于Python3.5?~python3--versionPython3.6.7使用Pip安装HBMQTT库Pip是一个Python包管理工具,它提供Python包搜索、下载、安装和卸载功能。pip3install-ihttps://pypi.doubanio.com/simplehbmqtt连接MQTT服务器本文将使用EMQX提供的免费公共MQTT服务器,该服务器基于EMQX的MQTTIoT云平台打造。服务器访问信息如下:Broker:broker.emqx.ioTCP端口:1883Websocket端口:8083首先导入MQTT客户端库。fromhbmqtt.clientimportMQTTClientclient=MQTTClient()#connectserverclient.connect('mqtt://broker.emqx.io/')#disconnectclient.disconnect()异步写法如下:asyncdeftest_pub():client=MQTTClient()awaitclient.connect('mqtt://broker.emqx.io/')awaitclient.disconnect()publishmessage发布函数是MQTTClient类的发布函数。client=MQTTClient()#函数的三个参数分别是主题,消息内容,QoSclient.publish('a/b',b'TESTMESSAGEWITHQOS_0',qos=QOS_0)异步写法如下:asyncdeftest_pub():client=MQTTClient()awaitClient.connect('mqtt://broker.emqx.io/')awaitasyncio.gather(client.publish('a/b',b'TESTMESSAGEWITHQOS_0',qos=QOS_0),client.publish('a/b',b'使用QOS_1测试消息',qos=QOS_1),client.publish('a/b',b'使用QOS_2测试消息',qos=QOS_2))logging.info("messagespublished")awaitClient.disconnect()在这段代码中,我们将三个发送消息函数放到了asyncio的任务列表中,它们会依次执行。当所有任务完成后,断开连接。订阅消息订阅消息的功能是MQTTClient类中的subscribe函数。client=MQTTClient()#订阅client.subscribe([('topic/0',QOS_0),('topic/1',QOS_1),])#取消订阅client.unsubscribe([('topic/0',QOS_0),]异步写法如下:asyncdeftest_sub():client=MQTTClient()awaitclient.connect('mqtt://broker.emqx.io/')awaitclient.subscribe([('a/b',QOS_1),])foriinrange(0,10):message=awaitclient.deliver_message()packet=message.publish_packetprint(f"{i}:{packet.variable_header.topic_name}=>{packet.payload.data}")awaitclient.disconnect()在这段代码中,我们设置了await在接收到消息时等待,当代码执行到如下位置时,CPU会先执行其他任务,直到有消息送达,然后打印.message=awaitclient.deliver_message()最后,程序会等待10条消息被接收,然后关闭连接。完整代码消息订阅代码#sub.py#python3.6+importasyncioimportloggingfromhbmqtt.clientimportMQTTClientfromhbmqtt.mqtt.constantsimportQOS_1asyncdeftest_sub():client=MQTTClient()awaitclient.connect('mqtt://broker.emqx.io/')awaitclient.subscribe([('a/b',QOS_1),])foriinrange(0,10):message=awaitclient.deliver_message()packet=message.publish_packetprint(f"{i}:{packet.variable_header.topic_name}=>{packet.payload.data}")awaitclient.disconnect()if__name__=='__main__':formatter="[%(asctime)s]%(name)s{%(filename)s:%(lineno)d}%(levelname)s-%(message)s"logging.basicConfig(level=logging.INFO,format=formatter)asyncio.run(test_sub())消息发布代码#pub.py#python3.6+importasyncioimportloggingfromhbmqtt.clientimportMQTTClientfromhbmqtt.mqtt.constantsimportQOS_0,QOS_1,QOS_2asyncdeftest_pub():client=MQTTClient()awaitclient.connect('mqtt://broker.emqx.io/')awaitasyncio.gather(client.publish('a/b',b'使用QOS_0测试消息',qos=QOS_0),client.publish('a/b',b'使用QOS_1测试消息',qos=QOS_1),client.publish('a/b',b'TESTMESSAGEWITHQOS_2',qos=QOS_2))logging.info("messagespublished")awaitclient.disconnect()if__name__=='__main__':formatter="[%(asctime)s]%(name)s{%(filename)s:%(lineno)d}%(levelname)s-%(message)s"logging.basicConfig(level=logging.INFO,format=formatter)asyncio.run(test_pub())测试消息发布运行MQTT消息发布代码,我们会看到客户端连接成功并成功发布消息如下:MQTTX客户端成功接收到由客户端发布的消息HBMQTTclient:Messagesubscription运行MQTT消息订阅代码,我们会看到客户端连接成功,客户端正在等待消息进入。使用MQTTX客户端连接broker.emqx.io,然后向主题a/b发送10条消息返回Terminal,我们看到客户端接收并打印消息,收到10条消息后主动退出程序。综上所述,我们完成了HBMQTT库与公共MQTT服务器的连接,实现了测试客户端与MQTT服务器的连接、消息发布与订阅。通过使用Python异步IO来收发消息,可以帮助我们实现更高效的MQTT客户端。接下来我们会发布更多关于物联网开发和Python的文章,敬请期待。版权声明:本文为EMQ原创,转载请注明出处。