涓€銆佷粙缁峬ica-mqtt鏄竴涓熀浜巘-io鐨勭畝鍗曘€佷綆寤惰繜銆侀珮鎬ц兘鐨刴qtt鐗╄仈缃戝紑婧愮粍浠躲€傝鎯呭弬瑙侊細mica-mqtt-example妯″潡銆?.鍔熻兘[x]鏀寔MQTTv3.1銆乿3.1.1鍜寁5.0鍗忚銆俒x]鏀寔MQTT瀹㈡埛绔€俒x]鏀寔MQTT鏈嶅姟鍣ㄦ湇鍔″櫒銆俒x]鏀寔MQTT閬楀槺娑堟伅銆俒x]鏀寔MQTT淇濈暀娑堟伅銆俒x]鏀寔鑷畾涔夋秷鎭紙mq锛夊鐞嗗拰杞彂瀹炵幇闆嗙兢銆俒x]MQTT瀹㈡埛绔樋閲屼簯mqtt杩炴帴demo銆俒x]鏀寔蹇€熻闂甋pringboot椤圭洰锛坢ica-mqtt-spring-boot-starter锛夈€?.鍋歔]娣诲姞websocket鏀寔锛堥鐮旀垚鍔燂級銆俒]浼樺寲mqttsession锛屾敮鎸乿5.04.鏇存柊璁板綍鉁ㄨ闃呯鐞嗛泦鎴愬埌session绠$悊涓€傗湪MqttProperties.MqttPropertyType娣诲姞娉ㄨВ锛岃€冭檻鍒癿qttV5.0鐨勬柊鐗规€с€傗湪澧炲姞Springbootstarter锛屾柟渚挎帴鍏ワ紝鍏煎浣庣増鏈琒pringboot銆傗湪鐮旂┒t-iowebsocket瀛愬崗璁€傪煇涗慨澶峧ava8杩愯杩囩▼涓殑涓€浜涢棶棰橈紝NoSuchMethodError:java.nio.ByteBuffer.xxx5.Springboot蹇€熻闂?.1娣诲姞渚濊禆net.dreamlumica-mqtt-spring-boot-starter${latestversion}5.2閰嶇疆椤归厤缃」榛樿鍊艰鏄巑qtt.server.nameMica-Mqtt-鏈嶅姟鍣ㄥ悕绉癿qtt.server.port1883绔彛mqtt.server銆俰p127.0.0.1serveripmqtt.server.buffer-allocatorheapmemory鍫嗗唴瀛樺拰鍫嗗鍐呭瓨mqtt.server.heartbeat-timeout120s蹇冭烦瓒呮椂鏃堕棿锛堝崟浣嶏細姣榛樿锛?000*120锛夛紝濡傛灉鐢ㄦ埛涓嶆兂frameworklevel蹇冭烦鐩稿叧宸ヤ綔锛岃灏嗘鍊艰缃负0鎴栬礋鏁癿qtt.server.read-buffer-size8092鎺ユ敹鏁版嵁缂撳啿鍖哄ぇ灏忥紝榛樿锛?092mqtt.server.max-bytes-in-message8092娑堟伅瑙f瀽鏈€澶у瓧鑺傞暱搴?default:8092mqtt.server.debugfalsedebug5.3鍙疄鐜版帴鍙o紙娉ㄥ唽涓篠pringBean鍗冲彲锛夋帴鍙f槸鍚﹀繀椤绘敞鏄嶪MqttServerAuthHandler鐢ㄤ簬瀹㈡埛绔璇両MqttMessageListener涓烘秷鎭洃鍚琁MqttConnectStatusListener涓鸿繛鎺ョ姸鎬佺洃鍚琁MqttSessionManager鏃犱細璇濈鐞咺MqttMessageStore闆嗙兢鏈夛紝stand-aloneNoReservedmessagestorageIMqttMessageDispatcherclusteryes,stand-alonenomessageforwardingIpStatListenernot-ioiptransitionmonitoring5.4鑷畾涔夐厤缃紙鍙€夛級@Configuration(proxyBeanMethods=false)ppublicclassMqttServerCustomizerConfiguration{@BeanpublicMqttServerCustomizeractiveRecordPluginCustomizer(){returnnewMqttServerCustomizer(){@Overridepublicvoidcustomize(MqttServerCreatorcreator){//杩欓噷鍙互鑷畾涔夐厤缃垱寤哄櫒锛屽畠灏嗚鐩杫mlout涓殑閰嶇疆System.鈥?---------------MqttServerCustomizer----------------");}};}}6.鏅€歫ava椤圭洰鎺ュ叆6.1maven渚濊禆net.dreamlumica-mqtt-core1.0.16.2mica-mqttclient//鍒濆鍖杕qtt瀹㈡埛绔疢qttClientclient=MqttClient.create().ip("127.0.0.1").port(1883)//榛樿锛?883.username("admin").password("123456").version(MqttVersion.MQTT_5)//榛樿鍊硷細3_1_1.clientId("xxxxxx")//榛樿鍊硷細MICA-MQTT-鍓嶇紑鍜屽崄鍏繘鍒剁撼绉掓暟.connect();//Connection//娑堟伅璁㈤槄锛屾柟娉曠被浼約ubxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});//鍙栨秷璁㈤槄client.unSubscribe("/test/#");//鍙戦€佹秷鎭痗lient.publish("/test/client",ByteBuffer.wrap("micaisthebest".getBytes(StandardCharsets.UTF_8)));//鏂紑杩炴帴client.disconnect();//閲嶆柊杩炴帴client.reconnect();//鍋滄client.stop();6.3mica-mqttserver//娉ㄦ剰锛氫负浜嗘帴鍙楁洿澶氱殑杩炴帴锛堝噺灏戝唴瀛橈級锛岃娣诲姞jvm鍙傛暟-Xss129kMqttServermqttServer=MqttServer.create()//default:127.0.0.1.ip("127.0.0.1")//default:1883.port(1883)//default:8092(mqttdefaultMaximummessagesize)锛屼负浜嗗噺灏戝唴瀛橈紝鍙互鍑忓皬杩欎釜鍙傛暟銆傚鏋滄秷鎭繃澶э紝t-io浼氬皾璇曞娆¤В鏋愶紙鏍规嵁瀹為檯涓氬姟鎯呭喌鎺ㄨ崘锛夈€?readBufferSize(512)//鑷畾涔塧uthentication.authHandler((clientId,userName,password)->true)//娑堟伅鐩戞帶銆俶essageListener((clientId,topic,mqttQoS,payload)->{logger.info("clientId:{}topic:{}mqttQoS:{}message:{}",clientId,topic,mqttQoS,ByteBufferUtil.toS鐗规灄锛堟湁鏁堣浇鑽凤級锛夛紱})//SSL閰嶇疆銆倁seSsl("","","")//鑷畾涔夊鎴风鐧诲綍鍜岄€€鍑虹洃鎺с€俢onnectStatusListener(newIMqttConnectStatusListener(){@Overridepublicvoidonline(StringclientId){}@Overridepublicvoidoffline(StringclientId){}})//鑷畾涔夋秷鎭浆鍙戯紝鍙互閫氳繃mq骞挎挱瀹炵幇闆嗙兢澶勭悊銆俶essageDispatcher(newIMqttMessageDispatcher(){@Overridepublicvoidconfig(MqttServermqttServer){}@Overridepublicbooleansend(Messagemessage){returnfalse;}@Overridepublicbooleansend(StringclientId,Messagemessage){杩斿洖false;}}).debug()//鍚敤t-io璋冭瘯淇℃伅log.start();//鍙戦€佺粰瀹㈡埛绔痬qttServer.publish("clientId","/test/123",ByteBuffer.wrap("浜戞瘝鏈€鐗涚毊".getBytes()),MqttQoS.EXACTLY_ONCE);//鍙戦€佺粰鎵€鏈夊湪绾垮惉浼楄瘽棰樺鎴风mqttServer.publishAll("/test/123",ByteBuffer.wrap("浜戞瘝鏈€鐗涚毊".getBytes()),MqttQoS.EXACTLY_ONCE);//鍋滄鏈嶅姟mqttServer.stop();7.鏁堟灉婕旂ず8.鐩稿叧鏂囨。瀹樻柟鏂囨。mqtt鍗忚鏂囨。