SQL数据类型介绍完一些基本概念,我们来认识一下FlinkSQL中的数据类型。FlinkSQL内置了很多常用的数据类型,也为用户提供了自定义数据类型的能力。总共有3个部分:原子数据类型。复合数据类型。用户定义的数据类型。一、原子数据类型1、字符串类型:CHAR,CHAR(n):定长字符串,与Java中的Char类似,n代表定长字符,取值范围为[1,2,147,483,647]。如果不指定n,则默认为1。VARCHAR、VARCHAR(n)、STRING:变长字符串,与Java中的String类似,n表示字符的最大长度,取值范围为[1,2,147,483,647]。如果未指定n,则默认为1。STRING等同于VARCHAR(2147483647)。2、二进制串类型:BINARY,BINARY(n):定长二进制串,n代表定长,取值范围为[1,2,147,483,647]。如果不指定n,则默认为1。VARBINARY、VARBINARY(n)、BYTES:变长二进制字符串,n表示字符的最大长度,取值范围为[1,2,147,483,647].如果未指定n,则默认为1。BYTES等同于VARBINARY(2147483647)。3.精确数值类型:DECIMAL,DECIMAL(p),DECIMAL(p,s),DEC,DEC(p),DEC(p,s),NUMERIC,NUMERIC(p),NUMERIC(p,s):定长有精度的数值类型,如Java中的BigDecima,p表示位数(长度),取值范围为[1,38];s表示小数点后的位数(精度),取值范围为[0,p]。如果未指定,p默认为10,s默认为0。TINYINT:从-128到127的1字节有符号整数,就像Java中的byte。SMALLINT:一个介于-32,768到32,767之间的2字节有符号整数,就像Java中的short一样。INT,INTEGER:从-2,147,483,648到2,147,483,647的4字节有符号整数,就像Java中的int一样。BIGINT:-9,223,372,036,854,775,808到9,223,372,036,854,775,807的8字节有符号整数,就像Java中的long。4.Lossyprecisionnumericaltype:FLOAT:4字节的单精度浮点值,类似于Java中的float。DOUBLE,DOUBLEPRECISION:8字节双精度浮点值,就像Java中的double一样。FLOAT和DOUBLE的区别见https://www.runoob.com/w3cnote/float-and-double-different.html。5.布尔类型:BOOLEAN。6.NULL类型:NULL。7.原始类型:RAW('class','snapshot')。只有当数据通过网络传输时才会进行序列化和反序列化操作,并且可以保留原始数据。以Java为例,class参数代表具体对应的Java类型,snapshot代表网络传输时该类型的序列化器。8、日期时间类型:DATE:由年月日组成的日期类型,无时区含义,取值范围[0000-01-01,9999-12-31]TIME,TIME(p):hour:Minutes:Seconds[.fractionalseconds]无时区意义的时间数据类型,精度可达纳秒级,取值范围为[00:00:00.000000000至23:59:59.9999999]。其中,p表示小数秒数,取值范围为[0,9]。如果不指定p,则默认为0。[.fractionalsecond],取值范围[0000-01-0100:00:00.000000000,9999-12-3123:59:59.999999999]。其中p表示小数秒数,取值范围为[0,9],若不指定p,则默认为6。ofyear-month-dayhour:minute:second[.fractionalsecond]时区,取值范围[0000-01-0100:00:00.000000000+14:59,9999-12-3123:59:59.999999999-14:59]。其中p表示小数秒数,取值范围为[0,9],若不指定p,则默认为6。时:分:秒[.fractionalsecond]时区,取值范围[0000-01-0100:00:00.000000000+14:59,9999-12-3123:59:59.999999999-14:59]。其中p表示小数秒数,取值范围[0,9],如果不指定p,默认为6。TIMESTAMP_LTZ与TIMESTAMPWITHTIMEZONE的区别在于TIMESTAMPWITHTIMEZONE的时区信息携带在数据中,例如:输入数据应该是2022-01-0100:00:00.000000000+08:00;TIMESTAMP_LTZ的时区信息不在数据中携带,是由FlinkSQL任务的全局配置决定的。我们可以通过table.local-time-zone参数设置时区。INTERVALYEARTOMONTH,INTERVALDAYTOSECOND:涉及多种类型的间隔。INTERVAL主要用于给TIMESTAMP和TIMESTAMP_LTZ加上偏移量。例如,向TIMESTAMP添加或减去天数、月数或年数。INTERVAL子句涉及的语法类型如下面的FlinkSQL案例所示。创建表sink_table(result_interval_yearTIMESTAMP(3),result_interval_year_pTIMESTAMP(3),result_interval_year_p_to_monthTIMESTAMP(3),result_interval_monthTIMESTAMP(3),result_interval_dayTIMESTAMP(3),result_interval_day_p1TIMESTAMP(3),result_interval_day_p1_to_hourTIMESTAMP(3),result_interval_to_minute_TIMESTAMP(3)),result_interval_day_p1_to_second_p2时间戳(3),result_interval_hour时间戳(3),result_interval_hour_to_minute时间戳(3),result_interval_hour_to_second时间戳(3),result_interval_minute时间戳(3),result_interval_minute_to_second_p2时间戳(3),result_interval_second时间戳(3),result_interval_second(p2)时间戳(p2)WITH('连接器'='打印');INSERTINTOsink_tableSELECT--FlinkSQL支持的所有INTERVAL子句如下,大体分为两种:`year-month`,`day-hour-second`--1.Year-moon。取值范围为[-9999-11,+9999-11],其中p为有效位数,取值范围为[1,4],默认值为2。例如取值为1000,但是p=2,会直接报错。--INTERVALYEARf1+INTERVAL'10'YEARasresult_interval_year--INTERVALYEAR(p),f1+INTERVAL'100'YEAR(3)asresult_interval_year_p--INTERVALYEAR(p)TOMONTH,f1+INTERVAL'10-03'YEAR(3)TOMONTHasresult_interval_year_p_to_month--INTERVALMONTH,f1+INTERVAL'13'MONTHasresult_interval_month--2.日-小时-秒。取值范围为[-99999923:59:59.999999999,+99999923:59:59.999999999],其中p1\p2都是有效位数,p1取值范围[1,6],默认值为2;p2范围[0,9],默认值为6--INTERVALDAY,f1+INTERVAL'10'DAYasresult_interval_day--INTERVALDAY(p1),f1+INTERVAL'100'DAY(3)asresult_interval_day_p1--INTERVALDAY(p1)TOHOUR,f1+INTERVAL'1003'DAY(3)TOHOURasresult_interval_day_p1_to_hour--INTERVALDAY(p1)TOMINUTE,f1+INTERVAL'1003:12'DAY(3)TOMINUTEasresult_interval_day_p1_to_minute--INTERVALDAY(p1)TOSECOND(p2),f1+INTERVAL'1000:00:00.004'DAYTOSECOND(3)asresult_interval_day_p1_to_second_p2--INTERVALHOUR,f1+INTERVAL'10'HOURasresult_interval_hour--INTERVALHOURTOMINUTE,f1+INTERVAL'10:03'HOURTOMINUTEasresult_interval_hour_to_minute--INTERVALHOURTOSECOND(p2),f1+INTERVAL'00:00:00.004'HOURTOSECOND(3)asresult_interval_hour_to_second--INTERVALMINUTE,f1+INTERVAL'10'MINUTEasresult_interval_minute--INTERVALMINUTET??OSECOND(p2),f1+INTERVAL'05:05.006'MINUTET??OSECOND(3)--result_interval_minute_to_second_p2--INTERVALSECOND,f1+INTERVALresval'3_seCONDSECOND(p2)),f1+INTERVAL'300'SECOND(3)asresult_interval_second_p2FROM(SELECTTO_TIMESTAMP_LTZ(1640966476500,3)asf1)2.复合数据类型数组类型:ARRAY,tARRAY数组最大长度为2,147,483,647。t表示数组中的数据类型。例子ARRAY,ARRAY,相当于INTARRAY,STRINGARRAY。地图类型:地图。Map类型与Java中的Map类型相同,key不重复。示例地图,地图。集合类型:MULTISET,tMULTISET。就像Java中的List类型一样,运行重复的数据。示例MULTISET,它等效于INTMULTISET。对象类型:ROW,ROW,ROW(n0t0,n1t1,...>,ROW(n0t0'd0',n1t1'd1',...)。就像Java中的自定义对象一样。示例:ROW(myFieldINT,myOtherFieldBOOLEAN),相当于ROW。3.用户自定义数据类型用户自定义类型是运行时用户使用Java等语言自定义的一种数据类型。但是当前数据类型确实不支持使用CREATETABLEDDL定义并且只支持作为函数的输入和输出参数。下面的例子:Step1,自定义数据类型publicclassUser{//1.基本类型,Flink可以通过反射类型信息自动获取数据类型//SQL类型和Java类型的映射见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extractionpublicintage;公共字符串名称;//2.复杂类型,用户可以通过@DataTypeHint("DECIMAL(10,2)")注解标记该字段的数据类型public@DataTypeHint("DECIMAL(10,2)")BigDecimaltotalBalance;}第二步就是在UDF中使用这个数据类型publicclassUserScalarFunctionextendsScalarFunction{//1.自定义数据类型作为输出参数publicUsereval(longi){if(i>0&&i<=5){Useru=newUser();u.age=(int)i;u.name="name1";u.totalBalance=newBigDecimal(1.1d);返回你;}else{用户u=newUser();u.age=(int)i;u.name="name2";u.totalBalance=newBigDecimal(2.2d);返回你;}}//2.自定义数据类型作为输入参数publicStringeval(Useri){if(i.age>0&&i.age<=5){Useru=newUser();u.age=1;u.name="name1";u.totalBalance=newBigDecimal(1.1d);返回你的名字;}else{用户u=newUser();u.age=2;u.name="name2";u.totalBalance=newBigDecimal(2.2d);返回你的名字;}}}第三步在FlinkSQL中使用--1.CreateUDFCREATEFUNCTIONuser_scalar_funcAS'flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction';--2.创建数据源表CREATETABLEsource_table(user_idBIGINTNOTNULLCOMMENT'user_id')WITH('connector'='datagen','rows-per-second'='1','fields.user_id.min'='1','fields.user_id.max'='10');--3.创建数据汇表CREATETABLEsink_table(result_row_1ROW
