要了解一个系统,通常要从架构开始。我们关心的问题是:系统部署成功后,各个节点启动了哪些服务,各个服务如何交互协调。下面是Flink集群启动后的架构图。Flink集群启动时,首先会启动一个JobManger和一个或多个TaskManager。客户端将任务提交给JobManager,JobManager再将任务调度给各个TaskManager执行,然后TaskManager将心跳和统计信息上报给JobManager。数据以流的形式在任务管理器之间传输。以上三个都是独立的JVM进程。Client是提交Job的客户端,可以在任何机器上运行(只要连接到JobManager环境即可)。提交Job后,Client可以结束进程(Streamingtask),也可以不结束等待结果返回。JobManager主要负责调度Jobs和协调Tasks做checkpoints。它的职责与NimbusofStorm非常相似。从Client接收到Job、JAR包等资源后,会生成优化后的执行计划,以Task为单位分发给各个TaskManager执行。槽(Slot)的数量是在TaskManager启动时设置的。每个slot可以启动一个Task,Task就是一个线程。从JobManager接收需要部署的Task。部署开始后,与自己的upstream建立Netty连接,接收并处理数据。可以看出Flink的任务调度是多线程模型,不同的Job/Task混合在一个TaskManager进程中。这种方式虽然可以有效提高CPU利用率,但我个人并不喜欢这种设计,因为它缺乏资源隔离机制,调试起来也不方便。与Storm的进程模型类似,实际应用中在JVM中只运行Job的Tasks是比较合理的。作业示例本文示例为flink-1.0.x版本。我们使用Flink自带的examples包中的SocketTextStreamWordCount。这是一个计算套接字流中单词出现次数的示例。首先使用netcat启动本地服务器:$nc-l9000然后提交Flink程序$bin/flinkrunexamples/streaming/SocketTextStreamWordCount.jar\--hostname10.218.130.9\--port9000在netcat端输入words并监控taskmanager的输出可以看到word统计的结果。SocketTextStreamWordCount具体代码如下:publicstaticvoidmain(String[]args)throwsException{//检查输入...//设置执行环境finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//获取输入数据DataStreamtext=env.socketTextStream(params.get("hostname"),params.getInt("port"),'\n',0);DataStream>counts=//成对拆分行(二元组),包含:(word,1)text.flatMap(newTokenizer())//按元组字段“0”分组并总结元组字段“1”.keyBy(0).sum(1);计数.打印();//执行程序env.execute("WordCountfromSocketTextStreamExample");}我们将代码env.execute的最后一行替换为System.out.println(env.getExecutionPlan());在本地运行代码(并发设置为2),可以得到拓扑的逻辑执行计划图的JSON字符串,将JSON字符串粘贴到http://flink.apache.org/visualizer/即可可视化执行Graph但这并不是Flink中运行的最终执行图,而是表示拓扑节点之间关系的计划图,对应于Flink中的SteramGraph。另外,拓扑提交后(并发设置为2),在UI中可以看到另一个执行计划图,如下图,对应Flink中的JobGraph。图表看起来有点乱,为什么会有这么多不同的图表。实际上,还有更多的图表。Flink中的executiongraph可以分为四层:StreamGraph->JobGraph->ExecutionGraph->Physicalexecutiongraph。StreamGraph:是用户通过StreamAPI编写的代码生成的初始图。用来表示程序的拓扑结构。JobGraph:StreamGraph经过优化生成JobGraph,是提交给JobManager的数据结构。主要优化是将多个符合条件的节点链接在一起作为一个节点,可以减少数据在节点间流动所需的序列化/反序列化/传输消耗。ExecutionGraph:JobManager基于JobGraph生成的分布式执行图,是调度层的核心数据结构。物理执行图:JobManager根据ExecutionGraph调度Job后,在各个TaskManager上部署Task后形成的“图”并不是一个具体的数据结构。例如并发度为2的SocketTextStreamWordCount的四层执行图(来源为1并发度)的演化过程如下图所示(点击查看大图):这里对一些术语做简单的解释。StreamGraph:由用户通过StreamAPI编写的代码生成的初始图形。StreamNode:用于表示算子的类,具有所有相关属性,如并发性、入出边等。StreamEdge:表示连接两个StreamNode的边。JobGraph:StreamGraph经过优化生成JobGraph,是提交给JobManager的数据结构。JobVertex:经过优化,多个满足条件的StreamNode可以链接在一起生成一个JobVertex,即一个JobVertex包含一个或多个算子,JobVertex的输入为JobEdge,输出为IntermediateDataSet。IntermediateDataSet:表示JobVertex的输出,即算子生成的数据集。生产者是JobVertex,消费者是JobEdge。JobEdge:表示作业图中的一个数据传输通道。源是IntermediateDataSet,目标是JobVertex。即数据从IntermediateDataSet通过JobEdge传递到目标JobVertex。ExecutionGraph:JobManager基于JobGraph生成的分布式执行图,是调度层的核心数据结构。ExecutionJobVertex:与JobGraph中的JobVertex一一对应。每个ExecutionJobVertex具有与并发数一样多的ExecutionVertex。ExecutionVertex:表示ExecutionJobVertex的并发子任务之一,输入为ExecutionEdge,输出为IntermediateResultPartition。IntermediateResult:与JobGraph中的IntermediateDataSet一一对应。每个IntermediateResult的IntermediateResultPartitions个数等于operator的并发数。IntermediateResultPartition:表示ExecutionVertex的一个输出分区,生产者是ExecutionVertex,消费者是若干个ExecutionEdge。ExecutionEdge:表示ExecutionVertex的输入,源为IntermediateResultPartition,目标为ExecutionVertex。source和target只能是一个。执行:是尝试执行ExecutionVertex。ExecutionVertex可能有多个ExecutionAttemptID,以防失败或数据重新计算。执行由ExecutionAttemptID唯一标识。JM和TM之间任务的部署和任务状态的更新都是通过ExecutionAttemptID来判断消息的接收者。物理执行图:JobManager根据ExecutionGraph调度Job后,在各个TaskManager上部署Task后形成的“图”并不是一个具体的数据结构。Task:Execution被调度后,在分配的TaskManager中启动相应的Task。任务用用户执行逻辑包装一个运算符。ResultPartition:表示一个Task产生的数据,对应ExecutionGraph中的IntermediateResultPartition。ResultSubpartition:是ResultPartition的子分区。每个ResultPartition包含多个ResultSubpartition,其数量由下游消费任务数量和DistributionPattern决定。InputGate:代表Task的输入包,与JobGraph中的JobEdge一一对应。每个InputGate消耗一个或多个ResultPartition。InputChannel:每个InputGate都会包含多个InputChannel,与ExecutionGraph中的ExecutionEdge一一对应,与ResultSubpartition一一对应,即一个InputChannel接收一个ResultSubpartition的输出。那么Flink为什么要设计这4张图,它的目的是什么?Spark中还有多个图、数据依赖图和物理执行的DAG。目的都是一样的,就是解耦。每张地图各司其职,每张地图对应工作的不同阶段,让那个阶段的事情更容易做。我们给出一个比较完整的FlinkGraph层次图。首先,我们看到除了StreamGraph,在JobGraph之上还有OptimizedPlan。OptimizedPlan由BatchAPI转换而来。StreamGraph由StreamAPI转换而来。为什么API不直接转为JobGraph?因为Batch和Stream的图结构和优化方式有很大不同。比如Batch有很多执行前的预分析来优化graph的执行,但是这种优化并不普遍适用于Stream,所以Batch通过OptimizedPlan来做优化会更加的方便和清晰,而且会不影响流。JobGraph的职责是统一Batch和Stream图,描述拓扑图的结构,优化链接。Chaining一般适用于Batch和Stream,所以这一层就淘汰了。ExecutionGraph的职责是方便调度和监控跟踪每个任务的状态,所以ExecutionGraph是一个并行化的JobGraph。“物理执行图”是最终分布在每台机器上运行的任务。所以可以看出这种解耦的方式极大的方便了我们在每一层做的工作,每一层之间都是相互隔离的。