当前位置: 首页 > 科技观察

ApacheFlink漫谈系列(04)-状态

时间:2023-03-13 14:40:36 科技观察

实战在流计算场景中,数据会源源不断地流入ApacheFlink系统,每条数据进入ApacheFlink系统都会触发计算。如果我们要进行Count聚合计算,是不是每次触发计算都要重新计算历史上所有流入的数据,还是每次计算都在上一次计算结果的基础上递增计算?答案是肯定的,ApacheFlink会根据上次的计算结果进行增量计算。那么问题来了:“最后的计算结果保存在哪里?可以保存在内存中吗?”答案是不。如果是保存在内存中,如果一个计算节点因为网络、硬件等原因出现故障,最后的计算结果就会丢失。当节点恢复时,所有的历史数据(可能十几天,几百天的数据)都需要重新计算。因此,为了避免此类灾难性问题,ApacheFlink会使用State来存储计算结果。本文将介绍ApacheFlinkState的相关内容。什么是State的问题好像有点“***”?不管问题的答案是否显而易见,我还是想简单说一下ApacheFlink中的State是什么?状态是指计算节点在流计算过程中的中间计算结果或元素。数据属性,比如记录聚合过程中的中间聚合结果状态。例如,当使用ApacheKafka作为数据源时,我们还需要记录读取记录的偏移量。这些状态数据将在计算过程中(插入或更新)被持久化。所以ApacheFlink中的State是ApacheFlink任务内部数据(计算数据和元数据属性)的时间相关快照。为什么需要状态与批计算相比,状态是流计算所特有的。批计算没有故障转移机制,必须要么成功,要么重新计算。在大多数场景下,流计算是一种增量计算,数据一个一个地处理(在大多数场景下),每一次计算都在上一次计算结果之上进行处理。这样的机制势必会存储上次的计算结果(生产模式应该是持久化的),由于机器、网络、脏数据等原因导致的程序错误,需要从一个成功的检查点(checkpoint)恢复状态,这将在后面的章节中介绍)重新启动作业时。增量计算和故障转移等机制需要状态支持。状态实现ApacheFlink中有四种状态存储实现,分别是:基于内存的HeapStateBackend——用于调试模式,不推荐用于生产模式;基于HDFS的FsStateBackend——分布式文件持久化,每次读写都会产生NetworkIO,整体性能较差;基于RocksDB的RocksDBStateBackend——本地文件+异步HDFS持久化;还有一种基于Niagara(阿里巴巴对ApacheFlink的增强)NiagaraStateBackend——分布式持久化——应用于阿里巴巴生产环境;StatePersistence逻辑ApacheFlink版本选择RocksDB+HDFS进行State存储。状态存储分为两个阶段。首先在本地存储在RocksDB中,然后异步同步到远程HDFS。这种设计不仅消除了HeapStateBackend的局限性(内存大小、机器故障丢失等),还降低了纯分布式存储的网络IO开销。State分类ApacheFlink内部根据算子和数据分组的角度将State分为以下两类:groupby/PartitionBy字段的每个key都有自己的State,key与key之间的State是不可见的;OperatorState-ApacheFlink内部SourceConnector的实现会使用OperatorState来记录源数据读取时要采取的偏移量。状态缩放和重新分配ApacheFlink是一个大规模并行分布式系统,允许进行大规模的有状态流处理。为了可扩展性,ApacheFlink作业在逻辑上被分解为算子图,每个算子的执行在物理上被分解为多个并行的算子实例。从概念上讲,ApacheFlink中的每个并行算子实例都是一个独立的任务,可以将其调度到自己的机器上运行到连接到网络的其他机器上。在ApacheFlink的DAG图中,只有边连接的节点🈶网络通信,即整个DAG在垂直方向有网络IO,水平方向有状态节点之间没有网络通信如下图。这种模型也保证了每个Eachoperator实例都维护自己的状态并保存在本地磁盘上(远程异步同步)。通过这种设计,任务的所有状态数据都是本地的,状态访问不需要任务之间的网络通信。避免这种流量对于ApacheFlink等大规模并行分布式系统的可扩展性至关重要。如上,我们知道ApacheFlink中的State有OperatorState和KeyedState,那么在扩展(增加并发)时如何分配State呢?例如:外部Source有5个partition,从Srouce的1个并发扩展到ApacheFlink上的2个并发,中间的StatefulOperation节点由2个并发和3个并发扩展组成,如下图:在Apache中Flink,对于不同类型的State,有不同的扩展方式,接下来我们会分别介绍。我们在ApacheFlink中选择一个具体的Connector实现实例来介绍OperatorState是如何处理扩容的。以MetaQ为例,MetaQ以主题的形式订阅数据,每个主题都会有N>0个分区。上图是一个例子,加上我们订阅的MetaQ主题有5个partition,那么当我们的source从1并发调整为2并发时,State如何恢复呢?状态恢复的方式与Source中OperatorState的存储结构有一定关系。我们先看看MetaQSource的实现是如何存储State的。首先,MetaQSource实现ListCheckpointed,其中T为Tuple2publicinterfaceListCheckpointed{ListsnapshotState(longvar1,longvar3)throwsException;voidrestoreState(Listvar1)throwsException;}我们发现snapshotState方法的返回值是一个List,T是Tuple2publicinterfaceInputSplitextendsSerializable{intgetSplitNumber();}也就是说InputSplit可以理解为一个Partition索引。有了这个数据结构,我们在看上图的case是怎么工作的?当Source的并行度为1时,所有分区的数据都在同一个线程中读取,所有分区的状态也保持在同一个状态。State存储信息的格式如下:如果我们现在把并发度调整为2,那么我们5个分区的State就会在两个独立的任务(线程)中维护。在内部实现中,我们有如下算法来分配每个Task处理和维护的partition的State信息,如下:ListassignedPartitions=newLinkedList<>();for(inti=0;i0个key,一个key-group是State分配的原子单位。ApacheFlink中Key-Group的对象是KeyGroupRange,如下:publicclassKeyGroupRangeimplementsKeyGroupsList,Serializable{...privatefinalintstartKeyGroup;privatefinalintendKeyGroup;...}KeyGroupRange的两个重要属性是startKeyGroup和endKeyGroup,定义了startKeyGroup和endKeyGroup属性之后Operator上的Key-Groups数量是确定的。什么决定了Key-Groups的数量Key-Groups的数量必须在作业开始之前确定,并且在运行期间不能更改。由于key-group是状态分配的原子单位,每个operator并行实例至少包含一个key-group,operator的最大并行度不能超过设置的key-group数量,那么ApacheFlink内部key的数量-groupsimplemented是最大并行度值。GroupRange.of(0,maxParallelism)如何判断key属于哪个Key-Group确定了GroupRange后,如何判断每个Key属于哪个Key-Group呢?我们采用mod方法,KeyGroupRangeAssignment中的assignToKeyGroup方法将key划分到指定的key-group中,如下:intmaxParallelism){returnHashPartitioner.INSTANCE.partition(keyHash,maxParallelism)}@Overridepublicintpartition(Tkey,intnumPartitions){returnMathUtils.murmurHash(Objects.hashCode(key))%numPartitions;}如上我们理解了将Key赋值给的逻辑指定的key-group是使用key的hashCode和maxParallelism进行求余运算。分散式。如下图,当parallelism=2,maxParallelism=10时,key和key-group在stream上的对应关系如下图:如上图key(a)的hashCode为97,与***并发后10余7。它被分配给KG-7,并且流中的每个事件都将被分配给从KG-0到KG-9的键组之一。每个Operator实例如何获取Key-Groups在了解了Key-Groups的概念以及如何将每个Key分配给指定的Key-Groups之后,我们来看看如何计算每个Operator实例处理的Key-Groups。在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:publicstaticKeyGroupRangecomputeKeyGroupRangeForOperatorIndex(intmaxParallelism,intparallelism,intoperatorIndex){GroupRangesplitRange=GroupRange.of(0,maxParallelism).getSplitRange(parallelism,operatorIndex);intstartGroup=splitRange.getStartGroup();intendGroup=splitRange.getEndGroup();returnnewKeyGroupRange(startGroup,endGroup-1);}publicGroupRangegetSplitRange(intnumSplits,intsplitIndex){...finalintnumGroupsPerSplit=getNumGroups()/numSplits;finalintnumFatSplits=getNumGroups()%numSplits;intstartGroupForThisSplit;intendGroupForThisSplit;if(splitIndex