Hello everyone, I'm Lao Yang. Today we will look at the EXPLAIN, USE, SHOW, LOAD and SET clauses of Flink SQL.

EXPLAIN clause

Use case: the EXPLAIN clause is used to view the logical plan and the optimized execution plan of a SQL statement.

SQL syntax: EXPLAIN PLAN FOR, followed by the statement to explain (a query or an INSERT, as in the example below).

Example:

    import org.apache.flink.table.api.TableResult;

    // FlinkEnv and FlinkEnvUtils are helper classes from the author's project (not shown here).
    public class Explain_Test {

        public static void main(String[] args) throws Exception {

            FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

            flinkEnv.env().setParallelism(1);

            String sql = "CREATE TABLE source_table (\n"
                    + "    user_id BIGINT COMMENT 'user id',\n"
                    + "    name STRING COMMENT 'user name',\n"
                    + "    server_timestamp BIGINT COMMENT 'user access timestamp',\n"
                    + "    proctime AS PROCTIME()\n"
                    + ") WITH (\n"
                    + "  'connector' = 'datagen',\n"
                    + "  'rows-per-second' = '1',\n"
                    + "  'fields.name.length' = '1',\n"
                    + "  'fields.user_id.min' = '1',\n"
                    + "  'fields.user_id.max' = '10',\n"
                    + "  'fields.server_timestamp.min' = '1',\n"
                    + "  'fields.server_timestamp.max' = '100000'\n"
                    + ");\n"
                    + "\n"
                    + "CREATE TABLE sink_table (\n"
                    + "    user_id BIGINT,\n"
                    + "    name STRING,\n"
                    + "    server_timestamp BIGINT\n"
                    + ") WITH (\n"
                    + "  'connector' = 'print'\n"
                    + ");\n"
                    + "\n"
                    + "EXPLAIN PLAN FOR\n"
                    + "INSERT INTO sink_table\n"
                    + "select user_id,\n"
                    + "       name,\n"
                    + "       server_timestamp\n"
                    + "from (\n"
                    + "      SELECT\n"
                    + "          user_id,\n"
                    + "          name,\n"
                    + "          server_timestamp,\n"
                    + "          row_number() over(partition by user_id order by proctime) as rn\n"
                    + "      FROM source_table\n"
                    + ")\n"
                    + "where rn = 1";

            /**
             * Operator: {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
             *   -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
             */

            // Run the statements one at a time; the script is split on ';'.
            for (String innerSql : sql.split(";")) {
                TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);

                tableResult.print();
            }
        }
    }

The code above produces the following output:

1. Abstract syntax tree

    == Abstract Syntax Tree ==
    LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
    +- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
       +- LogicalFilter(condition=[=($3, 1)])
          +- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
             +- LogicalTableScan(table=[[default_catalog, default_database, source_table]])

2. Optimized physical plan

    == Optimized Physical Plan ==
    Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
    +- Calc(select=[user_id, name, server_timestamp])
       +- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
          +- Exchange(distribution=[hash[user_id]])
             +- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
                +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

3. Optimized execution plan

    == Optimized Execution Plan ==
    Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
    +- Calc(select=[user_id, name, server_timestamp])
       +- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
          +- Exchange(distribution=[hash[user_id]])
             +- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
                +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

USE clause

Use case: readers who know MySQL will recognize this clause. In MySQL, USE is typically used to switch databases. In Flink SQL it plays much the same role: it switches the current catalog or database, or enables modules.

SQL syntax:

Switch catalog:

    USE CATALOG catalog_name

Use modules:

    USE MODULES module_name1[, module_name2, ...]

Switch database:

    USE db_name

Example:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // create a catalog
    tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
    tEnv.executeSql("SHOW CATALOGS").print();
    // +-----------------+
    // |    catalog name |
    // +-----------------+
    // | default_catalog |
    // |            cat1 |
    // +-----------------+

    // change the default catalog
    tEnv.executeSql("USE CATALOG cat1");

    tEnv.executeSql("SHOW DATABASES").print();
    // no databases yet
    // +---------------+
    // | database name |
    // +---------------+
    // +---------------+

    // create a database
    tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
    tEnv.executeSql("SHOW DATABASES").print();
    // +---------------+
    // | database name |
    // +---------------+
    // |           db1 |
    // +---------------+

    // change the default database
    tEnv.executeSql("USE db1");

    // change the module resolution order and enabled status
    tEnv.executeSql("USE MODULES hive");
    tEnv.executeSql("SHOW FULL MODULES").print();
    // +-------------+-------+
    // | module name |  used |
    // +-------------+-------+
    // |        hive |  true |
    // |        core | false |
    // +-------------+-------+

SHOW clause

Use case: readers who know MySQL will find this clause very familiar as well. In MySQL, SHOW is commonly used to list databases, tables, functions and so on. Flink SQL, much like MySQL, supports the following SHOW statements.

SQL syntax:

    SHOW CATALOGS: list all catalogs
    SHOW CURRENT CATALOG: show the current catalog
    SHOW DATABASES: list all databases in the current catalog
    SHOW CURRENT DATABASE: show the current database
    SHOW TABLES: list all tables in the current database
    SHOW VIEWS: list all views
    SHOW FUNCTIONS: list all functions
    SHOW MODULES: list all modules (modules are used for UDF extension)

Example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // show catalogs
    tEnv.executeSql("SHOW CATALOGS").print();
    // +-----------------+
    // |    catalog name |
    // +-----------------+
    // | default_catalog |
    // +-----------------+

    // show the current catalog
    tEnv.executeSql("SHOW CURRENT CATALOG").print();
    // +----------------------+
    // | current catalog name |
    // +----------------------+
    // |      default_catalog |
    // +----------------------+

    // show databases
    tEnv.executeSql("SHOW DATABASES").print();
    // +------------------+
    // |    database name |
    // +------------------+
    // | default_database |
    // +------------------+

    // show the current database
    tEnv.executeSql("SHOW CURRENT DATABASE").print();
    // +-----------------------+
    // | current database name |
    // +-----------------------+
    // |      default_database |
    // +-----------------------+

    // create a table
    tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
    // show tables
    tEnv.executeSql("SHOW TABLES").print();
    // +------------+
    // | table name |
    // +------------+
    // |   my_table |
    // +------------+

    // create a view
    tEnv.executeSql("CREATE VIEW my_view AS ...");
    // show views
    tEnv.executeSql("SHOW VIEWS").print();
    // +-----------+
    // | view name |
    // +-----------+
    // |   my_view |
    // +-----------+

    // show functions
    tEnv.executeSql("SHOW FUNCTIONS").print();
    // +---------------+
    // | function name |
    // +---------------+
    // |           mod |
    // |        sha256 |
    // |           ... |
    // +---------------+

    // create a user-defined function
    tEnv.executeSql("CREATE FUNCTION f1 AS ...");
    // show user-defined functions
    tEnv.executeSql("SHOW USER FUNCTIONS").print();
    // +---------------+
    // | function name |
    // +---------------+
    // |            f1 |
    // |           ... |
    // +---------------+

    // show modules
    tEnv.executeSql("SHOW MODULES").print();
    // +-------------+
    // | module name |
    // +-------------+
    // |        core |
    // +-------------+

    // show full modules
    tEnv.executeSql("SHOW FULL MODULES").print();
    // +-------------+-------+
    // | module name |  used |
    // +-------------+-------+
    // |        core |  true |
    // |        hive | false |
    // +-------------+-------+

LOAD and UNLOAD clauses

Use case: the LOAD clause loads a built-in or user-defined module into Flink SQL, and the UNLOAD clause unloads a built-in or user-defined module.

SQL syntax:

    -- Load
    LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]

    -- Unload
    UNLOAD MODULE module_name

Example:

LOAD case:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // load the built-in Hive module
    tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
    tEnv.executeSql("SHOW MODULES").print();
    // +-------------+
    // | module name |
    // +-------------+
    // |        core |
    // |        hive |
    // +-------------+

UNLOAD case:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // unload the only loaded module, core
    tEnv.executeSql("UNLOAD MODULE core");
    tEnv.executeSql("SHOW MODULES").print();
    // the result is empty: no modules are left

SET and RESET clauses

Use case: the SET clause modifies Flink SQL environment configuration, and the RESET clause restores configuration to its default values. They can only be used in the SQL CLI. The main purpose is to let users work purely in SQL, without resorting to other mechanisms or switching environments.

SQL syntax:

    SET (key = value)?

    RESET (key)?

Example:

After starting the SQL CLI, you can run SET and RESET there as follows:

    Flink SQL> SET table.planner = blink;
    [INFO] Session property has been set.

    Flink SQL> SET;
    table.planner=blink;

    Flink SQL> RESET table.planner;
    [INFO] Session property has been reset.

    Flink SQL> RESET;
    [INFO] All session properties have been set to their default values.
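
To make the SET and RESET semantics shown in the SQL CLI session above more concrete, here is a minimal sketch in plain Java. It is a toy model only, not Flink's implementation: the class name SessionProperties, its methods, and the "<default>" placeholder value are all hypothetical and introduced purely for illustration. It also assumes that SET with no argument lists only the properties that were explicitly set, which is what the transcript above shows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy model of session properties (hypothetical, not a Flink class):
 * SET stores an override on top of a default, RESET removes overrides.
 */
public class SessionProperties {

    private final Map<String, String> defaults;
    private final Map<String, String> overrides = new LinkedHashMap<>();

    public SessionProperties(Map<String, String> defaults) {
        this.defaults = new LinkedHashMap<>(defaults);
    }

    /** Models "SET key = value": override the value for this session. */
    public void set(String key, String value) {
        overrides.put(key, value);
    }

    /** Models "SET" with no argument: return a copy of the explicitly set properties. */
    public Map<String, String> listSet() {
        return new LinkedHashMap<>(overrides);
    }

    /** Models "RESET key": drop the override so the default applies again. */
    public void reset(String key) {
        overrides.remove(key);
    }

    /** Models "RESET" with no argument: drop every override. */
    public void resetAll() {
        overrides.clear();
    }

    /** Effective value: the override if present, otherwise the default (may be null). */
    public String get(String key) {
        return overrides.containsKey(key) ? overrides.get(key) : defaults.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new LinkedHashMap<>();
        // "<default>" is a placeholder, not the real default of table.planner.
        defaults.put("table.planner", "<default>");

        SessionProperties session = new SessionProperties(defaults);

        session.set("table.planner", "blink");                  // SET table.planner = blink;
        System.out.println(session.listSet());                  // SET;
        session.reset("table.planner");                         // RESET table.planner;
        System.out.println(session.get("table.planner"));
        session.resetAll();                                     // RESET;
        System.out.println(session.listSet().isEmpty());
    }
}
```

Keeping overrides separate from defaults means a reset never has to remember the old value: removing the override is enough for the default to show through again.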