当前位置: 首页 > 科技观察

FlinkSQL知道为什么:基本的DMLSQL执行语义!

时间:2023-03-12 06:29:57 科技观察

1。DML:With子句?应用场景(支持Batch\Streaming):With语句和离线HiveSQL一样With语句,xdm,语法糖+1,使用它可以让你的代码逻辑更清晰。直接上案例:--语法糖+1WITHorders_with_totalAS(SELECTorder_id,price+taxAStotalFROMOrders)SELECTorder_id,SUM(total)FROMorders_with_totalGROUPBYorder_id;2.DML:SELECT&WHERE子句?INSERTINTOtarget_tableSELECT*FROMOrdersINSERTINTOtarget_tableSELECTorder_id,price+taxFROMOrdersINSERTINTOtarget_table--自定义源数据SELECTorder_id,priceFROM(VALUES(1,2.0),(2,3.1))ASt(order_id,price)INSERTINTOtarget_tableSELECTprice+taxFROMOrdersWHEREid=10--使用UDF进行字段标准化INSERTINTOtarget_tableSELECTPRETTY_PRINT(order_id)FROMOrders--过滤条件Whereid>3SQL语义:其实最好理解task是怎么产生的一条SQL执行结束的唯一方法是理解它的语义。下面以下面的SQL为例,介绍一下它的离线执行和实时执行的区别,并对比研究一下,这样大家就更加清楚了。INSERTINTOtarget_tableSELECTPRETTY_PRINT(order_id)FROMOrdersWhereid>3对于SQL对应的实时任务,假设Orders是Kafka,target_table也是Kafka,执行时会产生三个算子:数据源算子(FromOrder):连接到Kafka主题,数据源算子一直在运行,实时从OrderKafka中读取数据,一条一条发送给下游的过滤和字段标准化算子。过滤和字段标准化算子(Whereid>3andPRETTY_PRINT(order_id)):从上游算子一个接一个地接收数据,然后判断id>3?对判断结果为真的数据执行PRETTY_PRINTUDF后,将计算结果数据一一发送给下游数据汇算子。数据采集??算子(INSERTINTOtarget_table):从上游一个接一个地接收数据,写入target_tableKafka。可以看出,这个实时任务的所有算子都是以流水线的方式运行的,所有算子同时处于运行状态,24小时不间断运行,离线任务没有通用的分区概念在实时任务中。select&where关于如何查看一条FlinkSQL的最终执行计划:最好的办法就是上图,看一下Flinkwebui的算子图,算子图上的详细标注很清楚每个是什么运营商做的。从上图中我们可以看出主要有3个算子:Source算子:Source:TableSourceScan(table=[[default_catalog,default_database,Orders]],fields=[order_id,name])->Calc(select=[order_id,name,CAST(CURRENT_TIMESTAMP())ASrow_time])->WatermarkAssigner(rowtime=[row_time],watermark=[(row_time-5000:INTERVALSECOND)]),其中源表名称是table=[[default_catalog,default_database,Orders],字段是select=[order_id,name,CAST(CURRENT_TIMESTAMP())ASrow_time],Watermark策略是rowtime=[row_time],watermark=[(row_time-5000:INTERVALSECOND)]。过滤算子:calc(select=[order_id,name,row_time],where=[(order_id>3)])->NotNullEnforcer(fields=[order_id]),其中过滤条件为where=[(order_id>3)],结果字段为select=[order_id,name,row_time]Sinkoperator:Sink:Sink(table=[default_catalog.default_database.target_table],fields=[order_id,name,row_time]),其中最终输出的表名为table=[default_catalog.default_database.target_table],表字段为fields=[order_id,name,row_time]。可以看到,FlinkSQL执行的具体操作在算子图上标注的非常详细。所以一定要学会读算子图,这是掌握调试和调优之前最基本的技能。那么这条SQL如果在Hive中执行的话,假设Orders是一张Hive表,target_table也是一张Hive表,也会生成三个类似的算子(虽然实际上可能优化成一个算子,这里为了方便对比,分一分为三介绍),离线任务和实时任务的执行方式完全不同:数据源算子(FromOrder):数据源是一次性从OrderHive表中读取(通常是读取一天或一天??的分区数据hour)读取所有数据,然后将读取到的所有数据发送给下游的filter字段标准化算子,数据源算子运行结束释放资源。过滤和字段标准化算子(Whereid>3andPRETTY_PRINT(order_id)):收到上游算子的所有数据后,再遍历所有数据判断id>3?对判断结果为true的数据执行PRETTY_PRINTUDF后,将所有数据发送给下游的数据聚合算子,完成过滤和字段标准化算子,释放资源。数据采集??算子(INSERTINTOtarget_table):接收到所有上游数据后,将所有数据写入到target_tableHive表中,然后整个任务执行完毕,释放整个任务的资源。可以看出离线任务的算子是分阶段运行的。每个阶段运行结束后,下一阶段开始运行。所有阶段运行后,离线任务结束。注:很多朋友之前做过离线数仓,对计算任务的离线分区和定时调度这两个概念比较熟悉,所以刚开始接触FlinkSQL的时候,以为FlinkSQL实时任务也会有这两个概念。一个概念,这里博主解释一下。分区概念:由于容量限制,批量数据计算通常是离线进行的。每批数据的数据量是一个有限集合。这批数据自然的划分方式就是时间,比如按小时和天划分分区。但是在实时任务中,没有分区的概念,实时任务的上下游都是无穷无尽的数据流。计算任务定时调度的概念:同上,离线是由于计算能力的限制,数据需要逐批计算,逐批输入输出,所以必须按照小时和时间来调度计算天。但是,在实时任务中,并没有定时调度的概念。实时任务一旦运行起来,就会24小时不间断地运行,不间断地处理无限量的上游数据,而不是简单地向下游输出数据。3、DML:SELECTDISTINCT子句应用场景(支持Batch\Streaming):语句同离线HiveSQLSELECTDISTINCT语句,xdm用于基于key去重数据。直接上案例:INSERTintotarget_tableSELECTDISTINCTidFROMOrdersSQL语义:同样是离线和实时的对比。这条SQL对应的实时任务假设Orders是Kafka,target_table也是Kafka。执行过程中,会产生三个算子:数据源算子(FromOrder):连接到Kafka主题,数据源算子一直在运行,实时从OrderKafka中读取数据,一条一条发送到下游重复数据删除运算符。去重算子(DISTINCTid):接收上游算子发来的一条数据,然后判断id之前是否来过。判断方式是使用Flink中的state状态。如果这个id在state中已经存在,说明你已经来过,不会发给下游operator。如果status中id不存在,说明你没去过,会发给下游运营商,一一发给下游运营商。数据采集??算子数据采集算子(INSERTINTOtarget_table):从上游接收一条数据,写入到Kafka的target_table中。selectdistinct注意:对于实时任务,计算的状态可能会无限增长。状态大小取决于不同键的数量(上述情况下的id字段)。为了防止状态无限增长,我们可以设置状态的TTL。但这可能会影响查询结果的正确性。比如某个key的数据过期了,从state中删除了,那么下次再找到这样的key时,会因为在state中找不到而重新输出。那么这条SQL如果在Hive中执行,假设Orders是一张Hive表,target_table也是一张Hive表,同样会生成三个相同的算子(虽然可能优化成一个算子,这里为了方便对比,是分为三种介绍),但与实时任务的执行方式完全不同:数据源算子(FromOrder):数据源从OrderHive表中读取所有数据(通常有day和hour分区限制)一次数据,然后将读取到的数据全部发送给下游的去重算子,数据源算子运行结束释放资源。去重算子(DISTINCTid):接收到上游算子的所有数据后,遍历所有数据进行去重,将所有去重结果数据发送给下游数据宿算子,再由去重算子运行结束是的,资源被释放。数据采集??算子(INSERTINTOtarget_table):接收所有上游数据,将所有数据写入target_tableHive,然后整个任务执行完毕,释放整个任务的资源。