当前位置: 首页 > 后端技术 > Java

mica-mqtt1.0.2发布,完整的状态和示例

时间:2023-04-01 19:31:33 Java

1.绠€浠媘ica-mqtt鏄竴涓熀浜巘-io鐨勭畝鍗曘€佷綆寤惰繜銆侀珮鎬ц兘鐨刴qtt鐗╄仈缃戝紑婧愮粍浠躲€傝鎯呭弬瑙乵ica-mqttgitee婧愮爜mica-mqtt-example妯″潡銆?.鍔熻兘[x]鏀寔MQTTv3.1銆乿3.1.1鍜寁5.0鍗忚銆俒x]鏀寔MQTT瀹㈡埛绔€俒x]鏀寔MQTT鏈嶅姟鍣ㄦ湇鍔″櫒銆俒x]鏀寔MQTT閬楀槺娑堟伅銆俒x]鏀寔MQTT淇濈暀娑堟伅銆俒x]鏀寔鑷畾涔夋秷鎭紙mq锛夊鐞嗗拰杞彂瀹炵幇闆嗙兢銆俒x]MQTT瀹㈡埛绔樋閲屼簯mqtt杩炴帴demo銆俒x]鏀寔GraalVM缂栬瘧鏈満鍙墽琛屾枃浠躲€俒x]鏀寔蹇€熻闂甋pringboot椤圭洰锛坢ica-mqtt-spring-boot-starter锛夈€俒x]mica-mqtt-spring-boot-starter鏀寔瀵规帴Prometheus+Grafana銆?.鍋歔]娣诲姞websocket鏀寔锛堥鐮旀垚鍔燂級銆俒]浼樺寲mqttsession澶勭悊锛屾敮鎸乿5.04.鏇存柊璁板綍馃摑鏂囨。澧炲姞闆嗙兢澶勭悊姝ラ锛屽鍔爓ill娑堟伅锛屼繚鐣欐秷鎭娇鐢ㄥ満鏅€傗湪鍘婚櫎demo涓殑qos2鍙傛暟锛屾€ц兘鎹熷け杈冨ぇ锛岄伩鍏嶈鎿嶄綔銆傗湪閬楀槺锛屼繚鐣欐秷鎭唴閮ㄦ秷鎭浆鍙戞娊璞°€傗湪娣诲姞mica-mqtt-spring-boot-example銆傛劅璋sq锛園鍐锋湀瀹富锛塸r銆傗湪mica-mqtt-spring-boot-starter鏀寔瀹㈡埛绔闂拰鏈嶅姟绔紭鍖栥€傛劅璋sq锛園鍐锋湀瀹富锛塸r銆傗湪mica-mqtt-spring-boot-starter鏈嶅姟鍣ㄦ敮鎸佹寚鏍囨敹闆嗐€傚彲浠ヨ繛鎺rometheus+Grafana鐩戞帶銆傗湪褰搈qtt鏈嶅姟鍣ㄦ帴鍙椾竴涓繛鎺ユ椂锛岄鍏堝垽鏂clientId鏄惁杩樻湁鍏朵粬杩炴帴锛屽鏋滄湁鍒欒В缁戝苟鍏抽棴鍏朵粬杩炴帴銆傗瑔锔忓崌绾ica-auto鍒?.1.3淇ide澶氭ā鍧楀閲忕紪璇戦棶棰樸€?.Springboot蹇€熸帴鍏?.1娣诲姞渚濊禆net.dreamlumica-mqtt-spring-boot-starter1.0.2/dependency>5.2鏈嶅姟鍣▂ml閰嶇疆mqtt:server:enabled:true#鏄惁鍚敤锛岄粯璁わ細trueip:127.0.0.1#鏈嶅姟鍣╥pdefault:127.0.0.1port:5883#绔彛锛岄粯璁わ細1883name:Mica-Mqtt-Server#鍚嶇О锛岄粯璁わ細Mica-Mqtt-Serverbuffer-allocator:HEAP#鍫嗗唴瀛樺拰鍫嗗鍐呭瓨锛岄粯璁わ細鍫嗗唴瀛榟eartbeat-timeout:120000#蹇冭烦瓒呮椂锛屽崟浣嶆绉掞紝榛樿锛?000*120read-buffer-size:8092#鎺ユ敹鏁版嵁鐨勭紦鍐插尯澶у皬锛岄粯璁わ細8092max-bytes-in-message:8092#娑堟伅瑙f瀽鐨勬渶澶у瓧鑺傞暱搴︼紝榛樿锛?092debug:true#濡傛灉鍚敤prometheus鎸囨爣鏀堕泦锛屽缓璁甤lose5.3鏈嶅姟鍣ㄧ鍙疄鐜扮殑鎺ュ彛锛堟敞鍐屼负SpringBean鍗冲彲锛夋帴鍙f槸鍚﹀繀椤绘爣鏄嶪MqttServerAuthHandler鐢ㄤ簬瀹㈡埛绔璇併€侷MqttMessageListener鐢ㄤ簬娑堟伅鐩戝惉銆侷MqttConnectStatusListener鐢ㄤ簬杩炴帴鐘舵€佺洃鎺с€侷MqttSessionManager涓嶉€傜敤浜庝細璇濈鐞嗐€侷MqttMessageStorecluster鏄痽es锛宻tand-alone鏄疦o锛屼細骞朵繚鐣欐秷鎭瓨鍌ㄣ€侫bstractMqttMessageDispatcher闆嗙兢鏄紝鍗曟満涓嶈锛屾秷鎭浆鍙戯紝(浼氾紝淇濈暀娑堟伅杞彂锛塈pStatListenernot-ioip鐘舵€佺洃鍚?.4Server鑷畾涔夐厤缃紙鍙€夛級@Configuration(proxyBeanMethods=false)publicclassMqttServerCustomizerConfiguration{@BeanpublicMqttServerCustomizeractiveRecordPluginCustomizer(){returnnewMqttServer(Customizer){Overridepublicvoidcustomize(MqttServerCreatorcreator){//杩欓噷鍙互鑷畾涔塩reator锛屼細瑕嗙洊yml涓殑閰嶇疆System.out.println("----------------MqttServerCustomizer------------------");}};}}5.5MqttServerTemplate绀轰緥importnet.dreamlu.iot.mqtt.codec.MqttQoS;importnet.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.nio.ByteBuffer;/***@authorwsq*/@ServicepublicclassServerService{@鑷姩瑁呴厤绉佹湁MqttServerTemplate鏈嶅姟鍣紱publicbooleanpublish(Stringbody){server.publishAll("/test/123",ByteBuffer.wrap(body.getBytes()));杩斿洖鐪燂紱}}5.6鍩轰簬mq娑堟伅骞挎挱闆嗙兢澶勭悊瀹炵幇IMqttConnectStatusListener澶勭悊璁惧鐘舵€佸瓨鍌ㄣ€侷MqttMessageListener灏嗘秷鎭浆鍙戠粰mq锛屼笟鍔℃寜闇€澶勭悊mq娑堟伅銆傚疄鐜癐MqttMessageStore浠ュ瓨鍌ㄩ仐鍢卞拰棰勮娑堟伅銆傚疄鐜癆bstractMqttMessageDispatcher鍚憁q鍙戦€佹秷鎭紝mq骞挎挱鍥瀖qtt闆嗙兢锛宮qtt鍚戣澶囧彂閫佹秷鎭€備笟鍔℃秷鎭彂閫佸埌mq锛宮q骞挎挱鍒癿qtt闆嗙兢锛宮qtt鍙戦€佹秷鎭埌璁惧銆?.7Prometheus+Grafana鐩戞帶杩炴帴寰楃泭浜巘-io鐨勮壇濂借璁°€傜洃鎺ф寚鏍囩洿鎺ヨ繛鎺ュ埌t-iostat銆傜洰鍓嶆敮鎸佷互涓嬫寚鏍囷紝鏈潵浼氫笉鏂畬鍠勩€傛敮鎸佸緱鎸囨爣璇存槑mqtt_connections_accepted鍏辨帴鍙楄繃杩炴帴鏁癿qtt_connections_closed鍏抽棴杩囩殑杩炴帴鏁癿qtt_connections_size褰撳墠杩炴帴鏁癿qtt_messages_handled_packets宸插鐞嗘秷鎭暟mqtt_messages_handled_bytes宸插鐞嗘秷鎭瓧鑺傛暟mqtt_messages_received_packets宸叉帴鏀舵秷鎭暟mqtt_messages_received_bytes宸插鐞嗘秷鎭瓧鑺傛暟mqtt_messages_send_packets宸插彂閫佹秷鎭暟mqtt_messages_send_bytesForthenumberofsentmessagebytes,pleaserefertothedocumentformoreusagemethodssuchasmica-mqtt-spring-boot-starterclient:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter6.Ordinaryjavaprojectaccess6.1mavendependsonnet.dreamlumica-mqtt-core1.0.26.2mica-mqttclient//InitializemqttclientMqttClientclient=MqttClient.create().ip("127.0.0.1").port(1883)//Default:1883.username("admin").password("123456").version(MqttVersion.MQTT_5)//Default:3_1_1.clientId("xxxxxx")//Default:MICA-MQTT-prefixand36-nanoseconds.connect();//杩炴帴//娑堟伅璁㈤槄锛屾柟娉曠被浼約ubxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});//鍙栨秷璁㈤槄client.unSubscribe("/test/#");//鍙戦€佹秷鎭痗lient.publish("/test/client",ByteBuffer.wrap("浜戞瘝鏈€鐗涚毊".getBytes(StandardCharsets.UTF_8)));//鏂紑杩炴帴client.disconnect();//閲嶆柊杩炴帴client.reconnect();//鍋滄client.stop();6.3mica-mqttserver//娉ㄦ剰锛氫负浜嗘帴鍙楁洿澶氱殑杩炴帴锛堜綆鍐呭瓨锛夛紝璇锋坊鍔爅vm鍙傛暟-Xss129kMqttServermqttServer=MqttServer.create()//default:127.0.0.1.ip("127.0.0.1")//default:1883.port(1883)//default:8092(mqtt榛樿鏈€澶ф秷鎭ぇ灏?锛屼负浜嗗噺灏戝唴瀛橈紝鍙互鍑忓皬杩欎釜鍙傛暟锛屽鏋滄秷鎭お澶?t-io浼氬皾璇曞娆¤В鏋愶紙鏍规嵁瀹為檯涓氬姟鎯呭喌鎺ㄨ崘锛?readBufferSize(512)//鑷畾涔塧uthentication.authHandler((clientId,userName,password)->true)//娑堟伅鐩戝惉銆俶essageListener((clientId,topic,mqttQoS,payload)->{logger.info("clientId:{}topic:{}mqttQoS:{}message:{}",clientId,topic,mqttQoS,ByteBufferUtil.toString(payload));})//ssl閰嶇疆.useSsl("","","")//鑷畾涔夊鎴风娉ㄩ攢鍜岀绾跨洃鍚?connectStatusListener(newIMqttConnectStatusListener(){@Overridepublicvoidonline(StringclientId){}@Overridepublicvoidoffline(StringclientId){}})//鑷畾涔夋秷鎭浆鍙戯紝鍙互閫氳繃mq骞挎挱瀹炵幇闆嗙兢澶勭悊銆俶essageDispatcher(newIMqttMessageDispatcher(){@Overridepublicvoidconfig(MqttServermqttServer){}@Overridepublicbooleansend(Messagemessage){returnfalse;}@Overridepublicbooleansend(StringclientId,Messagemessage){杩斿洖false;}}).debug()//鍚敤t-io璋冭瘯淇℃伅log.start();//鍙戦€佺粰鏌愪釜瀹㈡埛绔痬qttServer.publish("clientId","/test/123",ByteBuffer.wrap("浜戞瘝鏈€鐗涚毊".getBytes()));//鍙戦€佺粰鏈瘽棰樼殑鎵€鏈夊湪绾垮惉浼桟lientmqttServer.publishAll("/test/123",ByteBuffer.wrap("浜戞瘝鏈€鐗涚毊".getBytes()));//鍋滄鏈嶅姟mqttServer.stop();7.鏁堟灉婕旂ず8.鍏虫敞鎴戜滑鎵弿涓婃柟浜岀淮鐮侊紝姣忓ぉ閮芥湁鏇村绮惧僵鍐呭鎺ㄨ崘锛?/p>