当前位置: 首页 > 科技观察

FlinkSQL知道为什么:SQL的时间语义!

时间:2023-03-12 00:42:21 科技观察

SQL的时间语义大家好,我是老杨,今天按照老杨的思路学习FlinkSQL的时间语义:和离线处理中常见的时间分区字段一样,时间属性在实际中也是一个核心概念-时间处理。Flink支持三种时间语义:处理时间、事件时间和摄取时间。下面将介绍三种时间语义的应用场景和案例。三种时间在生产环境中的使用频率Eventtime(常用SQL)>Processingtime(SQL用的差不多,DataStream很少用)>Ingestiontime(没用)1.三种时间简介Flink时间的属性事件时间:指的是数据本身携带的时间就是事件产生的时间,FlinkSQL在触发计算的时候也会用到数据本身携带的时间。这称为事件时间。目前在生产环境中使用最多。Processingtime:指特定算子计算数据执行时的机器时间(例如Java在算子中取System.currentTimeMillis()),在生产环境中使用最多。Ingestiontime:指数据从数据源进入Flink的时间。摄入时间是用的最少的,可以说基本不用。小伙伴们请注意:以上三个时间概念并不是因为数据而诞生的,而是有了Flink之后根据实际应用场景而诞生的。以事件时间为例。如果数据只携带时间,Flink也会消费这个数据。但是这个时间的数据在Flink中并没有作为计算的触发器,这个Flink任务不能称为事件时间任务。其次需要注意的是,一般一个Flink任务只会有一个时间属性,所以时间属性通常被认为是一个任务粒度。示例:我们可以说任务A是具有事件时间语义的任务,而任务B是处理时间语义的任务。当然,一个任务也可以有多个时间属性。2、Flink的三个时间属性的应用场景此时xdm会问,博主写的三个时间属性对我们的任务有什么影响?三种时间属性的应用场景有哪些?先说结论,时间在Flink中的作用:主要体现在包括时间窗口在内的计算上:用来标识任务的时间进度,决定是否触发窗口的计算。比如常用的滚动窗、滑动窗等,都需要时间来触发。后面会详细介绍这些窗口的应用场景。主要体现在自定义时间语义的计算上:比如用户可以每10s自定义本地时间,或者消费数据的时间戳每增加10s,输出一次计算结果,timeisin这类应用还有标记任务进度的功能。博主以滚动窗口的聚合任务为例,介绍事件时间和处理时间的对比和区别。1.事件时间案例:还是以之前的点击表为例。上述tumblewindow案例的窗口大小为1小时。需求方需要根据用户点击的时间戳cTime对数据进行划分(划分滚动窗口),然后计算count聚合结果(这样计算才能反映事件的真实发生时间),然后需要设置cTime为窗口的划分时间戳,即代码中的tumble(cTime,interval'1'hour)。这称为事件时间。即数据中的时间戳用于划分窗口(点击操作发生的实际时间)。后续的FlinkSQL任务在运行过程中也会实际以cTime的当前时间作为1小时窗口结束的触发条件,计算1小时窗口内的数据。2.处理时间案例:还是以之前的clicks表为例。还是上面的情况,但是这次需求方不需要根据数据上的时间戳来划分数据(划分滚动窗口),只需要用Flink机器上的时间作为调用条件数据到达后的一小时窗口结束并计算它。那么这个触发机制就是处理时间。3.Ingestiontime案例:Flink从外部数据源读取数据时,会将当前数据源operator的本地时间戳附加到该数据上。下游可以使用此时间戳进行窗口聚合,但很少使用。3.SQL中指定时间属性的两种方式如果要满足FlinkSQL时间窗口类的聚合操作,SQL或者TableAPI中的数据源表需要提供时间属性(相当于在数据源上设置这个时间属性table)statement),支持时间相关的操作。那么我们来看看FlinkSQL为我们提供的两种指定时间戳的方式:CREATETABLEDDL在创建表时指定。可以在DataStream中指定,在后续的DataStream-to-Table中使用。一旦定义了时间属性,它就可以像普通列一样使用,也可以用于与时间相关的操作。4.SQLeventtimecase下面我们看看如何在Flink中指定eventtime。1.CREATETABLEDDL指定如何使用时间戳。CREATETABLEuser_actions(user_nameSTRING,dataSTRING,user_action_timeTIMESTAMP(3),--用下面这句声明user_action_time为事件时间,声明watermark的生成规则,即user_action_time减5秒--事件的字段类型时间列必须是TIMESTAMP或TIMESTAMP_LTZtypeWATERMARKFORuser_action_timeASuser_action_time-INTERVAL'5'SECOND)WITH(...);SELECTTUMBLE_START(user_action_time,INTERVAL'10'MINUTE),COUNT(DISTINCTuser_name)FROMuser_actions--然后你可以在窗口操作符中使用user_action_timeGROUPBYTUMBLE(user_action_time,INTERVAL'10'MINUTE);从上面的语句可以看出,如果要使用事件时间,那么我们的时间戳类型必须是TIMESTAMP或者TIMESTAMP_LTZ类型。很多朋友会认为我们的时间戳一般都不是秒或者毫秒(BIGINT类型),那么遇到这种情况怎么办呢?必须有一个解决方案。如下。CREATETABLEuser_actions(user_nameSTRING,dataSTRING,--1.这个ts是普通的毫秒级时间戳tsBIGINT,--2.将毫秒时间戳转为TIMESTAMP_LTZ类型time_ltzASTO_TIMESTAMP_LTZ(ts,3),--3。使用下面这句声明user_action_time为事件时间,并声明水印生成规则,即user_action_time负5秒——事件时间列字段类型必须为TIMESTAMP或TIMESTAMP_LTZ类型WATERMARKFORtime_ltzAStime_ltz-INTERVAL'5'SECOND)和(...);SELECTTUMBLE_START(time_ltz,INTERVAL'10'MINUTE),COUNT(DISTINCTuser_name)FROMuser_actionsGROUPBYTUMBLE(time_ltz,INTERVAL'10'MINUTE);2.在DataStream中指定事件时间。之前介绍过Table和DataStream是可以相互转换的,所以Flink也提供了Table转换为DataStream时指定timestamp字段的能力。如下例:publicclassDataStreamSourceEventTimeTest{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());EnvironmentSettings设置=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env,settings);//1.分配水印DataStreamr=env.addSource(newUserDefinedSource()).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor(Time.minutes(0L)){@OverridepubliclongextractTimestamp(Rowelement){return(长)element.getField("f2");}});//2.使用f2.rowtime的方式将f2字符串指示为事件时间间隔TablesourceTable=tEnv.fromDataStream(r,"f0,f1,f2.rowtime");tEnv.createTemporaryView("source_table",sourceTable);//3.在滚动窗口中使用f2StringtumbleWindowSql="SELECTTUMBLE_START(f2,INTERVAL'5'SECOND),COUNT(DISTINCTf0)\n"+"FROMsource_table\n"+"GROUPBYTUMBLE(f2,INTERVAL'5'秒)”;表resultTable=tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable,Row.class).print();环境执行();}privatestaticclassUserDefinedSourceimplementsSourceFunction,ResultTypeQueryable{privatevolatilebooleanisCancel;@Overridepublicvoidrun(SourceContextsourceContext)throwsException{inti=0;while(!this.isCancel){sourceContext.collect(Row.of("a"+i,"b",System.currentTimeMillis()));线程.睡眠(10L);我++;}}@Overridepublicvoidcancel(){this.isCancel=true;}@OverridepublicTypeInformationgetProducedType(){returnnewRowTypeInfo(TypeInformation.of(String.class),TypeInformation.of(String.class),TypeInformation.of(Long.class));}}}五、SQL处理时间案例下面看看FlinkSQL中如何指定处理时间1.CREATETABLEDDL中如何指定时间戳。CREATETABLEuser_actions(user_nameSTRING,dataSTRING,--使用下面这句话声明user_action_time为处理时间user_action_timeASPROCTIME())WITH(...);SELECTTUMBLE_START(user_action_time,INTERVAL'10'MINUTE),COUNT(DISTINCTuser_name)FROMuser_actions--然后就可以使用user_action_timeGROUPBYTUMBLE(user_action_time,INTERVAL'10'MINUTE);?DataStream来指定windowoperator中的处理时间。公共类DataStreamSourceProcessingTimeTest{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());EnvironmentSettingssettings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env,settings);//1.分配水印DataStreamr=env.addSource(newUserDefinedSource());//2.使用proctime.proctime指向f2字段作为ProcessingtimetimestampTablesourceTable=tEnv.fromDataStream(r,"f0,f1,f2,proctime.proctime");tEnv.createTemporaryView("source_table",sourceTable);//3.在翻转窗口中使用f2StringtumbleWindowSql="SELECTTUMBLE_START(proctime,INTERVAL'5'SECOND),COUNT(DISTINCTf0)\n"+"FROMsource_table\n"+"GROUPBYTUMBLE(proctime,INTERVAL'5'SECOND)";表resultTable=tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable,Row.class).print();env.execute();}privatestaticclassUserDefinedSourceimplementsSourceFunction,ResultTypeQueryable{privatevolatilebooleanisCancel;@Overridepublicvoidrun(SourceContextsourceContext)抛出异常{inti=0;while(!this.isCancel){sourceContext.collect(Row.of("a"+i,"b",System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublicvoidcancel(){this.isCancel=true;}@OverridepublicTypeInformationgetProducedType(){返回新的RowTypeInfo(TypeInformation.of(String.class),TypeInformation.of(String.class),TypeInformation.of(Long.class));}}}