01问题正文本篇将解决网友在第6、7号提到的问题,具体如下:简单来说,如何处理时间轴的两个数值计算?上面的例子是一个“+”的加法。02数据准备我们首先使用InfluxDB来解决以上问题。首先,我们需要准备数据并创建一个测试桶。在创建之前检查现有的存储桶。启动InfluxDB实例,如下:启动后,我们查看现有的bucket,如下:influxdbgit:(master)bin/$(uname-s|tr'[:upper:]''[:lower:]')/influxbucketlistIDNameRetentionShardgroupdurationOrganizationID98e86f05543f5866_monitoring168h0m0s24h0m0s56b35f89025991c8b9b9609ae3e08b97_tasks72h0m0s24h0m0s56b35f89025991c8创建名为iot的bucket,如下命令:bin/$(uname-s|tr'[:upper:]''[:lower:]')/influxsetup\--usernameiot\--password2021iotdb\--orgorg\--bucket2021iotdb\--retention1h\--tokeniot_test_token\--hosthttp://localhost:8086\--强制执行成功后会显示如下:?influxdbgit:(master)bin/$(uname-s|tr'[:upper:]''[:lower:]')/influxsetup\--用户名iot\--密码2021iotdb\--orgorg\--bucket2021iotdb\--retention1h\--tokeniot_test_token\--hosthttp://localhost:8086\--forceConfig默认已经存放在/Users/jincheng/.influxdbv2/configs中。我们erOrganizationBucketiotorg2021iotdb我们用命令查看一下:?influxdbgit:(master)bin/$(uname-s|tr'[:upper:]''[:lower:]')/influxbucketlistIDNameRetentionShardgroupdurationOrganizationIDc05283f56bf9cead2021iotdb1h0m0s1h0m0s0b1ad4c0cd4db9cae70f5bb2fdaa5dd2_monitoring168h0m0s24h0m0s0b1ad4c0cd4db9ca56241b01789c1a1b_tasks72h0m0s24h0m0s0b1ad4c0cd4db9ca插入两条时间线数据,如下:?influxdbgit:(master)bin/$(uname-s|tr'[:upper:]''[:lower:]')/influxwrite--bucket2021iotdb--precisions"m1vm=3333$(date+%s)"?influxdbgit:(master)bin/$(uname-s|tr'[:upper:]''[:lower:]')/influxwrite--bucket2021iotdb--precisions"m2vn=4444$(date+%s)"我们插入两条时间线数据,m1vm=3333,m2vn=4444,我们的要求是vm+vn03JOINquery我们来看看JOIN的函数定义:join()函数将两个或多个输入流,其值在一组公共列上相等,合并为一个输出流。Flux允许您连接两个数据流之间的任何公共列,并为交叉测量连接和跨测量数学等操作打开大门。语法:join(tables:{key1:table1,key2:table2},on:["_time","_field"],method:"inner")和我们标准数据库的JOIN语义基本一致。我们先查一下数据进行测试。我们可以使用influxCLI,如下:发现数据已经插入成功。您还可以使用fluxCLI。InlfuxDB社区提倡使用flux。我们打开一个fluxrepl。详情可以参考之前的文章No6。我用IDE打开如下:>from(bucket:"2021iotdb")|>range(start:-1h)Result:_resultError:unauthorizedaccess如图,我们在IDE中执行查询的时候,我们提示我们需要token,所以涌入查询是Whynot?默认情况下,IDE不读取配置文件。我们可以配置环境变量或者直接添加token。查询语句如下:>from(bucket:"2021iotdb",org:"org",token:"iot_test_token")|>range(start:-1h)Result:_resultTable:keys:[_start,_stop,_field,_measurement]_start:time_stop:time_field:string_measurement:string_time:time_value:float--------------------------------------------------------------------------------------------------------------------------------------------------------2021-04-06T05:36:50.079542000Z2021-04-06T06:36:50.079542000Zvmm12021-04-06T06:23:16.000000000Z3333表:键:[_start、_stop、_fieldment、_measure_start:时间_stop:time_field:string_measurement:string_time:time_value:float---------------------------------------------------------------------------------------------------------------------------------------------------------2021-04-06T05:36:50.079542000Z2021-04-06T06:36:50.079542000Z好,一切顺利,让我们看看是否计算了vm+vn?如果我们把两个时间序列m1和m2看成两个流(表),那么我们就需要对这两个表进行操作。首先想到的应该是两张表的JOIN,将两张表的数据合并成一张Wide表,然后进行列求值,如下:tab1=from(bucket:"2021iotdb",org:"org",token:"iot_test_token")|>range(start:-1h)|>filter(fn:(r)=>r._measurement=="m1")tab2=from(bucket:"2021iotdb",org:"org",token:"iot_test_token")|>range(start:-1h)|>filter(fn:(r)=>r._measurement=="m2")得到两张表后,我们进行JOIN操作。查询语句如下:join(tables:{m1:tab1,m2:tab2},on:["_time"])|>map(fn:(r)=>({_time:r._t??ime,_value:r._value_m1+r._value_m2}))上面的on表示JOIN的条件,但是我们发现tab1和tab2中的时间字段不一样,如下:所以我们需要快速插入两条数据,让时间字段一致,就可以得到结果。插入后数据如下:然后我们查询:join(tables:{m1:tab1,m2:tab2},on:["_time"])|>map(fn:(r)=>({_time:r._time,_value:r._value_m1+r._value_m2}))如上我们已经完成了查询需求哈哈,请问InfluxDB中这种查询是不是使用了JOIN方式?有更容易的方法吗?看后面的部分:)03PIOVT查询先看一下PIVOT的函数定义:pivot()函数收集表格中垂直(column-wise)存储的值,并将它们水平(row-wise)对齐成逻辑集。语法:pivot(rowKey:["_time"],columnKey:["_field"],valueColumn:"_value")其实标准数据库中也有PIVOT。在InfluxDB中,pivot可以将行转换为列,然后将两个时间序列数据值转换为一个表中的两列,这个内置还可以为用户进行内部优化。让我们看看它是如何工作的:>from(bucket:"2021iotdb",org:"org",token:"iot_test_token")|>range(start:-1h)|>pivot(rowKey:["_time"],columnKey:["_measurement","_field"],valueColumn:"_value")上面语句的执行结果如下:我们发现m1的vm和m2的vn都变成了表的某列,所以pivot完美地将两个时间序列数据合并到宽表列中。我们添加具体的过滤条件,如下:接下来,我们进行计算,如下:from(bucket:"2021iotdb",org:"org",token:"iot_test_token")|>range(start:-1h)|>filter(fn:(r)=>r._measurement=="m1"orr._measurement=="m2")|>pivot(rowKey:["_time"],columnKey:["_measurement","_field"],valueColumn:"_value")|>map(fn:(r)=>({_time:r._t??ime,_value:r.m1_vm+r.m2_vn}))OK,大家快速PIVOT是不是很方便?:)问题04最后留个问题给大家。你知道PIVOT和UNPIVOT在标准数据库中的使用场景吗?或者Flink&Spark是如何支持PIVOT的?或者你知道如何在ApacheIoTDB中处理多时间序列数据分析和排序吗?下次见。作者介绍孙金城,51CTO社区编辑,ApacheFlinkPMC成员,ApacheBeamCommitter,ApacheIoTTDBPMC成员,ALCBeijing成员,Apache神鱼导师,Apache软件基金会成员。专注于技术领域的流计算和时序数据存储。
