当前位置: 首页 > 科技观察

物联网数据库IoTDB——从协议到数据

时间:2023-03-20 14:34:18 科技观察

首先祝各位读者节日快乐。在本系列的前几篇文章中,我们介绍了Iotdb的LSM以及Iot中的最佳实践。这次我们将看到如何集成mqtt和Iotdb。让我们开始吧:docker中的iotdb首先,制作一个测试环境。现在越来越喜欢docker和WSL了。除了一些硬盘、内存和CPU资源,没有缺点。。。在docker中运行,直接把打开的端口全部打开,只是测试环境,目录我就不挂了。dockerrun-d-p6667:6667-p31999:31999-p8181:8181-p5555:5555-p1883:1883apache/iotdb稍等片刻,执行dockerps查看是否成功~dockerpsCONTAINERIDIMAGECOMMANDCREATEDSTATUSPORTSNAMESad9b18f8bfft3?"iotdb/sbin/start-s..."2hoursagoUp2hours0.0.0.0:1883->1883/tcp,:::1883->1883/tcp,0.0.0.0:5555->5555/tcp,:::5555->5555/tcp,0.0.0.0:6667->6667/tcp,:::6667->6667/tcp,0.0.0.0:8181->8181/tcp,:::docker环境中的初步iotdb,我们完成了。接下来,启动mqtt服务。启动Mqtt服务,进入iotdb的dockerdockerexec-itad9b18f8bff3/bin/bash编辑配置文件viiotdb/conf/iotdb-engine.properties启动服务,根据自己的需要配置ip和端口。########################MQTTBrokerConfiguration#######################whethertoenablethemqttservice.enable_mqtt_service=false#修改成true,代表开启mqtt服务#themqttservicebindinghost.mqtt_host=0.0.0.0#ip#themqttservicebindingport.mqtt_port=1883#端口#thehandlerpoolsizeforhandingthemqttmessages.mqtt_handler_pool_size=1#themqttmessagepayloadformatter.mqtt_payload_formatter=json#数据格式#maxlengthofmqttmessageinbytemqtt_max_message_size=1048576重启服务,如果No,justrestartthedockerimage.iotdb基本操作启动服务:sbin/start-client.shroot@ad9b18f8bff3:/iotdb/sbin#./start-cli.sh--------------------StartingIoTDBCli----------------------______________________________|__||__||__`.|__\||.--.|_/||\_|||`.\||_)|||/.'`\\||||||__'._||_|\__.|_||__||_.'/_||__)||_____|'.__.'|_____||______.'|_______/version0.11.1IoTDB>loginsuccessfullyexitCLI:quitorexitstopservice:$sbin/stop-server.shsetastoragegrouptoIOTDB,命名root:IoTDB>SETSTORAGEGROUPTOroot查看IOTDB当前存储组:IoTDB>SHOWSTORAGEGROUPIoTDB>SHOWSTORAGEGROUP+------------+|storagegroup|+--------------+|root.test|+------------+Totallinenumber=1Itcosts0.127s查看系统中存在的所有时间序列:IoTDB>SHOWTIMESERIESIoTDB>showtimeseries+------------------------------+-----+------------+--------+--------+----------+----+----------+|时间序列|别名|存储组|数据类型|编码|压缩|标签|属性|+--------------------------------+-----+------------+--------+--------+------------+----+-----------+|root.test.wf01.wt01.temperature|null|root.test|FLOAT|GORILLA|SNAPPY|null|null||root.test.wf01.wt01.status|null|root.test|BOOLEAN|RLE|SNAPPY|null|null||root.test.wf01.wt01.hardware|null|root.test|TEXT|PLAIN|SNAPPY|null|null|+------------------------------+-----+------------+--------+--------+------------+----+--------+Totallinenumber=3Itcosts0.009s查看系统中存在的特定时间序列:SHOWTIMESERIESroot。test.wf01.wt01.statusIoTDB>SHOWTIMESERIESroot.test.wf01.wt01.status+------------------------+-----+--------------+--------+--------+------------+----+---------+|timeseries|alias|storagegroup|dataType|encoding|compression|tags|attributes|+----------------------------+-----+------------+--------+--------+----------+----+----------+|root.test.wf01.wt01.status|null|root.test|BOOLEAN|RLE|SNAPPY|null|null|+-----------------------+-----+------------+--------+--------+------------+----+----------+Totallinenumber=1Itcosts0.003s插入数据INSERT进入root.test.wf01.wt01(时间戳、状态、温度)值(200、假、20.71)IoTDB>INSERTINTTOroot.test.wf01.wt01(时间戳、状态、温度)值(200、假、20.71)消息:语句已成功执行。查看数据:选择*来自root.test;IoTDB>select*fromroot.test;+------------------------+------------------------------+------------------------+---------------------------+|时间|root.test.wf01.wt01.temperature|root.test.wf01.wt01.status|root.test.wf01.wt01.hardware|+--------------------+------------------------------+------------------------+------------------------------+|2021-01-20T02:00:00.000Z|21.2|真|你好|+------------------------+--------------------------------+-------------------------+------------------------------+Totallinenumber=1Itcosts0.077s查看设备:showdevicesIoTDB>showdevices+----------------+|devices|+------------------+|root.test.wf01.wt01|+--------------------+Totallinenumber=1Itcosts0.002smqtttoiotdb代码构建了一个用于存储包的实体对象agewang.datahub.iotdb;importcom.google.gson.Gson;importjava.util.List;publicclassIotdbVO{privateStringdevice;privatelongtimestamp=System.currentTimeMillis();privateListmeasurements;privateListvalues;publicStringgetDevice(){returndevice;}publicvoidsetDevice(Stringdevice){this.device=device;}publiclonggetTimestamp(){returntimestamp;}publicvoidsetTimestamp(longtimestamp){this.timestamp=timestamp;}publicListgetMeasurements(){returnmeasurements;}publicvoidsetMeasurements(Listmeasurements){this.measurements=measurements;}publicListgetValues(){returnvalues;}publicvoidsetValues(Listvalues){this.values=values;}publicStringtoJson(){Gsong=newGson();StringjsonData=g。toJson(this);returnjsonData;}@OverridepublicStringtoString(){return"IotdbVO{"+"device='"+device+'\''+",timestamp="+timestamp+",measurements="+measurements+",values="+values+'}';}}使用祖传代码模拟向iotdb传输数据,这里直接将mqtt的host和port配置到上面修改的iotdb的mqtt服务,就大功告成了packagewang.datahub.iotdb;importorg.fusesource.mqtt.client.BlockingConnection;importorg.fusesource.mqtt.client.MQTT;importorg.fusesource.mqtt.client.QoS;importjava.util.ArrayList;importjava.util.List;importjava。util.Random;publicclassEmmitToIotdb{publicstaticvoidmain(String[]args){String[]hardwares=newString[]{"a1","b1","b2","c3","d1","f5"};intcount=1000;for(inti=0;imeasurements=newArrayList<>();Listvalues=newArrayList<>();measurements.add("温度");measurements.add("状态");measurements.add("硬件");Randomr=newRandom();values.add(r.nextInt(40));values.add(r.nextBoolean());values.add(硬件[r.nextInt(hardwares.length)]);iotdbVO.setMeasurements(measurements);iotdbVO.setValues(values);emmitToIotdb(iotdbVO);}}公开icstaticvoidemmitToIotdb(IotdbVOcontent){try{MQTTmqtt=newMQTT();mqtt.setHost("127.0.0.1",1883);mqtt.setUserName("root");mqtt.setPassword("root");BlockingConnectionconnection=mqtt.blockingConnection();connection.connect();Stringpayload=content.toJson();connection.publish(content.getDevice(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);connection.disconnect();}catch(Exceptione){e.printStackTrace();}}}iotdb的执行结果相当强大和有趣。希望这篇文章对您有所帮助,也非常欢迎您与我交流。本文转载自微信公众号“奇思妙想”,可通过以下二维码关注。转载本文请联系七思妙想公众号。