1.前言在上一篇物联网网关开发:基于MQTT消息总线的设计流程(上)中,我们谈到了物联网中系统的网关,如何使用MQTT消息总线实现嵌入式系统中多个进程之间的相互通信。这种通信模型最大的优点是:模块之间解耦;模块间并行开发;TCP链路和粘包问题交给消息总线处理,我们只需要业务层处理即可;调试方便;上面只是描述了嵌入式系统中进程间的通信方式,那么网关是如何与云平台进行交互的呢?上一篇文章提到:网关与云平台的通信方式一般只有客户指定的几种(阿里云、华为云、腾讯云、亚马逊AWS平台)。一般要求网关与云平台处于长期连接状态,以便随时向网关发送来自云端的各种指令。在这篇文章中,我们来谈谈这一部分。在回复:mqtt,获取示例代码的网盘地址。2、MQTT连接云平台目前主要的物联网云平台都提供了不同的接入方式。对于网关来说,MQTT接入是应用最为广泛的。我们知道MQTT只是一个协议,有不同的编程语言实现,C语言有几种实现。网关内部有一个后台守护进程:MQTTBroker,其实就是可执行程序mosquitto,起到消息总线的作用。这里要注意:因为这个消息总线运行在嵌入式系统内部,访问总线的客户端就是需要相互通信的进程。这些进程的数量是有限的,即使是再复杂的系统,最多十几个进程也差不多够了。因此,mosquitto的实现完全可以支撑系统负载。那么如果在云端部署一个MQTTBroker,理论上可以直接使用mosquitto实现作为消息总线,但是需要评估连接的客户端(也就是网关)的数量级,考虑并发问题,一定要做压力测试。对于后台开发,我经验不多,不敢(也不能)多说,误导大家就是犯罪。但是对于一般的学习和测试,直接把mosquitto部署在云端作为消息总线是没有问题的。3.Proc_Bridge进程:外部和内部消息总线之间的桥梁下图说明了Proc_Bridge进程在该模型中的作用:从云平台消息总线接收到的消息需要转发给内部消息总线;内部消息总线接收到的消息需要转发到云平台的消息总线;如果用mosquitto来实现,应该怎么实现呢?1.mosquitto的API接口mosquitto的实现是基于回调函数机制来运行的,例如://连接成功时的回调函数voidmy_connect_callback(structmosquitto*mosq,void*obj,intrc){//。..}//连接失败时的回调函数voidmy_disconnect_callback(structmosquitto*mosq,void*obj,intresult){//...}//回调函数voidmy_message_callback(structmosquitto*mosq,void*obj,conststructmosquitto_message*message){//..}intmain(){//其他代码//...//创建一个mosquitto对象结构mosquittog_mosq=mosquitto_new("client_name",true,NULL);//注册回调函数mosquitto_connect_callback_set(g_mosq,my_connect_callback);mosquitto_disconnect_callback_set(g_mosq,my_disconnect_callback);mosquitto_message_callback_set(g_mosq,my_message_callback);//这里还有其他的回复数Setup//开始连接消息总线mosquitto_connect(g_mosq,"127.0.0.1",1883,60);while(1){intrc=mosquitto_loop(g_mosq,-1,1);if(rc){printf("mqtt_portal:mosquitto_looprc=%d\n",rc);sleep(1);mosquitto_reconnect(g_mosq);}}mosquitto_destroy(g_mosq);mosquitto_lib_cleanup();return0;}以上代码是一个mosquitto客户端最简单的代码,使用回调函数机制,让程序的开发变得非常简单,mosquitto帮我们处理了底层的细节,只要我们注册的函数被调用,就代表我们感兴趣的事件发生了,各种回调机制使用的对比开源软件很多,比如:glib中的定时器,libevent中的通信处理,libmodbus中的数据处理,linux内核中的驱动开发和定时器,都是这个套路,都可以掌握!对于网关中的每一个进程,你只需要添加上面的部分代码就可以挂载到消息总线上,使其可以和其他进程收发数据2.使用UserData指针实现多个MQTT连接,上面的例子只连接了一个消息总线。对于ordinary对于进程来说,已经达到了通信的目的。但是对于Proc_Bridge进程来说,目的还没有达到,因为这个进程处于桥接的位置,需要同时连接到远程和本地的消息总线。那么应该如何实现呢??看mosquitto_new函数的签名:/**obj-Auserpointerthatwillbepassedasanargumenttoany*callbacksthatarespecified.*//*最后一个参数的作用是:可以设置一个用户自己的数据(作为指针传入),然后mosquitto在调用返回我们的注册此指针将传递给任何函数。因此,我们可以通过这个参数来区分连接是远程连接还是本地连接。*/libmosq_EXPORTstructmosquitto*mosquitto_new(constchar*id,boolclean_session,void*obj);所以,我们可以定义一个结构体变量,在这里记录一个MQTT连接的所有信息,然后注册到mosquitto。mosquitto在回调函数的时候,把这个结构体变量的指针传回给我们,这样我们就可以拿到连接的所有数据,某种程度上,这也是一种面向对象的思想。//结构体typedefstruct{char*id;char*name;char*pw;char*host;intport;pthread_ttHandle;structmosquitto*mosq;intmqtt_num;}一直表示一个MQTT连接的MQData;完整的代码已经放在网盘了,为了让大家先明白原理,我把关键地方的代码贴在这里://赋值结构体变量MQDatauserData=(MQData*)malloc(sizeof(MQData));//这里的设置属于连接参数:id,name等//创建mosquitto对象时,传入userData。structmosquitto*mosq=mosquitto_new(userData->id,true,userData);//回调函数中,将obj指针转发给MQData指针staticvoidmessageCB(structmosquitto*mosq,void*obj,conststructmosquitto_message*message){MQData*userData=(MQData*)obj;//此时可以根据userData指针中的内容判断出这是哪个链接}另一个问题:不知大家注意到例子中的mosquitto_loop()函数了吗?这个函数需要放在mosuiqtto的内部事件中,只能在while死循环中不断调用才能触发。(其实在mosuiqtto中,提供了另一个简化的函数mosquitto_loop_forever)。也就是说:在每一个连接中,都需要不断的触发mosquitto的底层事件,这样消息系统才能顺利的收发。因此,示例代码中使用了两个线程分别连接云平台的总线和内部总线。4.总结这两篇文章我们基本上讲完了物联网系统网关中最基本的通信模型,相当于一个程序的骨架。剩下的就是处理业务层的细节了。万里长征,这是第一步!对于一个网关来说,需要处理的问题比较多,比如:MQTT连接的认证(用户名+密码,证书),通讯数据的序列化和反序列化加解密等等,以后慢慢说吧,希望大家可以前进!
