本文将教你如何从零开始搭建你的第一个Flink应用。开发环境准备Flink可以运行在Linux、MaxOSX或Windows上。为了开发Flink应用程序,本地机器上需要Java8.x和maven环境。如果有Java8环境,运行以下命令会输出如下版本信息:$java-versionjavaversion"1.8.0_65"Java(TM)SERuntimeEnvironment(build1.8.0_65-b17)JavaHotSpot(TM)64-BitServerVM(build25.65-b01,mixedmode)如果有maven环境,运行如下命令会输出如下版本信息:$mvn-versionApacheMaven3.5.4(1edded0938998edf8bf061f1ceb3cfdeccf443fe;2018-06-18T02:33:14+08:00)Mavenhome:/Users/wuchong/dev/mavenJavaversion:1.8.0_65,vendor:OracleCorporation,runtime:/Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jreDefaultlocale:zh_CN,platformencoding:UTF-8OSname:"macosx",version:"10.13.6",arch:"x86_64",family:"mac"此外,我们推荐使用ItelliJIDEA(社区免费版即可)作为Flink应用的开发IDE。虽然Eclipse也可以,但是Eclipse在Scala和Java混合项目下存在一些已知问题,所以不推荐使用Eclipse。在下一章中,我们将介绍如何创建一个Flink项目并将其导入到ItelliJIDEA中。创建Maven项目我们将使用FlinkMavenArchetype来创建我们的项目结构和一些初始默认依赖项。在您的工作目录中,运行以下命令来创建项目:mvnarchetype:generate\-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java\-DarchetypeVersion=1.6.1\-DgroupId=my-flink-project\-DartifactId=my-flink-project\-Dversion=0.1\-Dpackage=myflink\-DinteractiveMode=false可以将上面的groupId,artifactId,package编辑到自己喜欢的路径下。使用以上参数,Maven会自动为你创建如下项目结构:├──BatchJob.java│└──StreamingJob.java└──resources└──log4j.properties我们的pom.xml文件中已经包含了需要的Flink依赖,src/main/java下也有几个示例程序框架。接下来我们将开始编写我们的第一个Flink程序。编写Flink程序启动IntelliJIDEA,选择“ImportProject”(导入项目),选择my-flink-project根目录下的pom.xml。根据向导完成项目导入。在src/main/java/myflink下创建SocketWindowWordCount.java文件:注意下面我们不会写import语句,因为IDE会自动添加。在本节的最后,我将展示完整的代码。如果想跳过后面的步骤,可以直接把***的完整代码粘贴到编辑器中。Flink程序的第一步是创建一个StreamExecutionEnvironment。这是一个入口类,可以用来设置参数、创建数据源和提交任务。因此,让我们将其添加到主函数中:StreamExecutionEnvironmentsee=StreamExecutionEnvironment.getExecutionEnvironment();接下来我们将创建一个数据源,它从本地端口号9000上的套接字读取数据:DataStreamtext=env.socketTextStream("localhost",9000,"\n");这将创建一个String类型的DataStream。DataStream是Fl??ink中流处理的核心API,它定义了很多常用的操作(如过滤、转换、聚合、开窗、关联等)。在此示例中,我们感兴趣的是每个单词在特定时间窗口(例如5秒窗口)中出现的次数。为此,我们首先需要将字符串数据解析成单词和次数(用Tuple2表示),第一个字段是单词,第二个字段是次数,次数的初始值设置为1.我们实现了一个平面图来进行解析,因为一行数据中可能有多个单词。数据流
