当前位置: 首页 > 后端技术 > Java

mica-mqtt1.0.3发布,加入websocket支持

时间:2023-04-01 14:44:53 Java

1.简介mica-mqtt是一个基于t-io的简单、低延迟、高性能的mqtt物联网开源组件。详情参见mica-mqttgitee源码mica-mqtt-example模块。2.功能[x]支持MQTTv3.1、v3.1.1和v5.0协议。[x]支持websocketmqtt子协议(支持mqtt.js)。[x]支持MQTT客户端。[x]支持MQTT服务器服务器。[x]支持MQTT遗嘱消息。[x]支持MQTT保留消息。[x]支持自定义消息(mq)处理和转发实现集群。[x]MQTT客户端阿里云mqtt连接demo。[x]支持GraalVM编译本机可执行文件。[x]支持快速访问Springboot项目(mica-mqtt-spring-boot-starter)。[x]mica-mqtt-spring-boot-starter支持对接Prometheus+Grafana。3.待办[]优化mqtt会话的处理,支持mqttv5.0的一些新特性。4.更新记录?mica-mqtt服务端增加websocketmqtt子协议支持(支持mqtt.js)。?mica-mqtt服务器ip,默认为空,可以不设置。?mica-mqtt客户端移除CountDownLatch,避免启动时未连接到服务器时卡住。?mica-mqtt客户端增加最大包长度字段,避免包长度超过8092导致解析异常。?mica-mqtt客户端增加连接监听IMqttClientConnectListener。?mica-mqtt3.1协议会校验clientId的长度,增加配置项maxClientIdLength。?mica-mqtt优化mqtt解码异常处理。?mica-mqtt日志优化,方便查询。?mica-mqtt代码优化,部分Tio.close改为Tio.remove。?mica-mqtt-spring-boot-example添加Dockerfile,支持spring-boot:build-image。?完善mica-mqtt-spring-boot-starter,增加will消息配置。??将t-io升级到3.7.4。5.Springboot快速接入5.1添加依赖net.dreamlumica-mqtt-spring-boot-starter1.0.3/dependency>5.2服务器配置示例mqtt:server:enabled:true#是否启用,默认:trueip:127.0.0.1#服务器ipdefault:127.0.0.1port:5883#端口,默认:1883name:Mica-Mqtt-Server#名称,默认:Mica-Mqtt-Serverbuffer-allocator:HEAP#堆内存和堆外内存,默认:堆内存heartbeat-timeout:120000#心跳超时,单位毫秒,默认:1000*120read-buffer-size:8092#接收数据的缓冲区大小,默认:8092max-bytes-in-message:8092#消息解析的最大字节长度,默认:8092debug:true#如果开启prometheus指标采集,建议开启offwebsocket-enable:true#开启websocket子协议,websocket-port默认开启:8083#websocket端口,默认:80835.3服务端可以实现接口(注册为SpringBean即可)。接口是否必须指明IMqttServerAuthHandler用于客户端认证。IMqttMessageListener用于消息监听。IMqttConnectStatusListener用于连接状态Supervise监听IMqttSessionManager无会话管理IMqttMessageStore集群有,单机无会和预留消息存储AbstractMqttMessageDispatcher集群有,单机无消息转发,(会,预留消息转发)IpStatListener无t-ioip状态监控5.4Server自定义配置(可选)@Configuration(proxyBeanMethods=false)publicclassMqttServerCustomizerConfiguration{@BeanpublicMqttServerCustomizeractiveRecordPluginCustomizer(){returnnewMqttServerCustomizer(){@Overridepublicvoidcustomize(MqttServerCreator可以覆盖这里的配置//creator){ConfigurationSystem.out.println("----------------MqttServerCustomizer----------------");}};}}5.5MqttServerTemplate示例.stereotype.Service;importjava.nio.ByteBuffer;/***@authorwsq*/@ServicepublicclassServerService{@AutowiredprivateMqttServerTemplate服务器;publicbooleanpublish(Stringbody){server.publishAll("/test/123",ByteBuffer.wrap(body.getBytes()));返回真;}}5.6基于mq消息广播集群处理实现IMqttConnectStatusListener处理设备状态存储实现IMqttMessageListener向mq转发消息,业务根据需要处理mq消息。实现IMqttMessageStore以存储遗嘱和预订消息。实现AbstractMqttMessageDispatcher向mq发送消息,mq广播回mqtt集群,mqtt向设备发送消息。业务消息发送到mq,mq广播到mqtt集群,mqtt发送消息到设备。5.7Prometheus+Grafana监控连接得益于t-io的良好设计。直连监控指标的t-iostat目前支持以下指标,未来会不断完善。支持得指标说明mqtt_connections_accepted共接受过连接数mqtt_connections_closed关闭过的连接数mqtt_connections_size当前连接数mqtt_messages_handled_packets已处理消息数mqtt_messages_handled_bytes已处理消息字节数mqtt_messages_received_packets已接收消息数mqtt_messages_received_bytes已处理消息字节数mqtt_messages_send_packets已发送消息数mqtt_messages_send_bytesSentmessagebytesFormoreinformationaboutmica-mqtt-spring-boot-starter,pleaserefertothedocument:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter6,Ordinaryjavaprojectaccess6.1mavendependsonnet.dreamlumica-mqtt-core1.0.36.2mica-mqttclient//InitializemqttclientMqttClientclient=MqttClient.create().ip("127.0.0.1").port(1883)//Default:1883.username("admin").password("123456").version(MqttVersion.MQTT_5)//Default:3_1_1.clientId("xxxxxx")//Default:MICA-MQTT-prefixand36nanoseconds.connect();//连接//消息订阅,方法类似subxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});//取消订阅client.unSubscribe("/test/#");//发送消息client.publish("/test/client",ByteBuffer.wrap("云母最牛皮".getBytes(StandardCharsets.UTF_8)));//断开连接client.disconnect();//重新连接client.reconnect();//停止client.stop();6.3mica-mqttserver//注意:为了接受更多的连接(低内存),请添加jvm参数-Xss129kMqttServermqttServer=MqttServer.create()//default:127.0.0.1.ip("127.0.0.1")//default:1883.port(1883)//default:8092(mqtt默认最大消息大小),为了减少内存,可以减小这个参数,如果消息太大,t-io会尝试多次解析(根据实际业务情况推荐).readBufferSize(512)//自定义authentication.authHandler((clientId,userName,password)->true)//消息监听。messageListener((clientId,topic,mqttQoS,payload)->{logger.info("clientId:{}topic:{}mqttQoS:{}message:{}",clientId,topic,mqttQoS,ByteBufferUtil.toString(有效载荷));})//ssl配置.useSsl("","","")//自定义客户端注销和离线监听.connectStatusListener(newIMqttConnectStatusListener(){@Overridepublicvoidonline(StringclientId){}@Overridepublicvoidoffline(StringclientId){}})//自定义消息转发,可以通过mq广播实现集群处理。messageDispatcher(newIMqttMessageDispatcher(){@Overridepublicvoidconfig(MqttServermqttServer){}@Overridepublicbooleansend(Messagemessage){returnfalse;}@Overridepublicbooleansend(StringclientId,Messagemessage){返回false;}}).debug()//启用t-io调试信息log.start();//发送给某个客户端mqttServer.publish("clientId","/test/123",ByteBuffer.wrap("云母最牛皮".getBytes()));//发送给本话题的所有在线听众ClientmqttServer.publishAll("/test/123",ByteBuffer.wrap("云母最牛皮".getBytes()));//停止mq服务ttServer.stop();7、效果演示8、关注我们,扫描上方二维码,更多精彩内容每天推荐!