当前位置: 首页 > 科技观察

FlinkSQL知道为什么:SQLDDL!

时间:2023-03-13 12:47:35 科技观察

SQL语法1.DDL:Createclause大家好,我是老杨,今天来学习一波FlinkSQL中的DDL。CREATE语句用于向当前或指定目录注册库、表、视图或函数。已注册的库、表、视图和函数可以在SQL查询中使用。目前FlinkSQL支持以下CREATE语句:CREATETABLE。创建数据库。创建视图。创建功能。本节重点介绍建表,建库、建视图、建UDF等内容会在后面的扩展章节中介绍。1.建表语句下面的SQL语句是建表语句的定义。根据指定的表名创建表。如果目录中已经存在同名表,则无法注册。创建表[如果不存在][catalog_name.][db_name.]table_name({||}[,...n][][][,...n])[COMMENTtable_comment][PARTITIONEDBY(partition_column_name1,partition_column_name2,...)]WITH(key1=val1,key2=val2,...)[LIKEsource_table[()]]:column_namecolumn_type[][COMMENTcolumn_comment]:[CONSTRAINTconstraint_name]PRIMARYKEYNOTENFORCED:[CONSTRAINTconstraint_name]PRIMARYKEY(column_name,...)NOTENFORCED:column_namecolumn_typeMETADATA[FROMmetadata_key][VIRTUAL]:column_nameAScomputed_column_expression[COMMENTcolumn_comment]<watermark_definition>:WATERMARKFORrowtime_column_nameASwatermark_strategy_expression:[catalog_name.][db_name.]table_name:{{INCLUDING|排除}{所有|约束|分区}|{包括|排除|选项|WATERMARKS}}[,...]2.表中的列常规列(即物理列)物理列就是所谓的数据库中的常规列,它定义了存储的数据中字段的名称、类型和名称按物理媒体顺序。物理列之间可以声明其他类型的列,但不会影响最终读取物理列。作为仅包含常规列的表的示例:CREATETABLEMyTable(`user_id`BIGINT,`name`STRING)WITH(...);元数据列元数据列是SQL标准的扩展,允许访问本身具有一些元数据的数据源。元数据列由METADATA关键字标识。比如我们可以使用metadata列从Kafka数据中读取Kafka数据的时间戳(这个时间戳并不是数据中的时间戳字段,但是当数据写入Kafka时,Kafka引擎会用时间戳标记数据),然后我们就可以在FlinkSQL中使用这个时间戳,比如进行基于时间的窗口操作。示例:CREATETABLEMyTable(`user_id`BIGINT,`name`STRING,--读取kafka自己的时间戳`record_time`TIMESTAMP_LTZ(3)METADATAFROM'timestamp')WITH('connector'='kafka'...);元数据列可用于后续数据处理,或写入目标表。示例:INSERTINTOMyTableSELECTuser_id,name,record_time+INTERVAL'1'SECONDFROMMyTable;如果自定义列名与Connector中定义的元数据字段名相同,则可以省略FROMxxx子句。示例:CREATETABLEMyTable(`user_id`BIGINT,`name`STRING,--读取kafka自身的时间戳`timestamp`TIMESTAMP_LTZ(3)METADATA)WITH('connector'='kafka'...);FlinkSQL的各个Connector提供的元数据字段,详见官网文档https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。如果自定义列的数据类型与Connector中定义的元数据字段的数据类型不一致,程序运行时会自动强制转换。但是这需要两种数据类型都可以强制。例如:CREATETABLEMyTable(`user_id`BIGINT,`name`STRING,--converttimestamptoBIGINT`timestamp`BIGINTMETADATA)WITH('connector'='kafka'...);默认情况下,FlinkSQL规划器认为元数据列是可读可写的。然而,一些外部存储系统的元数据信息只能用于读取而不能写入。那么在写表的场景中,我们可以使用VIRTUAL关键字来表示某个元数据列不写入外部存储(不持久化)。以Kafka为例:CREATETABLEMyTable(--下沉时会写入`timestamp`BIGINTMETADATA,--下沉时不会写入`offset`BIGINTMETADATAVIRTUAL,`user_id`BIGINT,`name`STRING,)WITH('连接器'='kafka'...);在上述情况下,Kafka引擎的偏移量是只读的。所以当我们使用MyTable作为数据源(输入)表时,schema包含偏移量。使用MyTable作为数据接收器(输出)表时,架构不包含偏移量。如下:--schemaMyTable(`timestamp`BIGINT,`offset`BIGINT,`user_id`BIGINT,`name`STRING)asdatasource(input)--schemaMyTable(`timestamp`BIGINT,`asdatasink(output)user_id`BIGINT,`name`STRING)所以这里写的时候需要注意,SQLINSERTINTO语句中不要写offset列,否则FlinkSQL任务会直接报错。ComputedcolumnComputedcolumn其实就是在写建表的DDL时,利用一些已经存在的列,经过一些自定义的操作,生成一个新的列。列本身并未物理存储在数据源中。Example:CREATETABLEMyTable(`user_id`BIGINT,`price`DOUBLE,`quantity`DOUBLE,--cost是使用价格和数量生成的计算列,计算方式为价格*数量`cost`AS价格*数量,)WITH('connector'='kafka'...);笔记!!!计算列可以包含其他列、常量或函数,但不能向其中写入子查询。萌哥们这时候就要问一个问题了。由于只能包含列、常量或函数计算,所以我直接写在DML查询代码中就搞定了。为什么我需要在DDL中定义它?结论:没有错,如果只是简单的四次算术运算,可以直接用DML写,但是计算列一般都是用来定义时间属性的(因为在SQL任务中,时间属性只能定义在DDL,不在DML语句中)。例如,标准化输入数据的时间格式。处理时间和事件时间示例如下:处理时间:使用PROCTIME()函数定义处理时间列事件时间:事件时间的时间戳可以在声明Watermark之前进行预处理。例如,如果字段不是TIMESTAMP(3)类型或时间戳嵌套在JSON字符串中,则可以使用计算列进行预处理。笔记!!!与虚拟元数据列类似,计算列只能读取不能写入。也就是说,当我们使用MyTable作为数据源(输入)表时,schema是包含cost的。使用MyTable作为数据接收器(输出)表时,架构不包括成本。示例:--schemaMyTable(`user_id`BIGINT,`price`DOUBLE,`quantity`DOUBLE,`cost`DOUBLE)作为数据源(输入)--schemaMyTable(`user_id`BIGINT,`作为数据接收器(输出)价格`DOUBLE,`quantity`DOUBLE)3.DefineWatermarkWatermark是在CreateTable中定义的。具体的SQL语法标准是WATERMARKFORrowtime_column_nameASwatermark_strategy_expression。其中:rowtime_column_name:表的事件时间属性字段。该列必须是TIMESTAMP(3)、TIMESTAMP_LTZ(3)类型,此时可以是计算列。watermark_strategy_expression:定义Watermark生成策略。Watermark一般在固定的时间间隔内从rowtime_column_name列中减去。SQL中Watermark的产生策略是:当当前Watermark大于上次发出的Watermark时发出当前Watermark。注意:如果使用事件时间语义,则必须设置事件时间属性和WATERMARK生成策略。水印发送频率:水印发送一般以一定的时间间隔进行。Watermark发送间隔可以通过pipeline.auto-watermark-interval配置。如果设置为200ms,Watermark将每200ms计算一次。但是,如果较早发布的Watermark较大,则会发布。如果间隔设置为0ms,只要满足触发条件就会发送Watermark,不受间隔时间控制。FlinkSQL提供了几种WATERMARK的生产策略:Bounded和unordered:设置方式为WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'string'timeUnit。此类策略可用于设置最大乱序??时间。如果设置为WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'5'SECOND,将生成一个延迟5s的Watermark。.一般采用这种水印生成策略。这种水印生成策略通常用于数据乱序的场景。在实际场景中,数据总会出现乱序,所以基本都是采用这种策略。严格升序:设置方式为WATERMARKFORrowtime_columnASrowtime_column。一般基本不用这种方法。如果你能保证你的数据源的时间戳是严格升序的,那么你可以使用这个方法。严格升序意味着Flink任务认为时间戳只会越来越大,不存在相等的情况。只要等于或小于前一个,就认为是迟到数据。增量:设置为WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'0.001'SECOND。一般基本不用这种方法。如果设置了此类,则允许使用相同的时间戳。4.CreateTableWith子句先看一个案例:CREATETABLEKafkaTable(`user_id`BIGINT,`item_id`BIGINT,`behavior`STRING,`ts`TIMESTAMP(3)METADATAFROM'timestamp')WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv')可以看到DDL中的With子句描述了建表时数据源和数据宿具体对外存储的元数据信息。With中的配置项一般由FlinkSQL的Connector(链接外部存储的连接器)定义,每种Connector提供不同的With配置项。注:FlinkSQL中的Connector实际上是Flink用来链接外部数据源的接口。举一个类似的例子,如果要用Java连接MySQL,需要使用mysql-connector-java包提供的JavaAPI来连接。映射到FlinkSQL,为了在FlinkSQL中连接到Kafka,需要使用kafkaconnector。FlinkSQL已经提供了一系列内置的Connector,详见https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。回到上面的案例,With声明了如下几条信息:'connector'='kafka':声明了外部存储是Kafka。'topic'='user_behavior':声明FlinkSQL任务要连接的Kafka表的topic为user_behavior。'properties.bootstrap.servers'='localhost:9092':声明Kafka的服务器ip为localhost:9092。'properties.group.id'='testGroup':声明FlinkSQL任务消费这个Kafkatopic,会使用testGroup的groupid去消费。'scan.startup.mode'='earliest-offset':声明FlinkSQL任务消费的Kafka主题将从最早的位置开始消费。'format'='csv':声明FlinkSQL任务读入或写出时Kafka消息的序列化方式为csv格式。从这里也可以看出,With中要配置的具体配置项是由各个Connector决定的。5.CreateTableLike子句Like子句是CreateTable子句的扩展。示例:下面定义了一个Orders表:CREATETABLEOrders(`user`BIGINT,productSTRING,order_timeTIMESTAMP(3))WITH('connector'='kafka','scan.startup.mode'='earliest-offset');但是我忘记定义Watermark了,所以如果要加Watermark,可以用Like子句定义一个带Watermark的新表:CREATETABLEOrders_with_watermark(--1.AddedWATERMARKdefinitionWATERMARKFORorder_timeASorder_time-INTERVAL'5'SECOND)WITH(--2.覆盖原Orders表中的scan.startup.mode参数'scan.startup.mode'='latest-offset')--3.Like子句语句在原Orders中定义orders_with_watermark表LIKE表的基础上的订单;上述语句的效果等同于:CREATETABLEOrders_with_watermark(`user`BIGINT,productSTRING,order_timeTIMESTAMP(3),WATERMARKFORorder_timeASorder_time-INTERVAL'5'SECOND)WITH('connector'='kafka','scan.startup.mode'='latest-offset');但这并不常用。我不会介绍太多。有兴趣的朋友直接去官网参考具体注意事项:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like.