本文介绍了字节跳动在Flink状态查询方面的优化,解决了查询Flink任务状态时开发成本高,无法查询状态元信息等问题。StateQueryonFlinkSQL的解决方案可以让用户使用FlinkBatchSQL快速查询Flink任务的状态。背景众所周知,Flink中的State保存了算子计算过程的中间结果。当任务发生异常时,可以通过查询任务快照中的State获取有效线索。但是目前对于FlinkSQL任务,当我们想要查询一个作业的状态时,通常查询状态的代价太大,因为我们无法知道状态的定义方式和具体类型。为了解决这个问题,字节跳动流计算团队在内部提出了StateQueryonFlinkSQL的解决方案——用户可以简单地通过编写SQL来查询State。本文将主要介绍字节跳动在Flink状态查询方面的相关工作。StateProcessorAPI的介绍,提到状态查询,我们自然而然会想到Flink在1.9版本中提出的特性——StateProcessorAPI。使用StateProcessorAPI,我们可以将作业产生的Savepoint转换为DataSet,然后使用DataSetAPI完成State的查询、修改、初始化等操作。下面简单介绍一下如何使用StateProcessorAPI完成State查询:首先创建一个ExistingSavepoint来代表一个Savepoint。初始化ExistingSavepoint时,需要提供Savepoint路径、StateBackend等信息;然后实现ReaderFunction重新注册需要查询的State,定义处理State的方式。在查询状态的过程中,会遍历所有的key,按照我们定义的方式操作状态;最后调用Savepoint.readKeyedState,传入operator的uid和ReaderFunction即可完成状态查询。下面简单介绍一下State查询的原理。Savepoint目录下有两种文件,一种是状态数据文件,比如上图中的opA-1-state,里面存放了算子A在第一个SubTask状态的详细数据;还有一个metadataFile,对应上图中的_metadata,metadata文件中存放的是各个算子与state文件的映射关系。当我们在做状态查询的时候。首先,在客户端,元数据文件会根据保存点路径进行解析。通过操作员ID,可以获取到要查询的状态对应的文件句柄。当真正执行状态查询时,负责读取状态的Task会新建一个StateBackend,然后将状态文件中的数据恢复到Statebackend中。状态恢复完成后,会遍历所有Key,将对应的状态交给ReaderFunction处理。可能有同学会问,既然社区已经提供了查询State的功能,那我们为什么还要做同样的工作呢?主要原因是我们在使用StateProcessorAPI的过程中发现了一些问题:每次查询State都需要独立开发一个FlinkBatch任务,对用户来说有一定的开发成本;在实现ReaderFunction时,我们需要清楚地了解任务的状态定义方法,包括State的名称、类型、StateDescriptor,用户使用门槛较高;使用StateProcessorAPI时,只能查询单个算子的状态,不能同时查询多个算子的状态;无法直接查询任务状态的元信息,比如查询任务使用了哪些状态,或者查询某个状态的类型。一般来说,我们有两个目标,一个是降低用户的使用成本;二是增强状态查询功能。我们希望用户在查询状态时,可以使用最简单的方式;同时,他们不需要知道任何信息。另外,我们也希望用户可以同时查询多个算子的状态,也可以直接查询作业使用了哪些状态,每个状态是什么类型。因此,我们提出了FlinkSQL上StateQuery的解决方案。简单的说,就是把State当成一个数据库,让用户通过写SQL就可以很方便的查询State。在这个方案中,我们需要解决两个问题:如何屏蔽用户的State信息:参考StateProcessorAPI,我们知道查询State需要提供很多信息,比如Savepoint路径,StateBacked类型,operatorid,StateDescriptor等。这些复杂的信息显然很难通过SQL语句完整表达,那么查询状态到底需要什么,如何屏蔽State的复杂细节呢?这是我们面临的第一个困难。如何使用SQL来表达State:Flink中State的存储方式与Database的存储方式不一样。怎么用SQL来表达状态查询过程呢?这是我们要解决的另一个难点。StateMetaSnapshot机制首先我们来回答第一个问题,查询一个State需要哪些信息?可以参考上面StateProcessorAPI的例子。我们在创建ExistingSavepoint和ReaderFunction时,需要提供Savepoint路径、Backend类型、OperatorID、operatorkey类型、State名称、Serializer等信息。我们可以把这些统称为状态的元信息。对于FlinkSQL任务,用户清楚了解这些信息的门槛非常高。我们的想法是,用户只需要提供最简单的信息,即SavepointID,然后Flink框架将其他元信息存储在Savepoint中,这样就可以屏蔽State的复杂细节,避免用户和查询状态可以完成。因此,我们引入了StateMetaSnapshot机制。简单来说,StateMetaSnapshot就是将状态元信息添加到SavepointMetadata中的过程。具体步骤如下:首先,在注册State时,Task会将operatorName\ID\KeySerializer\StateDescriptors等元信息保存在Task的内存中;当一个Savepoint被触发时,Task在做快照的同时也会对状态的元信息进行快照。快照完成后,将状态元信息(StateMeta)和状态文件句柄(StateHandle)上报给JobManager;JobManager收到所有Task上报的StateMeta信息后,将这些状态元信息进行合并,最后将合并后的State元信息保存到Savepoint目录下一个名为stateInfo的文件中。之后查询状态时只需要解析Savepoint中的stateInfo文件,而不需要用户通过代码输入State的元信息。这样可以大大降低用户查询状态的成本。StateasDatabase接下来我们来回答第二个问题,如何用SQL来表达State。事实上,社区在设计StateProcessorAPI时提出了一些解决方案,即StateAsDatabase。在传统数据库中,一个表通常由三个元素表示:Catalog、Database和Table。其实我们也可以将同样的逻辑映射到FlinkState上。我们可以把Flink的State看成是一个特殊的数据源,一个作业产生的每个Savepoint都看成是一个独立的DB。在这个DB中,我们将状态元信息和状态详细数据抽象到不同的表中,并将它们公开给用户。用户可以直接查询这些表来获取任务状态信息。首先,让我们看看如何将State表示为Table。我们都知道在Flink中,常用的State有两种,分别是KeyedState和OperatorState。对于OperatorState,它只有一个属性Value,用来表示这个State的具体值。因此,我们可以将OperatorState表示为一个只包含一个Value字段的表结构。对于KeyedState,在不同的Key和Namespace下,每个State的值可能不同,所以我们可以将KeyedState表示为一个表结构,包含三个字段:Key,Namespace,Value。我们抽象出单个State之后,表示多个State就容易多了。可以看出,在上面的例子中,这个算子包含了3个状态,分别是两个KeyedStates和一个OperatorState。我们只需要简单地将这些表联合起来,然后通过state_name字段来区分不同的状态即可。指示此运算符中的所有状态。最后还有一个问题,我们怎么知道一个任务使用了哪些状态或者这些状态的具体类型呢?为了解决这个问题,我们定义了一个特殊的表——StateMeta,用来表示一个Flink任务中所有State的元信息。StateMeta包含任务中各个State的名称、State的算子ID、算子名称、Key的类型和Value的类型等,方便用户直接查询StateMeta表获取元信息任务中的所有状态。使用FlinkBatchSQL查询任务状态以上就是状态查询方案的整体介绍。那么我们如何查询一个状态呢?下面以一个WordCount任务为例进行说明。首先,我们需要创建一个FlinkSQL任务并启动它。通过web-ui可以看到这个任务包含了三个算??子,分别是Source、Aggregate和Sink。然后,我们就可以触发Savepoint,在Savepoint创建成功的时候得到对应的SavepointID。我们可以通过SavepointID查询作业状态。如果我们对FlinkSQL任务中状态的使用一无所知,那么首先要查询的是这个Flink任务中包含了哪些状态,以及这些状态的类型。我们可以从StateMeta表中获取这些信息。如上图场景1所示,通过查询StateMeta表,可以看到这个task中包含一个ListState和一个ValueState,分别存在于Source算子和Aggregate算子中。另外,熟悉Flink的同学都知道,KafkaSource中的State是用来记录当前消费的offset信息的。如场景二所示,我们可以通过查询Source算子的状态获取任务中消费的KafkaTopic的Partition和Offset信息。还有一种比较常见的场景,比如下游业务同学发现某个key(比如key_662)的结果异常。在定位问题的时候,我们可以直接在job中查询聚合算子的状态,同时指定key等于key_662作为查询条件。如上图场景三,从查询结果可以看出,当key为662时,对应的聚合结果为11290,这样用户可以方便的验证状态是否正确。展望未来,我们计划进一步丰富国家的职能。目前我们支持SQL查询State的功能。其实社区也提供了修改和初始化State的能力。在某些场景下,这些能力也更为重要。比如我们知道state中的一些key被计算错了,我们希望对state中的这部分数据进行修正;或者任务逻辑在改变后与之前的状态不完全兼容,这时候我们希望状态能够被修改和初始化。生成一个新的保存点。同样,在使用上,我们也希望用户可以直接使用SQL中的insert和update语法来完成状态修改和初始化操作。二是进一步提升State的易用性。我们使用DAG编辑的方案来解决作业拓扑变化时状态不兼容的问题,但是当FlinkSQL任务修改字段时,StateSerializer可能会发生变化,这也会导致状态不兼容。针对这种情况,我们设计了一套完整的FlinkSQLStateSchemaEvolution方案,可以极大提升FlinkSQL任务变更后的状态恢复能力,目前正在实施中。我们还提供完善的状态恢复预检能力,可以检查状态是否兼容,并在任务上线前通知用户,避免状态不兼容带来的作业启动失败的影响。
