当前位置: 首页 > 科技观察

十分钟入门FinkSQL

时间:2023-03-16 00:27:38 科技观察

前言Flink本身是批流的统一处理框架,所以TableAPI和SQL都是统一批流的上层处理API。目前功能还不完善,处于积极开发阶段。TableAPI是一组嵌入在Java和Scala语言中的查询API,它允许我们以非常直观的方式从一些关系运算符(例如select、filter和join)中组合查询。对于FlinkSQL,可以直接在代码中写SQL来实现一些查询操作。Flink的SQL支持是基于实现SQL标准的ApacheCalcite(Apache的开源SQL解析工具)。图1.导入需要的依赖包org.apache.flinkflink-table-planner_2.121.10.1/dependency>org.apache.flinkflink-table-api-scala-bridge_2.121.10.1org.apache.flinkflink-csv1.10.1flink-table-planner:planner规划器,是最重要的部分表API提供运行环境和生成程序执行计划的计划器;flink-table-api-scala-bridge:bridge桥接器,主要负责tableAPI和DataStream/DataSetAPI之间的连接支持,按语言java和scala划分。需要添加这里的两个依赖才能在IDE环境下运行;如果是生产环境,lib目录默认已经有planner,只需要有bridge即可。当然,如果你想使用自定义函数或者与Kafka连接,你需要一个SQL客户端,flink-table-common中包含了这个客户端。2.两个planner的区别(old&blink)批流统一:Blink把批处理作业当做流处理的一个特例。因此,blink不支持表和数据集之间的转换。Batch作业不会转换为DataSet应用程序,而是像流处理一样转换为DataStream程序进行处理。因为batchflow是统一的,Blinkplanner不支持BatchTableSource,Blinkplannerusingbounds只支持全新的目录,不支持废弃的ExternalCatalog。旧计划器和Blink计划器的FilterableTableSource实现不兼容。旧的规划器会将PlannerExpressions下推到filterableTableSource中,而blink规划器会下推表达式。基于字符串的键值配置选项仅适用于Blinkplanner。PlannerConfig在两个规划器中的实现方式不同。Blinkplanner将优化DAG中的多个接收器(仅在TableEnvironment上支持,在StreamTableEnvironment上不支持)。而老planner的优化总是把每个sink放在一个新的DAG中,所有的DAG都是相互独立的。旧的planner不支持目录统计,但是Blinkplanner支持。3.表(Table)的概念TableEnvironment可以注册目录,也可以基于目录注册。它将维护Catalog-Table表之间的映射。表(Table)由标识符指定,由3部分组成:目录名、数据库(database)名和对象名(tablename)。如果未指定目录或数据库,则使用当前默认值。4.连接文件系统(Csv格式)连接外部系统并在Catalog中注册,直接调用tableEnv.connect()即可,其中的参数必须传入一个ConnectorDescriptor,即连接器描述符。对于文件系统的connector,flink内部已经提供了,叫做FileSystem()。5.测试用例(新增)需求:使用txt文本文件作为输入流读取数据,过滤id不等于sensor_1的数据实现思路:首先我们先搭建一个tableenv环境,通过提供的方法读取数据通过connect然后设置Tablestructure将数据注册成表过滤我们的数据(使用sql或流处理分析)准备数据sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718206,32sensor_1,1547718208,36.2sensor_1,1547718210,29.7sensor_1,1547718213,30.9代码实现importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api.{DataTypes}importorg.apache.table.api.scala._importorg.apache.flink.table.descriptors.{Csv,FileSystem,Schema}/***@Package*@作者大数据哥*@date2020/12/1221:22*@versionV1.0*第一个Flinksql测试用例*/objectFlinkSqlTable{defmain(args:Array[String]):Unit={//构建运行流处理的运行环境valenv=StreamExecutionEnvironment.getExecutionEnvironment//构建表环境valtableEnv=StreamTableEnvironment.create(env)//通过connect读取数据tableEnv.connect(newFileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")).withFormat(newCsv())//设置类型.withSchema(newSschema()//将元数据信息添加到data.field("id",DataTypes.STRING()).field("time",DataTypes.BIGINT()).field("temperature",DataTypes.DOUBLE())).createTemporaryTable("inputTable")//创建临时表valresTable=tableEnv.from("inputTable").select("*").filter('id==="sensor_1")//使用SQL查询数据varresSql=tableEnv.sqlQuery("select*frominputTablewhereid='sensor_1'")//将数据转为流输出resTable.toAppendStream[(String,Long,Double)].print("resTable")resSql.toAppendStream[(String,Long,Double)].print("resSql")env.execute("FlinkSqlWrodCount")}}6.TableEnvironment的作用注册目录在内部目录的注册中心执行SQL查询注册用户自定义函数注册用户自定义函数保存参考ExecutionEnvironment或StreamExecutionEnvironment创建TableEnv时,可以传入一个EnvironmentSettings或TableConfig参数,可以用来配置TableEnvironment的一些特性7.老版本创建流处理批处理7.1老版本流处理valsettings=EnvironmentSettings.newInstance().useOldPlanner()//使用老版本planner.inStreamingMode()//流处理模式.build()valtableEnv=StreamTableEnvironment.create(env,settings)7.2老版本批处理valbatchEnv=ExecutionEnvironment.getExecutionEnvironmentvalbatchTableEnv=BatchTableEnvironment.create(batchEnv)7.3blink版本流处理环境valbsSettings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().buildeamTableEnviStringv=.create(env,bsSettings)7.4blink版本批处理环境valbbSettings=EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()valbbTableEnv=TableEnvironment.create(bbSettings)总结:本文主要讲解FlinkSQL入门操作,我会分享一些关于FlinkSQL连接Kafka,输出到kafka,MySQL等,请联系大数据小哥公众号重新打印这篇文章。