当前位置: 首页 > 科技观察

快速入门FlinkSQL——Table与DataStream的转换

时间:2023-03-16 23:02:37 科技观察

本文将主要分享如何连接Kafka、MySQL作为输入流并进行计数操作,以及Table与DataStream的转换。1、使用Kafka作为输入流Kafka的连接器flink-kafka-connector,1.10版本已经提供了TableAPI支持。我们可以在connect方法中直接传入一个叫Kafka的类,就是kafka连接器的描述符ConnectorDescriptor。准备资料:1、语言数2、英文资料3、化学生活4、文学5、语义学6、学习资料创建kafka主题./kafka-topics.sh--create--zookeepernode01:2181,node02:2181,node03:2181--replication-factor2--partitions3--topicFlinkSqlTest通过命令行启动producer[root@node01bin]#./kafka-console-producer.sh--broker-listnode01:9092,node02:9092,node03:9092--topicFlinkSqlTest>1,NumberofLanguage>2,English>3,Chemical>4,Literature>5,Semantics>6,学物写Flink代码,连接kafkaimportorg.apache.flink.streaming.api。scala._importorg.apache.flink.table.api.DataTypesimportorg.apache.flink.table.api.scala._importorg.apache.flink.table.descriptors.{Csv,Kafka,Schema}/***@Package*@author大数据小弟*@date2020/12/170:35*@versionV1.0*/objectFlinkSQLSourceKafka{defmain(args:Array[String]):Unit={//获取流处理的运行环境valenv=StreamExecutionEnvironment.getExecutionEnvironment//获取表的运行环境valtableEnv=StreamTableEnvironment.create(env)tableEnv.connect(newKafka().version("0.11")//设置kafka.topic("FlinkSqlTest")的版本//设置topic为connected.property("zookeeper.connect","node01:2181,node02:2181,node03:2181")//设置zookeeper的连接地址和端口号.property("bootstrap.servers","node01:9092,node02:9092,node03:9092")//设置连接地址和端口号ofkafka).withFormat(newCsv())//设置format.withSchema(newSchema()//设置元数据信息.field("id",DataTypes.STRING()).field("name",DataTypes.STRING())).createTemporaryTable("kafkaInputTable")//创建临时表//定义要查询的sql语句valresult=tableEnv.sqlQuery("select*fromkafkaInputTable")//打印数据result.toAppendStream[(String,String)].print()//启用env.execute("sourcekafkaInputTable")的执行}}运行结果图当然也可以连接外部系统如ElasticSearch、MySql、HBase、Hive等,实现方法基本就是2.表的查询使用了外部系统Connector连接器,我们可以读写数据,在环境的目录中注册,接下来我们可以对表进行查询和改造.Flink为我们提供了两种查询方式:TableAPI和SQL。3.TableAPI调用TableAPI是Scala和Java语言集成的查询API。与SQL不同的是,TableAPI的查询不是用字符串表示,而是在宿主语言中一步步调用。TableAPI基于表示一个表的Table类,提供了一套操作处理的方法API。这些方法将返回一个新的Table对象,它表示将转换操作应用于输入表的结果。一些关系转换操作可以由多个方法调用组成,形成链式调用结构。比如table.select(…).filter(…),其中select(…)表示选择表中的指定字段,filter(…)表示过滤条件。代码中的实现如下:valkafkaInputTable=tableEnv.from("kafkaInputTable")kafkaInputTable.select("*").filter('id!=="1")4.SQL查询Flink的SQL集成基于ApacheCalcite,它实现了SQL标准。在Flink中,常规字符串用于定义SQL查询。SQL查询的结果是一个新表。代码实现如下:valresult=tableEnv.sqlQuery("select*fromkafkaInputTable")当然也可以加入聚合操作,比如我们统计每个用户的数量,调用tableAPIvalresult:Table=tableEnv。来自(“kafkaInputTable”)结果。groupBy("user").select('name,'name.countas'count)调用SQLvalresult=tableEnv.sqlQuery("selectname,count(1)ascountfromkafkaInputTablegroupbyname")在这里,表API中指定的字段前面有一个单独的quote',这是TableAPI中定义的表达式类型,可以方便的表示一个表中的字段。字段可以直接用双引号括起来,也可以用半单引号+字段名的形式。在下面的代码中,一般使用后一种形式。5、DataStream转TableFlink允许我们将Table和DataStream进行转换:基于一个DataStream,我们可以先以流的方式读取数据源,然后映射成一个sampleclass,再转成Table。Table的列字段就是示例类中的字段,不用再麻烦定义schema了。5.1.代码实现代码中的实现很简单,直接使用tableEnv.fromDataStream()即可。默认转换后的Tableschema对应DataStream中的字段定义,也可以单独指定。这允许我们改变字段的顺序,重命名它们,或者只选择某些字段,这相当于做了一个map操作(或TableAPI的select操作)。代码如下:importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api.scala._/***@Package*@author大数据哥*@date2020/12/1721:21*@versionV1.0*/objectFlinkSqlReadFileTable{defmain(args:Array[String]):Unit={//搭建流处理运行环境valenv=StreamExecutionEnvironment.getExecutionEnvironment//搭建表运行环境valtableEnv=StreamTableEnvironment.create(env)//使用流处理读取数据valreadData=env.readTextFile("./data/word.txt")//使用flatMap拆分valword:DataStream[String]=readData.flatMap(_.split(""))//Convertwordtotablevaltable=tableEnv.fromDataStream(word)//计算wordcountvalwordCount=table.groupBy("f0").select('f0,'f0.countas'count)wordCount.printSchema()//转换为流处理打印输出tableEnv.toRetractStream[(String,Long)](wordCount).print()env.execute("FlinkSqlReadFileTable")}}5.2DataStream中数据类型对应的数据类型和Tableschema,表的schema对应关系是基于示例类中的字段名(name-basedmapping),所以也可以使用as来重命名。另一种映射方式是直接根据字段的位置进行映射(position-basedmapping)。在映射过程中,您可以直接指定一个新的字段名称。基于名称的对应关系:valuserTable=tableEnv.fromDataStream(dataStream,'usernameas'name,'idas'myid)基于位置的对应关系:valuserTable=tableEnv.fromDataStream(dataStream,'name,'id)Flink的DataStream和DataSetAPI支持多种类型.复合类型,例如元组(Scala和Java内置的元组)、POJO、Scala的case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,可以在Table表达式中访问。其他类型被认为是原子类型。对于元组类型和原子类型,一般使用位置对应比较好;如果一定要用名字对应,也可以:元组类型,默认名字是_1,_2;和原子类型,默认名称是f0。6.创建临时视图(TemporaryView)创建临时视图的第一种方法是直接从DataStream转换而来。同样可以直接对应字段转换;转换时也可以指定对应的字段。代码如下:tableEnv.createTemporaryView("sensorView",dataStream)tableEnv.createTemporaryView("sensorView",dataStream,'id,'temperature,'timestampas'ts)另外当然也可以创建基于视图onTable:tableEnv.createTemporaryView("sensorView",sensorTable)View和Table的Schema是完全一样的。其实在TableAPI中,View和Table可以认为是等价的。总结上面这篇文章,主要讲解了使用kafka作为输入流的loss处理。其实我也可以设置mysql、es、mysql等,类似的,还有tableapi和sql的区别。也解释了DataStream的转换位将Table或者Table转换成DataStream或者后面我们做数据分析的时候都非常简单。本文转载自微信公众号“大数据哥”,可通过以下二维码关注。转载本文请联系大数据小哥公众号。