Flask是一个用Python编写的轻量级Web应用程序框架。之所以称为“微框架”,是因为它使用简单的核心,并使用扩展来添加其他功能,例如:ORM、表单验证工具、文件上传、各种开放式身份验证技术等等。MQTT是一种基于发布/订阅模型的轻量级物联网消息传输协议。它可以用很少的代码和带宽为联网设备提供实时可靠的消息服务。广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。本文主要介绍如何在Flask项目中实现MQTT客户端与MQTT服务器的连接、订阅、取消订阅、收发消息等功能。我们将使用Flask-MQTT客户端库,这是一个Flask扩展,可以看作是一个paho-mqtt装饰器,以简化Flask应用程序中的MQTT集成。项目初始化本项目使用Python3.8进行开发和测试。读者可以使用以下命令来确认Python的版本。$python3--versionPython3.8.2使用pip安装Flask-MQTT库pip3installflask-mqttFlask-MQTT使用本文将使用EMQ提供的免费公共MQTT服务器,基于MQTT云服务-EMQXCloud创建.服务器访问信息如下:Broker:broker.emqx.ioTCP端口:1883Websocket端口:8083ImportFlask-MQTT导入Flask库和Flask-MQTT扩展,创建Flask应用fromflaskimportFlask,request,jsonifyfromflask_mqttimportMqttaapp=Flask(__name__)配置Flask-MQTT扩展app.config['MQTT_BROKER_URL']='broker.emqx.io'app.config['MQTT_BROKER_PORT']=1883app.config['MQTT_USERNAME']=''#当需要验证用户名和密码时,请设置此项app.config['MQTT_PASSWORD']=''#当需要验证用户名和密码时,请设置此项app.config['MQTT_KEEPALIVE']=5#以秒为单位设置心跳时间app.config['MQTT_TLS_ENABLED']=False#如果你的服务器支持TLS,请设置为Truetopic='/flask/mqtt'mqtt_client=Mqtt(app)完整的配置项,请参考Flask-MQTT配置文档。编写连接回调函数,可以在回调函数中处理MQTT连接成功或失败。本例会在连接成功后订阅/flask/mqtt主题。@mqtt_client.on_connect()defhandle_connect(client,userdata,flags,rc):ifrc==0:print('Connectedsuccessfully')mqtt_client.subscribe(topic)#订阅主题else:print('Badconnection.Code:',rc)编写消息回调函数,打印/flask/mqtt主题收到的消息。@mqtt_client.on_message()defhandle_mqtt_message(client,userdata,message):data=dict(topic=message.topic,payload=message.payload.decode())print('收到关于主题的消息:{topic}withpayload:{payload}'.format(**data))创建消息发布接口我们创建一个简单的POST接口来实现MQTT消息发布。在实际应用中,这个接口可能需要一些更复杂的业务逻辑处理。@app.route('/publish',methods=['POST'])defpublish_message():request_data=request.get_json()publish_result=mqtt_client.publish(request_data['topic'],request_data['msg'])returnjsonify({'code':publish_result[0]})运行Flask应用程序if__name__=='__main__':app.run(host='127.0.0.1',port=5000)当Flask应用程序启动时,MQTT客户端将连接到服务器并订阅主题/flask/mqtt。测试接下来,我们使用MQTT客户端——MQTTX进行连接、订阅、发布测试。测试消息接收在MQTTX中创建链接并连接到服务器。将MQTTX的消息Hello发布到MQTTX中的主题/flask/mqtt,在Flask运行窗口中,可以看到MQTTX发送的消息,测试消息发布接口订阅/flask/mqtt主题在MQTTX中使用Postman调用/publish接口:从Flask向/flask/mqtt主题发送消息Hello。在MQTTX中,您将能够看到Flask发送的消息。完整代码fromflaskimportFlask,request,jsonifyfromflask_mqttimportMqttaapp=Flask(__name__)app.config['MQTT_BROKER_URL']='broker.emqx.io'app.config['MQTT_BROKER_PORT']=1883app.config['MQTT_USERNAME']=''#当您需要验证用户名和密码时,请设置此项app.config['MQTT_PASSWORD']=''#当您需要验证用户名和密码时,请设置此项app.config['MQTT_KEEPALIVE']=5#以秒为单位设置心跳时间app.config['MQTT_TLS_ENABLED']=False#如果您的服务器支持TLS,请将其设置为Truetopic='/flask/mqtt'mqtt_client=Mqtt(app)@mqtt_client.on_connect()defhandle_connect(client,userdata,flags,rc):如果rc==0:print('连接成功')mqtt_client.subscribe(topic)else:print('Badconnection.Code:',rc)@mqtt_client.on_message()defhandle_mqtt_message(client,userdata,message):data=dict(topic=message.topic,payload=message.payload.decode())print('收到关于主题的消息:{topic}withpayload:{payload}'.format(**data))@app.route('/publish',methods=['POST'])defpublish_message():request_data=request.get_json()publish_result=mqtt_client.publish(request_data['topic'],request_data['msg'])returnjsonify({'code':publish_result[0]})如果__name__=='__main__':app.run(host='127.0.0.1',port=5000)注解Flask-MQTT目前不适合使用多个工作实例,如果你需要使用像gevent或gunicorn这样的WSGI服务器,请确保只有一个工作实例我们使用Flask-MQTT完成了一个简单的MQTT客户端,可以在Flask应用中订阅和发布消息。版权声明:本文为EMQ原创,转载请注明出处。
