当前位置: 首页 > 科技观察

AdaptiveBatchJobScheduler:自动为FlinkBatchJobs获取并行度

时间:2023-03-17 17:58:56 科技观察

01简介对于大多数用户来说,为Flink算子配置合适的并行度并不是一件容易的事。对于批处理作业,并行度太小会导致运行时间长,故障恢复慢,而不必要的大并行度会导致资源浪费,任务部署和数据shuffle的开销也会增加。为了控制批处理作业的执行时间,算子的并行度应该与其需要处理的数据量成正比。用户需要通过估计估计器需要处理的数据量来配置并行度。然而,估计器需要处理的数据量很难准确估计:需要处理的数据量可能每天都在变化,工作中可能会有大量的UDF和复杂的算子,难以判断输出数据量。为了解决这个问题,我们在Flink1.15中引入了一个新的调度器:AdaptiveBatchScheduler。自适应批处理作业调度程序会根据每个操作员需要处理的实际数据量,在作业运行时自动得出并行度。会带来以下好处:大大减少了批作业并发调优的繁琐;可以根据处理的数据量为不同的算子配置不同的并行度,特别是对于只能在Beneficial之前配置全局并行度的SQL作业;更好地适应每天变化的数据量。02Usage为了使Flink能够自动推导operator的并行度,需要进行如下配置:开启adaptivebatchjobscheduler;配置算子的并行度为-1。2.1启用自适应批处理作业调度器启用自适应批处理作业调度器,需要进行如下配置:configurejobmanager.scheduler:AdaptiveBatch;配置execution.batch-shuffle-mode为ALL-EXCHANGES-BLOCKING(默认值)。因为目前adaptivebatchjobscheduler只支持shufflemode为ALL-EXCHANGES-BLOCKING的作业。此外,还有一些相关的配置来指定自动导出算子并行度的上限和下限,每个算子期望处理的数据量,以及源算子的默认并行度。具体可以参考Flink文档[1]。2.2配置算子的并行度为-1。adaptivebatchjobscheduler只会为用户没有指定并行度的算子推导出并行度(即并行度为默认值-1)。因此需要进行如下配置:configureparallelism.default:-1;对于SQL作业,配置table.exec.resource.default-parallelism:-1;对于DataStream/DataSet作业,避免在作业方法中通过operator的setParallelism()来指定并行度;对于DataStream/DataSet作业,避免通过StreamExecutionEnvironment/ExecutionEnvironment的setParallelism()方法指定作业中的并行度。03实现细节接下来介绍自适应批处理作业调度器的实现细节。在此之前,先简单介绍一下涉及到的一些术语概念:逻辑节点(JobVertex)[2]和逻辑拓扑(JobGraph)[3]:逻辑节点是将几个算子链接在一起以获得更好的性能算子链形成,逻辑拓扑是由多个逻辑节点连接组成的数据流图。执行节点(ExecutionVertex)[4]和执行拓扑(ExecutionGraph)[5]:执行节点对应一个可部署的物理任务,由逻辑节点根据并行度扩展生成。例如,如果一个逻辑节点的并行度为100,则将生成100个对应的执行节点。执行拓扑是由所有执行节点连接组成的物理执行图。以上概念的介绍可以在Flink文档[6]中找到。需要注意的是,自适应批处理作业调度器通过推导逻辑节点的并行度来确定节点中包含的算子的并行度。实现细节主要包括以下几个部分:使调度器能够收集执行节点输出数据的大小;引入一个新的组件VertexParallelismDecider[7],负责根据逻辑节点需要处理的数据量计算并行度;支持执行拓扑的动态构建,即执行拓扑从一个空的执行拓扑开始,然后逐步添加执行节点和作业调度;引入自适应批处理作业调度程序来更新和调度执行拓扑。后续章节将对以上内容进行详细介绍。图1-并行度自动推导总体结构3.1收集执行节点产生的数据量自适应批处理作业调度器根据逻辑节点需要处理的数据量来决定其并行度,因此需要收集执行节点产生的数据量上游节点数量产生的数据。为此,我们引入一个numBytesProduced计数器来记录每个执行节点产生的数据分区(ResultPartition)的数据量,并在执行节点运行结束时将累加值发送给调度器。3.2确定逻辑节点合适的并行度我们引入一个新的组件VertexParallelismDecider来负责计算逻辑节点的并行度。计算算法如下:假设V为用户配置的每个执行节点期望处理的数据量;totalBytenon-broadcast是逻辑节点需要处理的非广播数据总量;totalBytesbroadcast是逻辑节点需要处理的广播数据总量;maxBroadcastRatio是每个执行节点处理广播数据的比例上限;normalize(x)是一个输出最接近x的2的幂的函数。计算并行度的公式如下:值得注意的是,我们在这个公式中引入了两个特殊处理:限制每个执行节点处理广播数据的比例;调整并行度为2的幂。另外,上面的公式不能直接用来判断源节点的并行度,因为源节点不消费数据。为了解决这个问题,我们引入了配置选项jobmanager.adaptive-batch-scheduler.default-source-parallelism,允许用户手动配置源节点的并行度。请注意,并非所有源都需要此选项,因为某些源可以自己推断并行性(例如,HiveTableSource,有关详细信息,请参阅HiveParallelismInference),对于这些源,更建议自己推断并行性。3.2.1限制每个执行节点处理广播数据的比例我们将公式中每个执行节点处理广播数据的上限比例限制为maxBroadcastRatio。即每个执行节点处理的非广播数据至少为(1-maxBroadcastRatio)*V。如果不这样做,当广播数据量接近V时,即使非广播数据量非常小,可能会以很大的并行度计算,这是不必要的,会导致资源浪费和任务部署。开支变大。通常,执行节点需要处理的广播数据量会小于要处理的非广播数据量。因此,我们默认将maxBroadcastRatio设置为0.5。目前,这个值是硬编码在代码中的,我们会考虑在未来让它可配置。3.2.2将并行度调整为2的幂normalize函数将并行度调整为最接近的2的幂,以避免引入数据倾斜。为了更好地理解本节,我们建议您先阅读子分区动态映射部分。以图4(b)为例,A1/A2产生4个子分区,最终确定B的并行度为3。此时B1会消耗1个子分区,B2会消耗1个子分区,B3会消耗2个子分区。我们假设不同子分区的数据量是一样的,那么B3需要消耗的数据量是B1/B2的两倍,造成数据倾斜。为了解决这个问题,我们需要让所有下游执行节点消耗的子分区数量相同,也就是说,上游产生的子分区数量应该是并行度的整数倍下游逻辑节点。为了简单起见,我们希望用户指定的最大并行度为2^N(如果不是,会自动调整为不超过配置值的2^N),然后调整下游逻辑节点的并行度为最接近的2^M(M<=N),这样可以保证子分区被下游均匀消费。不过这只是暂时的解决方案,最终应该会通过自动负载均衡来解决,我们会在后续版本中实现。3.3执行拓扑的动态构建在引入自适应批处理作业调度器之前,执行拓扑是静态构建的,即在调度开始之前就已经完全创建了执行拓扑。为了在运行时确定逻辑节点并行性,执行拓扑需要支持动态构建。3.3.1动态添加节点和边到执行拓扑动态构建执行拓扑是指一个Flink作业从一个空的执行拓扑开始,然后按计划逐渐附加执行节点,如图2所示。执行拓扑包括执行节点和执行边缘(ExecutionEdge)。仅在以下情况下扩展逻辑节点以创建执行节点并添加到执行拓扑中:已确定相应逻辑节点的并行度(以便Flink知道应该创建多少个执行节点);所有的上游逻辑节点都被扩展了(这样Flink就可以通过执行边将新创建的执行节点与上游执行节点连接起来)。图2-执行拓扑的动态构建3.3.2子分区动态映射在引入自适应批作业调度器之前,Flink在部署执行节点时,需要知道其下游逻辑节点的并行度。因为下游逻辑节点的并行度决定了上游执行节点需要产生的子分区的数量。以图3为例,下游B的并行度为2,所以上游A1/A2需要生成2个子分区,索引为0的子分区被B1消费,索引为1的子分区被消费由B2。图3-静态执行拓扑消费子分区方式但显然,这不适用于动态图,因为在上游执行节点部署时,下游逻辑节点的并行度可能还未确定(即当A1/A2是已部署,B的并行性尚未确定)。为了解决这个问题,我们需要将上游执行节点产生的子分区数量与下游逻辑节点的并行度解耦。我们通过以下方式实现解耦:将上游执行节点产生的子分区数设置为下游逻辑节点的最大并行度(最大并行度是一个可配置的固定值),然后在并行度之后下游逻辑节点确定后,将这些子分区平均分配给不同的下游执行节点进行消费。也就是说,在部署下游执行节点时,会将每个下游执行节点分配到一个子分区范围内进行消费。假设N是下游逻辑节点并行度,P是子分区数。对于下游第k个执行节点,消耗的子分区范围应该是:以图4为例,B的最大并行度为4,所以A1/A2有4个子分区。那么如果B的确定并行度为2,子分区图就是图4(a),如果B的确定并行度为3,子分区图就是图4(b)。图4-动态执行拓扑消耗子分区的方法3.4动态更新和调度执行拓扑。adaptivebatchjobscheduler的调度方式与默认调度器基本相同。唯一不同的是:自适应批处理作业的执行拓扑开始调度。在处理任何调度事件之前,它会尝试判断所有逻辑节点的并行度,然后尝试为逻辑节点生成相应的执行节点,并通过执行边连接上游节点,更新执??行拓扑。调度器在每次调度前会尝试根据拓扑顺序确定所有逻辑节点的并行度:对于源节点,在开始调度前确定其并行度;对于非源节点,需要在所有上游节点输出,完成后才能确定其并行度。然后,调度程序将尝试扩展逻辑节点以按拓扑顺序生成执行节点。一个可扩展的逻辑节点应该满足以下条件:逻辑节点的并行度已经确定;所有上游逻辑节点都已扩展。04未来展望-运行自动负载均衡的批处理作业时,可能会出现数据倾斜(某个执行节点需要处理的数据比其他执行节点多得多),这将导致作业出现长尾现象,导致完成速度变慢工作的速度。如果Flink能够自动改进或解决这个问题,对用户来说会有很大的帮助。数据倾斜的典型情况是某些子分区的数据明显多于其他子分区。这种情况可以通过划分更细粒度的子分区并根据子分区的大小来平衡工作负载来解决(图5)。自适应批处理调度器的工作可以被认为是迈向它的第一步,因为自动重新平衡的要求与自适应批处理调度器的要求类似,两者都需要动态图的支持和获取结果分区大小。基于自适应批处理作业调度器的实现,我们可以通过提高最大并行度(对于更细粒度的子分区)和简单地改变子分区范围划分算法(对于工作负载平衡)来解决上述问题。在目前的设计中,子分区的范围是根据子分区的数量来划分的。我们可以改成按照子分区的数据量来划分,这样每个子分区的数据量可以大致相同,从而平衡下游执行节点的工作量。图5-自动负载均衡注解[1]https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/elastic_scaling/#adaptive-batch-scheduler[2]https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java[3]https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java[4]https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java[5]https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java[6]https://nightlies.apache.org/flink/flink-docs-master/zh/docs/internals/job_scheduling/#jobmanager-数据结构[7]https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java