当前位置: 首页 > 科技观察

5分钟从零搭建你的第一个Flink应用

时间:2023-03-11 21:40:51 科技观察

本文将教你如何从零开始搭建你的第一个Flink应用。开发环境准备Flink可以运行在Linux、MaxOSX或Windows上。为了开发Flink应用程序,本地机器上需要Java8.x和maven环境。如果有Java8环境,运行以下命令会输出如下版本信息:$java-versionjavaversion"1.8.0_65"Java(TM)SERuntimeEnvironment(build1.8.0_65-b17)JavaHotSpot(TM)64-BitServerVM(build25.65-b01,mixedmode)如果有maven环境,运行如下命令会输出如下版本信息:$mvn-versionApacheMaven3.5.4(1edded0938998edf8bf061f1ceb3cfdeccf443fe;2018-06-18T02:33:14+08:00)Mavenhome:/Users/wuchong/dev/mavenJavaversion:1.8.0_65,vendor:OracleCorporation,runtime:/Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jreDefaultlocale:zh_CN,platformencoding:UTF-8OSname:"macosx",version:"10.13.6",arch:"x86_64",family:"mac"此外,我们推荐使用ItelliJIDEA(社区免费版即可)作为Flink应用的开发IDE。虽然Eclipse也可以,但是Eclipse在Scala和Java混合项目下存在一些已知问题,所以不推荐使用Eclipse。在下一章中,我们将介绍如何创建一个Flink项目并将其导入到ItelliJIDEA中。创建Maven项目我们将使用FlinkMavenArchetype来创建我们的项目结构和一些初始默认依赖项。在您的工作目录中,运行以下命令来创建项目:mvnarchetype:generate\-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java\-DarchetypeVersion=1.6.1\-DgroupId=my-flink-project\-DartifactId=my-flink-project\-Dversion=0.1\-Dpackage=myflink\-DinteractiveMode=false可以将上面的groupId,artifactId,package编辑到自己喜欢的路径下。使用以上参数,Maven会自动为你创建如下项目结构:├──BatchJob.java│└──StreamingJob.java└──resources└──log4j.properties我们的pom.xml文件中已经包含了需要的Flink依赖,src/main/java下也有几个示例程序框架。接下来我们将开始编写我们的第一个Flink程序。编写Flink程序启动IntelliJIDEA,选择“ImportProject”(导入项目),选择my-flink-project根目录下的pom.xml。根据向导完成项目导入。在src/main/java/myflink下创建SocketWindowWordCount.java文件:注意下面我们不会写import语句,因为IDE会自动添加。在本节的最后,我将展示完整的代码。如果想跳过后面的步骤,可以直接把***的完整代码粘贴到编辑器中。Flink程序的第一步是创建一个StreamExecutionEnvironment。这是一个入口类,可以用来设置参数、创建数据源和提交任务。因此,让我们将其添加到主函数中:StreamExecutionEnvironmentsee=StreamExecutionEnvironment.getExecutionEnvironment();接下来我们将创建一个数据源,它从本地端口号9000上的套接字读取数据:DataStreamtext=env.socketTextStream("localhost",9000,"\n");这将创建一个String类型的DataStream。DataStream是Fl??ink中流处理的核心API,它定义了很多常用的操作(如过滤、转换、聚合、开窗、关联等)。在此示例中,我们感兴趣的是每个单词在特定时间窗口(例如5秒窗口)中出现的次数。为此,我们首先需要将字符串数据解析成单词和次数(用Tuple2表示),第一个字段是单词,第二个字段是次数,次数的初始值设置为1.我们实现了一个平面图来进行解析,因为一行数据中可能有多个单词。数据流>wordCounts=text.flatMap(newFlatMapFunction>(){@OverridepublicvoidflatMap(Stringvalue,Collector>out){for(Stringword:value.split("\\s")){out.collect(Tuple2.of(word,1));}}});然后我们将数据流按照word字段(即index字段0)进行分组,这里可以简单的使用keyBy(intindex)方法得到一个以word为key的Tuple2数据流。然后我们可以在流上指定所需的窗口,并根据窗口中的数据计算结果。在我们的例子中,我们希望每5秒聚合一次字数,每个窗口从零开始。DataStream>windowCounts=wordCounts.keyBy(0).timeWindow(Time.seconds(5)).sum(1);第二次调用.timeWindow()指定我们想要一个5秒翻转窗口(Tumble)。第三次调用指定每个键和每个窗口的求和聚合函数。在我们的例子中,是根据次数字段(即第1个索引字段)添加的。生成的数据流将每5秒输出每个单词出现的次数。***一件事是将数据流打印到控制台并开始执行:windowCounts.print().setParallelism(1);env.execute("SocketWindowWordCount");***env.execute调用是为了启动实际的RequiredforFlink作业。所有算子操作(例如创建源、聚合、打印)只是构建一个内部算子操作图。只有调用了execute()才会提交到集群或者在本地计算机上执行。以下为完整代码,部分代码进行了简化(代码也可在GitHub上获取):packagemyflink;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java。tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.util.Collector;publicclassSocketWindowWordCount{publicstaticvoidmain(String[]args)throwsException{//创建执行环境finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//通过连接socket获取输入数据,这里连接本地9000端口,如果9000端口已经被占用请换一个端口DataStreamtext=env.socketTextStream("localhost",9000,"\n");//分析数据,按word分组,开窗,聚合DataStream>windowCounts=text.flatMap(newFlatMapFunction>(){@OverridepublicvoidflatMap(Stringvalue,Collectorout){for(Stringword:value.split("\\s")){out.collect(Tuple2.of(word,1));}}}).keyBy(0).timeWindow(时间。seconds(5)).sum(1);//打印结果到控制台,注意这里使用的是单线程打印,不是多线程windowCounts.print().setParallelism(1);env.execute("SocketWindowWordCount");}}运行程序运行示例程序,首先我们在终端启动netcat获取输入流:nc-lk9000如果是Windows平台,可以通过https://nmap.org安装ncat/ncat/然后运行:ncat-lk9000然后直接运行SocketWindowWordCount的main方法,只需要在netcat控制台输入单词,在SocketWindowWordCount的输出控制台就可以看到每个单词的词频统计。如果要查看大于1的计数,请重复键入相同的单词5秒钟。