介绍mica-mqtt是一个基于t-io的简单、低延迟、高性能的mqtt物联网开源组件。mica-mqttserver更容易融入现有服务和二次开发,降低自研物联网平台的开发成本。mica-mqtt客户端是一个简单易用的javamqtt客户端,更容易集成到自己的业务代码中。今天笔者主要介绍mica-mqtt客户端的使用。使用mica-mqtt-clientSpringbootstarter添加依赖net.dreamlumica-mqtt-client-spring-boot-starter2.0.3配置项说明mqtt:client:enabled:true#是否启用客户端,默认:false使用场景有限,如有需要请不要启用:127.0.0.1#已连接serverip,default:127.0.0.1port:1883#Port:Default:1883name:Mica-Mqtt-Client#Name,default:Mica-Mqtt-ClientclientId:000001#ClientId(很重要,一般是设备sn,不是repeatable)user-name:mica#认证用户名password:123456#认证密码timeout:5#超时时间,单位:秒,默认:5秒reconnect:true#是否重连,默认:truere-interval:5000#重连时间,默认5000毫秒version:MQTT_5#mqtt协议版本,默认:3.1.1read-buffer-size:8KB#接收数据的缓冲区大小,默认:8kmax-bytes-in-message:10MB#消息解析最大字节长度,默认值:10M缓冲区分配器:堆#堆内存和堆外内存,默认:堆内存keep-alive-secs:60#保活时间,单位:秒clean-session:true#mqttcleansession,default:trueuse-ssl:false#是否开启ssl,默认:false连接状态监听@ServicepublicclassMqttClientConnectListener{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MqttClientConnectListener.class);@Autowired私有MqttClientCreatormqttClientCreator;@EventListenerpublicvoidonConnected(MqttConnectedEventevent){logger.info("MqttConnectedEvent:{}",event);}@EventListenerpublicvoidonDisconnect(MqttDisconnectEventevent){//离线重连更新密码,适用于类似阿里云mqttclientId连接带时间戳的方法logger.info("MqttDisconnectEvent:{}",event);//断开连接时更新clientId、用户名、密码mqttClientCreator.clientId("newClient"+System.currentTimeMillis()).username("newUserName").password("newPassword");}}自定义匹配设置java(可选)@Configuration(proxyBeanMethods=false)publicclassMqttClientCustomizerConfiguration{@BeanpublicMqttClientCustomizermqttClientCustomizer(){returnnewMqttClientCustomizer(){@Overridepublicvoidcustomize(MqttClientCreatorcreator){//这里可自定义配置creator,会衣盖yml中的配置System.out.println("----------------MqttServerCustomizer----------------");}};}}订阅示例@ServicepublicclassMqttClientSubscribeListener{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MqttClientSubscribeListener.class);@MqttClientSubscribe("/test/#")publicvoidsubQos0(Stringtopic,ByteBufferpayload){logger.info("topic:{}payload:{}",topic,ByteBufferUtil.toString(payload));}@MqttClientSubscribe(value="/qos1/#",qos=MqttQoS.AT_LEAST_ONCE)publicvoidsubQos1(Stringtopic,ByteBufferpayload){logger.info("topic:{}payload:{}",topic,ByteBufferUtil.toString(payload));}}MqttClientTemplate使用示例@ServicepublicclassMainService{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MainService.class);@Autowired私有MqttClientTemplate客户端;publicbooleanpublish(){//发布消息示例client.publish("/test/client",ByteBuffer.wrap("micaisthebest".getBytes(StandardCharsets.UTF_8)));返回真;}publicbooleansub(){//订阅消息示例client.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});返回真;}}共享订阅主题说明mica-mqtt客户端支持两种共享订阅方式:共享订阅:订阅前缀$queue/,多个客户端订阅$queue/topic,发布者发布到主题,只有一个客户端会收到消息组订阅:subscribe以$share//为前缀,群客户端订阅$share/group1/topic,$share/group2/topic..,发布者发布到topic,消息会发布到各个群,但是每个组中只有一个客户端会收到消息。jfinalmica-mqtt客户端(1.3.7开始支持)添加依赖net.dreamlujfinal-mica-mqtt-client2.0.3在jfinal-demo中删除slf4j-nop依赖添加slf4j-log4j12org.slf4jslf4j-log4j121.7.33version>在jfinalConfig中添加mica-mqtt客户端插件MqttClientPluginconfigPluginmqttClientPlugin=newMqttClientPlugin();mqttClientPlugin.config(mqttClientCreator->{//设置mqtt连接配置信息mqttClientCreator.clientId")//"按需配置,一样会互相踢。ip("mqtt.dreamlu.net").port(1883).connectListener(Aop.get(MqttClientConnectListener.class));});me.add(mqttClientPlugin);在jfinalConfigonStart启动完成后添加mqtt订阅@OverridepublicvoidonStart(){IMqttClientMessageListenerclientMessageListener=Aop.get(TestMqttClientMessageListener.class);MqttClientKit.subQos0("#",clientMessageListener);}使用MqttClientKit发送消息MqttClientKit.publish("mica","hello".getBytes(StandardCharsets));示例代码_8MqttClientConnectListenerpublicclassMqttClientConnectListenerimplementsIMqttClientConnectListener{@OverridepublicvoidonConnected(ChannelContextchannelContext,booleanisReconnect){if(isReconnect){System.out.println("重新连接到mqtt服务器成功...");}else{系统。.println("连接mqtt服务器成功...");}}@OverridepublicvoidonDisconnect(ChannelContextchannelContext,Throwablethrowable,Stringremark,booleanisRemove){System.out.println("mqttlinkdisconnectedremark:"+remark+"isRemove:"+isRemove);}}示例TestMqttClientMessageListener公共类TestMqttClientMessageListenerimplementsIMqttClientMessageListener{@OverridepublicvoidonMessage(Stringtopic,MqttPublishMessagemqttPublishMessage,ByteBufferbyteBuffer){System.out.println("收到消息topic:"+topic+"内容:\n"+ByteBufferUtil.toString(byteBuffer));}}其它java项目添加依赖net.dreamlumica-mqtt-core1.3.7org.t-iotio-websocket-servernet.dreamlu云母-mqtt-模型com.alibabafastjson</exclusion>使用//初始化mqtt客户端MqttClientclient=MqttClient.create().ip("127.0.0.1")//mqtt服务器ipaddress.port(1883)//Default:1883.username("admin")//Account.password("123456")//Password.version(MqttVersion.MQTT_5)//Default:3_1_1.clientId("xxxxxx")//非常重要,必须手动设置,通用设备sn号,默认:MICA-MQTT-prefixandnanosecondsinhexadecimal.bufferAllocator(ByteBufferAllocator.DIRECT)//堆内存和堆外内存,默认:heapmemory.readBufferSize(512)//消息一起解析长度,默认:8092(mqtt消息最大长度).maxBytesInMessage(1024*10)//最大包体长度,如果包体过大,需要设置该参数,默认为:10M(10*1024*1024).keepAliveSecs(120)//Default:60s.timeout(10)//超时时间,t-io配置,可以为null,为null时t-io默认为5.reconnect(true)//是否重连,default:true.reInterval(5000)//重连重试时间,reconnect为true时有效,t-iodefault:5000.willMessage(builder->{builder.topic("/test/offline").messageText("down");//将消息}).connectListener(newIMqttClientConnectListener(){@OverridepublicvoidonConnected(ChannelContextcontext,booleanisReconnect){logger.info("Linkedserversuccessful...");}@OverridepublicvoidonDisconnect(ChannelContextchannelContext,Throwablethrowable,Stringremark,booleanisRemove){logger.info("Disconnectedfromlinkedserver...");}}).properties()//mqtt5属性.connect();//异步连接,也支持同步connectSync()//消息订阅,方法类似subxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(有效负载));});//取消订阅client.unSubscribe("/test/#");//发送消息客户端。publish("/test/client",ByteBuffer.wrap("mica-mqtt牛皮".getBytes(StandardCharsets.UTF_8)));//断开连接client.disconnect();//重新连接client.reconnect();致谢mica-mqtt从一个实验项目逐渐完善,目前在gitee上已经有了800多个star的mica-mqtt的成长也离不开大家的使用和积极反馈,感谢@冷月宫主,@willianfu,@hjkJOJO、@Symous、@hongfeng11、@胡罗博、@杨浩、@一醉化千愁、@toskeyfine、@胡羊五牛等同学,谢谢大家!!!使用文档mica-mqtt快速入门mica-mqtt-client-spring-boot-starter使用文档mica-mqtt-server-spring-boot-starter使用文档jfinal-mica-mqtt-client使用文档jfinal-mica-mqtt-server使用mica-mqtt使用文档mica-mqtthttpapi文档查看mica-mqtt使用FAQ汇总mica-mqttreleaseversion