一、获取并导入Flink源码1、下载Flink源码Flink源码有两种获取方式:一种是直接从源码下载地址下载在官网上,一个是通过gitclone。1)官网下载方式下载地址为https://flink.apache.org/downloads.html选择Flink1.9.0的Source版本进行下载。2)在gitclone模式下,输入gitclonegit@github.com:apache/flink.git命令,将源码下载到本地,如图1-5所示。▲图1-5gitclone下载2.导入Flink源码导入Flink源码分为两步,将Flink源码导入IDEA,配置Flink源码的CheckStyle。其中,配置Flink源码的CheckStyle是为了保证Flink源码的修改符合CheckStyle中的规范要求。将下载好的Flink源码导入IDEA,流程如下。启动IntelliJIDEA,点击欢迎窗口右上角的打开按钮。在弹窗中选择Flink源码的根目录。选择Importprojectfromexternalmodelandmavenitems,然后单击Next按钮。选择SDK。如果你之前没有配置过SDK,点击“+”图标,然后点击JDK,选择你的JDK目录,点击OK按钮。点击Next按钮完成Flink源码的导入。点击导入项目右侧的Maven→GenerateSourcesandUpdateFolders图标,将FlinkLibrary构建到Maven本地仓库中。构建项目(单击Build→MakeProject图标)。想要对Flink进行二次开发或者向开源社区贡献代码的读者可以选择配置CheckStyle。1)JavaCheckStyle配置过程IntelliJIDEA通过CheckStyle-IDEA插件支持CheckStyle。在IntelliJIDEA的插件市场中查找并安装CheckStyle-IDEA插件。选择Settings→Tools→Checkstyle并设置checkstyle。将扫描范围设置为仅Java源(包括测试)。在Checkstyle版本下拉列表中选择checkstyle版本,然后单击Apply按钮。(注:官方推荐版本为8.12。)点击ConfigurationFile面板中的“+”图标添加新配置:在弹出的窗口中将Description设置为Flink;选择UsealocalCheckstylefile,选择Flink源码目录tools/maven/checkstyle.xml文件;勾选Storerelativetoprojectlocation选项,点击Next按钮;将checkstyle.suppressions.file的属性设置为suppressions.xml,点击Next按钮完成配置。勾选Flink刚刚添加的新配置,将其设置为激活配置,依次点击Apply和OK按钮,完成CheckStyle的Java部分的配置。如果源代码违反了CheckStyle规范,CheckStyle将给出警告。CheckStyle构建完成后,依次选择Settings→Editor→CodeStyle→Java,点击齿轮图标,选择导入Flink源码目录下的tools/maven/checkstyle.xml文件,这样导入布局就可以了被自动调整。您可以单击CheckStyle窗口中的CheckModule按钮扫描整个模块以检测代码的CheckStyle。注意:目前Flink源码的flink-core、flink-optimizer和flink-runtime模块并没有完全满足设置CheckStyle的要求,所以这三个模块出现违反CheckStyle的警告是正常的.2)ScalaCheckStyle配置过程打开Scala的CheckStyle,依次选择Settings→Editor→Inspections,搜索Scalastyleinspections并勾选。将Flink源码目录下的tools/maven/scalastyle_config.xml放到Flink源码的.idea目录下,完成CheckStyle的Scala部分的配置。2编译调试Flink源码1.编译构建Flink源码已经导入,CheckStyle配置完成。接下来,编译构建Flink。在构建源码之前,如果需要修改Flink版本,可以通过修改Flink源码的tools/change-version.sh来实现。Flink源码的编译构建会根据Maven版本的不同而有所差异。对于Maven3.0.x、3.1.x和3.2.x版本,您可以简单地构建Flink,并在Flink源代码的根目录下运行以下命令。$mvncleaninstall-DskipTests对于Maven3.3.x及以上版本,相对比较麻烦。在Flink源码根目录下运行如下命令。$mvncleaninstall-DskipTests$cdflink-dist$mvncleaninstall推荐使用Maven3.2.5版本,下面将介绍更多基于该版本的构建内容。使用以下方法快速构建Flink源码、跳过测试、QA插件和Java文档。$mvncleaninstall-DskipTests-Dfast会在构建Flink时默认构建一个Flink特定的Hadoop2jar,以便Flink使用HDFS和YARN。大多数开发者需要指定Hadoop版本(建议选择Hadoop2.4及以上版本)。$mvncleaninstall-DskipTests-Dhadoop.version=3.2.2-Dinclude-hadoop增加了-Dinclude-hadoop参数,会把Hadoop类打包成lib目录下的flink-dist*.jar,否则Hadoop会作为一个jar包放在opt目录下。选择合适的方式构建Flink项目,将Flink构建放在本地Maven仓库,将Flink源码构建结果放在build-target目录(Flink源码构建目录)。可以将build-target目录压缩成一个tar包,就是和官网一样的Flink二进制包。2、Flink源码调试调试Flink源码,有助于我们了解源码的执行过程,排查问题。Flink源码调试分为本地调试和远程调试,下面分别介绍。1)本地调试以Flink源码中的WordCountwithStreaming为例,介绍如何进行本地调试。在Flink源码目录flink-examples/flink-examples-streaming的多级子目录下找到WordCount.java,选择Debug。读者可以在Flink源码中设置断点进行跟踪调试。2)远程调试本地调试仅限于部署模式下的本地模式。基于Standalone、YARN、Kubernetes的部署方式需要远程调试。远程调试的方法有两种:一种是修改日志级别,另一种是修改配置开启Java远程调试。修改日志级别打开Flink源码build目录(build-target)下的conf/log4j.properties,根据需要将内容中的INFO改为DEBUG,如下图,只需将rootLogger的赋值由INFO改为DEBUG即可.修改log4j.properties后构建并运行Flink,即可通过DEBUG日志进行远程调试。#设置全局日志级别log4j.rootLogger=DEBUG,file#也可以根据需要更改Flink、Akka、Hadoop、Kafka、ZooKeeper包等包的日志级别log4j.logger.org.apache.flink=INFOlog4j.logger。akka=INFOlog4j.logger.org.apache.kafka=INFOlog4j.logger.org.apache.hadoop=INFOlog4j.logger.org.apache.zookeeper=INFO#Logallinfosinthegivenfilelog4j.appender.file=org.apache.log4j.FileAppenderlog4j.appender。文件.file=${log.file}log4j.appender.file.append=falselog4j.appender.file.layout=org.apache.log4j.PatternLayoutlog4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-ddHH:mm:ss,SSS}%-5p%-60c%x-%m%n#Suppresstheirrelevant(wrong)warningsfromtheNettychannelhandlerlog4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=错误、文件修改配置开启Java远程调试首先打开IDEA,创建一个Remote项(见图1-6)复制Remote项的Java运行参数内容,然后修改Fl中的conf/flink-conf.yamlink构建目录,并添加env.java.opts属性及取值,新增内容如下:env.java.opts:-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005▲图1-6配置Remote项远程调试也可以通过env.java.opts.jobmanager使用env.java.opts.taskmanager设置JobManager和TaskManager的运行参数,实现远程调试设置。配置完成后,基于该build目录运行Flink应用,并根据运行的JobManager和TaskManager的IP修改之前配置的Remote项的host,在Flink源码中设置断点,通过Debug配置Remote项执行远程调试。env.java.opts、env.java.opts.jobmanager和env.java.opts.taskmanager的设置方法适用于onKubernetes模式,因为Flink运行的各个组件的IP不同。在其他模式下,存在运行组件与IP和调试端口相同的问题。这种情况可以考虑修改日志级别的方法。通过学习Flink源码的编译构建,我们知道如何按需构建Flink发布包。通过学习Flink源码的调试,我们对源码调试有了更深的理解,为后续理解源码和排查源码问题打下了基础。作者简介:罗江宇,Flink技术专家,曾就职于新浪微博、滴滴和大型电商。曾主导或参与多家公司Flink实时计算服务建设、超大规模集群维护、Flink引擎改造。具有丰富的实时计算实践经验,目前专注于Kubernetes调度、FlinkSQL和Flink流批集成。赵世杰,资深大数据技术专家,曾就职于滴滴、阿里巴巴等一线互联网公司。从0到1深度参与滴滴大数据建设,在一线大数据平台建设方面有着非常丰富的经验,对大数据领域的计算和存储引擎有深入研究。大数据研发专家李汉淼,曾任滴滴大数据开发工程师。在大数据领域工作多年,参与过多家公司流计算平台的设计和开发。目前主要从事批流集成、OLAP技术的研究与应用。闵文军,蚂蚁集团技术专家,开源大数据社区爱好者,FlinkContributor,在实时计算领域工作多年,深度参与实时计算的建设滴滴和蚂蚁集团的计算平台。本文节选自《Flink技术内幕:架构设计与实现原理》,经发布者授权发布。(书号:9787111696292)
