实战在流计算场景中,数据会源源不断地流入ApacheFlink系统,每条数据进入ApacheFlink系统都会触发计算。如果我们要进行Count聚合计算,是不是每次触发计算都要重新计算历史上所有流入的数据,还是每次计算都在上一次计算结果的基础上递增计算?答案是肯定的,ApacheFlink会根据上次的计算结果进行增量计算。那么问题来了:“最后的计算结果保存在哪里?可以保存在内存中吗?”答案是不。如果是保存在内存中,如果一个计算节点因为网络、硬件等原因出现故障,最后的计算结果就会丢失。当节点恢复时,所有的历史数据(可能十几天,几百天的数据)都需要重新计算。因此,为了避免此类灾难性问题,ApacheFlink会使用State来存储计算结果。本文将介绍ApacheFlinkState的相关内容。什么是State的问题好像有点“***”?不管问题的答案是否显而易见,我还是想简单说一下ApacheFlink中的State是什么?状态是指计算节点在流计算过程中的中间计算结果或元素。数据属性,比如记录聚合过程中的中间聚合结果状态。例如,当使用ApacheKafka作为数据源时,我们还需要记录读取记录的偏移量。这些状态数据将在计算过程中(插入或更新)被持久化。所以ApacheFlink中的State是ApacheFlink任务内部数据(计算数据和元数据属性)的时间相关快照。为什么需要状态与批计算相比,状态是流计算所特有的。批计算没有故障转移机制,必须要么成功,要么重新计算。在大多数场景下,流计算是一种增量计算,数据一个一个地处理(在大多数场景下),每一次计算都在上一次计算结果之上进行处理。这样的机制势必会存储上次的计算结果(生产模式应该是持久化的),由于机器、网络、脏数据等原因导致的程序错误,需要从一个成功的检查点(checkpoint)恢复状态,这将在后面的章节中介绍)重新启动作业时。增量计算和故障转移等机制需要状态支持。状态实现ApacheFlink中有四种状态存储实现,分别是:基于内存的HeapStateBackend——用于调试模式,不推荐用于生产模式;基于HDFS的FsStateBackend——分布式文件持久化,每次读写都会产生NetworkIO,整体性能较差;基于RocksDB的RocksDBStateBackend——本地文件+异步HDFS持久化;还有一种基于Niagara(阿里巴巴对ApacheFlink的增强)NiagaraStateBackend——分布式持久化——应用于阿里巴巴生产环境;StatePersistence逻辑ApacheFlink版本选择RocksDB+HDFS进行State存储。状态存储分为两个阶段。首先在本地存储在RocksDB中,然后异步同步到远程HDFS。这种设计不仅消除了HeapStateBackend的局限性(内存大小、机器故障丢失等),还降低了纯分布式存储的网络IO开销。State分类ApacheFlink内部根据算子和数据分组的角度将State分为以下两类:groupby/PartitionBy字段的每个key都有自己的State,key与key之间的State是不可见的;OperatorState-ApacheFlink内部SourceConnector的实现会使用OperatorState来记录源数据读取时要采取的偏移量。状态缩放和重新分配ApacheFlink是一个大规模并行分布式系统,允许进行大规模的有状态流处理。为了可扩展性,ApacheFlink作业在逻辑上被分解为算子图,每个算子的执行在物理上被分解为多个并行的算子实例。从概念上讲,ApacheFlink中的每个并行算子实例都是一个独立的任务,可以将其调度到自己的机器上运行到连接到网络的其他机器上。在ApacheFlink的DAG图中,只有边连接的节点🈶网络通信,即整个DAG在垂直方向有网络IO,水平方向有状态节点之间没有网络通信如下图。这种模型也保证了每个Eachoperator实例都维护自己的状态并保存在本地磁盘上(远程异步同步)。通过这种设计,任务的所有状态数据都是本地的,状态访问不需要任务之间的网络通信。避免这种流量对于ApacheFlink等大规模并行分布式系统的可扩展性至关重要。如上,我们知道ApacheFlink中的State有OperatorState和KeyedState,那么在扩展(增加并发)时如何分配State呢?例如:外部Source有5个partition,从Srouce的1个并发扩展到ApacheFlink上的2个并发,中间的StatefulOperation节点由2个并发和3个并发扩展组成,如下图:在Apache中Flink,对于不同类型的State,有不同的扩展方式,接下来我们会分别介绍。我们在ApacheFlink中选择一个具体的Connector实现实例来介绍OperatorState是如何处理扩容的。以MetaQ为例,MetaQ以主题的形式订阅数据,每个主题都会有N>0个分区。上图是一个例子,加上我们订阅的MetaQ主题有5个partition,那么当我们的source从1并发调整为2并发时,State如何恢复呢?状态恢复的方式与Source中OperatorState的存储结构有一定关系。我们先看看MetaQSource的实现是如何存储State的。首先,MetaQSource实现ListCheckpointed,其中T为Tuple2publicinterfaceListCheckpointed
