当前位置: 首页 > Linux

TDengine在黑格智造的应用

时间:2023-04-06 01:31:22 Linux

【作者简介】:大罗,黑格智造架构师,主要从事云原生和大数据系统开发,曾参与国家电网建设。示范工业互联网系统。对于工业互联网或者物联网系统,最基本的需求就是展示数据曲线,比如功率曲线,类似于股票的分时图。通常,我们会把设备每分钟上报的最后一次电量作为这一分钟的电量,如果设备在某一分钟内没有上报,则取上一分钟的电量值,以此类推。示例如下:获取的分钟曲线:通常我们会先将设备上报的数据写入ApacheKafka。如果是离线计算场景,可以考虑将数据写入Hive,然后使用SparkSQL定时读取Hive,再将计算结果写入HBase;如果是实时计算场景,会使用ApacheFlink消费Kafka数据,并将结果写入HBase,此时需要考虑数据乱序、计算延迟投递等问题。而且基于传统的大数据Hadoop架构,需要搭建ZooKeeper和HDFS,其次是Hive和HBase,整个系统的维护成本非常高。另外,HBase按照key-value存储时序数据,在相同key-value的数据设计架构上浪费了很多空间。以上是物联网设备属性曲线计算场景中的痛点之一。此外,还需要考虑数据增长、数据校验、数据容灾等问题。为了给客户提供基于3D打印技术的一体化解决方案,笔者所在的公司自然需要持续跟踪设备的运行状态,存储设备的运行数据。这时,我们找到了开源物联网大数据平台TDengine(https://github.com/taosdata/TDengine)。参考TDengine文档中的SQL写法。当数据完整后,用一句SQL就可以轻松解决上面的问题:selectlast(val)afromsuper_table_xxwherets>='2021-06-0718:10:00'andts<='2021-06-0718:20:00'间隔(60秒)填充(值,0);为什么类似的SQL,TDengine的执行效率可以这么高?这在于它的超表和子表。针对单个设备的数据,TDengine设计了按时间连续存储的特性。事实上,业务系统在使用物联网数据时,无论是即时查询还是离线分析,都具有连续一段时间读取单个设备数据的特点。假设我们要存储设备的温度和湿度,我们可以这样设计超级表:createstableifnotexistss_device(tsTIMESTAMP,temperaturedouble,humiditydouble)TAGS(device_snBINARY(1000));实际使用中,以设备为例插入'd1'和'd2'数据的SQL如下:insertintos_device_d1(ts,temperature,humidity)USINGs_device(device_sn)TAGS('d1')values(1623157875000,35.34,80.24);插入s_device_d2(ts,temperature,humidity)USINGs_device(device_sn)TAGS('d2')values(1623157891000,29.63,79.48);查询设备'd1'某个时间段的数据,SQL如下:select*froms_devicewheredevice_sn='d1'andts>1623157871000andts<1623157890000;假设统计过去7天的平均温度曲线,每小时1点:selectavg(temperature)temperaturefroms_devicewheredevice_sn=#{deviceSn}andts>=#{startTime}andts<#{endTime}interval(1h)TDengine也提供了很多聚合函数,类似于上面用于计算1分钟连续曲线的last和fill,以及其他常用的sum和max。在结合应用的过程中,我们选择了MyBatis这个灵活易用的ORM框架。比如上面的数据表's_device',我们先定义实体:importlombok.Data;importlombok.NoArgsConstructor;importjava.sql.Timestamp;/***@author:DaLuo*@date:2021/06/25*@description:*/@Data@AllArgsConstructor@NoArgsConstructor@Builder@TableName(value="s_device")publicclassTestSuperDeviceEntity{privateTimestampts;私人浮动温度;私人浮动湿度;@TableField(value="device_sn")privateStringdevice_sn;}重新定义映射器:导入lombok.Builder;导入lombok.Data;导入lombok.NoArgsConstructor;导入org.apache.ibatis.annotations.Insert;导入org.apache.ibatis.annotations.Mapper;importorg.apache.ibatis.annotations.Param;importorg.apache.ibatis.annotations.Select;importjava.sql.Timestamp;importjava.util.List;/***@author:DaLuo*@date:2021/06/25*@description:*/@MapperpublicinterfaceTestSuperDeviceMapperextendsBaseMapper{/***单个插入*@paramentity*@return*/@Insert({"INSERTINTO's_device_${entity.deviceSn}'(ts,温度,湿度)”,“使用s_device(device_sn)标签(#{entity.deviceSn})”,“VALUES(#{entity.ts},#{entity.temperature},#{entity.humidity})"})intinsertOne(@Param(value="entity")TestSuperDeviceEntityentity);/***批量插入*@paramentities*@return*/@Insert({""})intbatchInsert(@Param("list")Listentities);/***查询过去一段时间内的平均温度,每小时1个数据点*@paramdeviceSn*@paramstartTimeinclusive*@paramendTimeexclusive*@return*/@Select("selectavg(temperature)temperaturefroms_devicewheredevice_sn=#{deviceSn}andts>=#{startTime}andts<#{endTime}interval(1h)")ListselectSevenDaysTemperature(@Param(value="deviceSn")StringdeviceSn,@Param(value="startTime")longstartTime,@Param(value="endTime")longendTime);@AllArgsConstructor@NoArgsConstructor@Data@Builder类TempSevenDaysTemperature{私有时间戳ts;私有浮动温度;}}TDengine有一个非常巧妙的设计,就是不需要提前创建子表,所以我们可以很方便的使用'tag'标签作为子表名的一部分,插入数据的同时创建子表注意:考虑到国际化我们所有的时间存储查询交互都使用时间戳而不是“yyyy-mm-ddhh:MM:ss”格式,因为数据存储涉及到应用时区、连接字符串时区和TDengine服务时区,使用“yyyy-mm-ddhh:MM:ss”格式很容易导致时间存储不准确,而使用时间戳和长整型数据格式可以完美避免此类问题。目前Java使用TDengineJDBC-driver有两种方式:JDBC-JNI和JDBC-RESTful。前者在写入性能上更有优势。但需要在应用运行的服务器上安装TDengine客户端驱动。我们的应用使用了Kubernetes集群,程序运行在Docker中。为此,我们创建了适合我们应用程序的图像。例如基础镜像的Dockerfile如下:FROMopenjdk:8-jdk-oraclelinux7COPYTDengine-client-2.0.16.0-Linux-x64.tar.gz/RUNtar-xzvf/TDengine-client-2.0.16.0-Linux-x64.tar.gz&&cd/TDengine-client-2.0.16.0&&pwd&&ls&&./install_client.shbuild:dockerbuild-ttdengine-openjdk-8-runtime:2.0.16.0-fDockerfile。参考Dockerfile中的程序镜像:FROMtdengine-openjdk-8-runtime:2.0.16.0ENVJAVA_OPTS="-Duser.timezone=Asia/Shanghai-Djava.security.egd=file:/dev/./urandom"COPYapp.jar/app.jarENTRYPOINT["java","-jar","/app.jar"]这样我们的应用就可以被调度到任意一个K8s节点上了。另外,我们的程序涉及任务自动化调度,需要与设备下位机进行频繁的MQTT数据交互。例如云端发送命令1000-“启动任务A”,下位机回复命令2000-“接收任务A”,Instructions理解为设备,指令序号和内容为理解为它的属性。自然,这种数据也非常适合存储在TDengine时序数据库中:****************************1.row*****************************ts:2021-06-2316:10:30.000msg:{"task_id":"7b40ed4edc1149f1837179c77d8c3c1f""action":"start"}device_sn:deviceAkind:1000***************************2.行*****************************ts:2021-06-2316:10:31.000msg:{"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"received"}device_sn:deviceAkind:2000我们在与云端设备对接的过程中,经常需要考虑消息是否发送,急需保存命令,以便在云端使用application在程序中新建线程订阅指令集消息,批量写入TDengine数据库。最后,TDengine还有一个超表log.dn,里面保留了内存、CPU等使用信息,所以我们可以使用Grafana来展示这些数据,为监控提供可靠的运行数据参考!