1.前言经过一个多月的肝连接,mica-mqtt迎来了一个相对稳定的版本。在过去的一个多月里,已经上线了7个正式版,投稿100多篇。衷心感谢关注、star、使用和反馈的同学。2.简介mica-mqtt是一个基于t-io的简单、低延迟、高性能的mqtt物联网开源组件。详情参见mica-mqttgitee源码mica-mqtt-example模块。3.功能[x]支持MQTTv3.1、v3.1.1和v5.0协议。[x]支持websocketmqtt子协议(支持mqtt.js)。[x]支持httprestapi,详见httpapi文档。[x]支持MQTT客户端。[x]支持MQTT服务器服务器。[x]支持MQTT遗嘱消息。[x]支持MQTT保留消息。[x]支持自定义消息(mq)处理和转发实现集群。[x]MQTT客户端阿里云mqtt连接demo。[x]支持GraalVM编译本机可执行文件。[x]支持快速访问Springboot项目(mica-mqtt-spring-boot-starter)。[x]mica-mqtt-spring-boot-starter支持对接Prometheus+Grafana。4.更新记录mqtt-server优化连接关闭日志。mqtt-server优化订阅,同topicFilter订阅判断qos。mqtt-server监听器增加trycatch,避免业务问题导致掉线。mqtt-server优化了topicFilters验证。mqtt-client优化订阅reasonCodes判断。mqtt-client监听器增加trycatch,避免业务问题导致掉线。mqtt-client添加会话有效期。代码优化以减少编码问题。mqtt-server修复心跳时间问题。修复mqtt-server多个订阅同时匹配时消息重复的问题。mqtt-client优化连接处理逻辑,mqtt连接后订阅。修复MqttProperties中的潜在空指针。5.Springboot快速接入5.1添加依赖net.dreamlumica-mqtt-spring-boot-starter1.1.1/dependency>5.2服务器配置示例mqtt:server:enabled:true#是否启用,默认:trueip:127.0.0.1#服务器ipdefault:127.0.0.1port:5883#端口,默认:1883name:Mica-Mqtt-Server#名称,默认:Mica-Mqtt-Serverbuffer-allocator:HEAP#堆内存和堆外内存,默认:堆内存heartbeat-timeout:120000#心跳超时,单位毫秒,默认:1000*120read-buffer-size:8092#接收数据的缓冲区大小,默认:8092max-bytes-in-message:8092#消息解析的最大字节长度,默认:8092debug:true#如果开启prometheus指标采集,建议开启offwebsocket-enable:true#开启websocket子协议,websocket-port默认开启:8083#websocket端口,默认:80835.3服务端可以实现接口(注册为SpringBean即可)。接口是否必须指明IMqttServerAuthHandler用于客户端认证。IMqttMessageListener用于消息监听。IMqttConnectStatusListener用于连接状态Supervise监听IMqttSessionManager无会话管理IMqttMessageStore集群有,单机无会和预留消息存储AbstractMqttMessageDispatcher集群有,单机无消息转发,(会,预留消息转发)IpStatListener无t-ioip状态监控5.4Server自定义配置(可选)@Configuration(proxyBeanMethods=false)publicclassMqttServerCustomizerConfiguration{@BeanpublicMqttServerCustomizeractiveRecordPluginCustomizer(){returnnewMqttServerCustomizer(){@Overridepublicvoidcustomize(MqttServerCreator可以覆盖这里的配置//creator){ConfigurationSystem.out.println("----------------MqttServerCustomizer----------------");}};}}5.5MqttServerTemplate示例.stereotype.Service;importjava.nio.ByteBuffer;/***@authorwsq*/@ServicepublicclassServerService{@AutowiredprivateMqttServerTemplate服务器;publicbooleanpublish(Stringbody){server.publishAll("/test/123",ByteBuffer.wrap(body.getBytes()));返回真;}}5.6基于mq消息广播集群处理实现IMqttConnectStatusListener处理设备状态存储实现IMqttMessageListener向mq转发消息,业务根据需要处理mq消息。实现IMqttMessageStore以存储遗嘱和预订消息。实现AbstractMqttMessageDispatcher向mq发送消息,mq广播回mqtt集群,mqtt向设备发送消息。业务消息发送到mq,mq广播到mqtt集群,mqtt发送消息到设备。5.7Prometheus+Grafana监控连接得益于t-io的良好设计。直连监控指标的t-iostat目前支持以下指标,未来会不断完善。支持得指标说明mqtt_connections_accepted共接受过连接数mqtt_connections_closed关闭过的连接数mqtt_connections_size当前连接数mqtt_messages_handled_packets已处理消息数mqtt_messages_handled_bytes已处理消息字节数mqtt_messages_received_packets已接收消息数mqtt_messages_received_bytes已处理消息字节数mqtt_messages_send_packets已发送消息数mqtt_messages_send_bytesSentmessagebytesFormoreinformationaboutmica-mqtt-spring-boot-starter,pleaserefertothedocument:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter6,Ordinaryjavaprojectaccess6.1mavendependsonnet.dreamlumica-mqtt-core1.1.16.2mica-mqttclient//InitializemqttclientMqttClientclient=MqttClient.create().ip("127.0.0.1").port(1883)//Default:1883.username("admin").password("123456").version(MqttVersion.MQTT_5)//Default:3_1_1.clientId("xxxxxx")//Default:MICA-MQTT-prefixand36nanoseconds.connect();//连接//消息订阅,方法类似subxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});//取消订阅client.unSubscribe("/test/#");//发送消息client.publish("/test/client",ByteBuffer.wrap("云母最牛皮".getBytes(StandardCharsets.UTF_8)));//断开连接client.disconnect();//重新连接client.reconnect();//停止client.stop();6.3mica-mqttserver//注意:为了接受更多的连接(降低内存),请添加jvm参数-Xss129kMqttServermqttServer=MqttServer.create()//default:127.0.0.1。ip("127.0.0.1")//default:1883.port(1883)//default:8092(mqtt默认的最大消息大小),为了减少内存,可以减小这个参数,如果消息太大大,t-io会尝试多次解析(根据实际业务情况推荐).readBufferSize(512)//自定义authentication.authHandler((clientId,userName,password)->true)//消息监听。messageListener((clientId,topic,mqttQoS,payload)->{logger.info("clientId:{}topic:{}mqttQoS:{}message:{}",clientId,topic,mqttQoS,ByteBufferUtil.toString(payload));})//ssl配置.useSsl("","","")//自定义客户端离线和离线监控。connectStatusListener(newIMqttConnectStatusListener(){@Overridepublicvoidonline(StringclientId){}@Overridepublicvoidoffline(StringclientId){}})//对于自定义消息转发,可以使用mq广播实现集群处理。messageDispatcher(newIMqttMessageDispatcher(){@Overridepublicvoidconfig(MqttServermqttServer){}@Overridepublicbooleansend(Messagemessage){returnfalse;}@Overridepublicbooleansend(StringclientId,Messagemessage){返回false;}}).debug()//打开t-io调试信息log.start();//发送给客户端mqttServer.publish("clientId","/test/123",ByteBuffer.wrap("云母最好".getBytes()));//发送给所有在线收听此话题的客户端mqttServer.publishAll("/test/123",ByteBuffer.wrap("云母最牛皮".getBytes()));//停止服务mqttServer.stop();7、效果演示8、关注我们,扫描上方二维码,更多精彩内容每天推荐!