介绍:在Flink1.12中,针对当前算子链无法覆盖的场景,引入了多输入算子和源链优化。本次优化将消除Flink作业中大部分冗余的shuffle,进一步提升作业执行效率。本文将以一个SQL作业为例介绍上述优化,并展示Flink1.12在TPC-DS测试集上的结果。执行效率的优化一直是Flink追求的目标。在大多数作业中,尤其是批处理作业,通过网络在任务之间传递数据(称为数据混洗)是非常昂贵的。一般情况下,一段数据需要经过网络的序列化、磁盘读写、socket读写、反序列化,才能从上游任务传输到下游;而在内存中传输相同的数据只需要消耗几个CPU周期性地传输一个八字节的指针。在Flink的早期版本中,已经使用算子链机制将相同并发的相邻单输入算子整合到同一个任务中,消除了单输入算子之间不必要的网络传输。但是,join等多输入算子之间也存在额外的数据shuffle问题,shuffle数据量最大的源节点与多输入算子之间的数据传输无法通过operatorchaining机制进行优化。在Flink1.12中,我们针对当前算子链无法覆盖的场景引入了多输入算子和源链优化。本次优化将消除Flink作业中大部分冗余的shuffle,进一步提升作业执行效率。本文将以一个SQL作业为例介绍上述优化,并展示Flink1.12在TPC-DS测试集上的结果。优化案例分析:订单量统计我们将以TPC-DSq96为例,详细介绍如何消除冗余shuffle。该SQL旨在通过multi-wayjoin对满足特定条件的订单进行过滤统计。从store_sales、household_demographics、time_dim、storewheress_sold_time_sk=time_dim.t_time_sk和ss_hdemo_sk=household_demographics.hd_demo_sk和ss_store_sk=s_store_sk和time_dim.t_hour=8和time_dim.t_minute>=30和household_demographics.count=5s_store_name='ese'图1-初始执行计划的冗余Shuffle是如何产生的?由于一些算子对输入数据的分布有要求(例如hashjoin算子要求同一个并发下datajoinkey的hash值相同),数据在算子之间传递时可能需要重新排列和排序。Flinkshuffle类似于map-reduceshuffle过程,将上游任务产生的中间结果进行整理,按需发送给需要这些中间结果的下游任务。但是在某些情况下,上游产生的数据已经满足了数据分布的要求(比如多个连续的joinkey具有相同的hashjoinoperator),此时数据排序就不再需要了,由此产生的shuffle就变成了冗余shuffle,在执行计划中用forwardshuffle表示。图1中的散列连接运算符是一种称为广播散列连接的特殊运算符。以store_salesjointime_dim为例。由于time_dim表中的数据量很小,此时将表中的全量数据通过broadcastshuffle发送给hashjoin的各个并发,这样任何一个并发都可以接受store_sales表中的任何数据不影响加入。结果的正确性,同时提高了hashjoin的执行效率。这时store_sales表到join算子的网络传输也变成了冗余的shuffle。同样,几个join之间的shuffle也是没有必要的。图2-Redundantshuffle(红框标注)除了hashjoin和broadcasthashjoin,还有很多冗余shuffle的场景,比如hashaggregate+hashjoingroupkey和joinkey相同,groupkey有包含关系,多个hash聚合等,这里不再赘述。OperatorChaining能解决吗?对Flink优化过程有一定了解的读者可能知道,为了消除不必要的forwardshuffle,Flink在前期引入了operatorchaining机制。该机制将并发相同的相邻单输入运算符集成到同一个任务中,并在同一个线程中一起运行。Operatorchaining机制在图1中已经生效,如果没有它,broadcastshuffle的三个Source节点名称中用“->”分隔的operators会被拆分成多个不同的task,造成冗余的datashuffle。图3显示了Operatorchaining关闭时的执行计划。图3-Operatorchaining关闭后的执行计划通过网络和文件减少TM之间的数据传输,将operator链接合并到任务中是一个非常有效的优化:它可以减少线程之间的切换,减少消息的序列化和处理反序列化减少了缓冲区中的数据交换并提高了整体吞吐量,同时减少了延迟。但是算子链对算子的集成度有非常严格的限制,其中之一就是“下游算子的入度为1”,也就是说下游算子只能有一个输入。这不包括具有多个输入的运算符,例如连接。多输入算子的解决方案:MultipleInputOperator如果能模仿算子链的优化思路,引入一种新的优化机制,并满足以下条件:该机制可以将多输入的算子结合起来;该机制支持多输入(为Combinedoperators提供输入)我们可以将forwardshuffle连接的多输入operator放在一个task中执行,从而消除不必要的shuffle。Flink社区长期以来一直关注OperatorChaining的缺点。在Flink1.11中,引入了流式API层的MultipleInputTransformation和对应的MultipleInputStreamTask。这些API满足了上述条件2,并在此基础上,Flink1.12在SQL层实现了满足条件1的新算子——多输入算子,请参考FLIP文档[1]。多输入运算符是表层的可插入优化。位于表层优化的最后一步,遍历生成的执行计划,将相邻未被交换阻塞的算子整合为一个多输入算子。图4显示了此优化如何修改原始SQL优化步骤。图4-添加多个输入运算符后的优化步骤读者可能会有疑问:为什么不修改现有的运算符链,而是从头开始?实际上,多输入算子除了要完成算子链接的工作外,还需要对每个输入的优先级进行排序。这是因为一些多输入算子(如hashjoin、nestedloopjoin)对输入有严格的顺序限制,如果输入的优先级排序不当,很可能造成死锁。由于算子输入的优先级信息只在表层的算子中描述,所以在表层引入这种优化机制就更自然了。值得注意的是,多输入运算符不同于管理多个运算符的运算符链接。它本身就是一个大型运营商,其内部运营对外是一个黑匣子。多输入运算符的内部结构充分体现在运算符名称中。读者在运行包含该算子的作业时,可以从算子名称中看出哪些算子在什么拓扑下组合成多输入算子。图5给出了多输入优化后算子的拓扑结构和多输入算子的视角。去掉图中三个hashjoinoperator之间的冗余shuffle后,它们可以在一个task中执行,但是operatorchaining无法处理这种多输入的情况,所以将它们放在multipleinputoperator中执行,输入顺序为每个算子和算子之间的调用关系由多输入算子管理。图5-多输入优化后的算子拓扑图多输入算子的构建和运行过程比较复杂。对细节感兴趣的读者可以参考设计文档[2]。Source也不能错过:SourceChaining通过多输入operator优化,我们优化了图1到图6的执行计划,图3通过operatorchaining优化后变成了图6的执行图。图6-多输入算子优化后的执行计划图6中的store_sales表生成的正向洗牌(红色框所示)表明我们仍有优化空间。前言中提到,在大多数作业中,直接从source产生的数据并没有经过join等算子的筛选和处理,所以shuffle的数据量是最大的。以10T数据的TPC-DSq96为例,如果不做进一步优化,包含store_sales源表的任务会向网络传输1.03T的数据,经过joinfilter后,数据量迅速下降到16.5G。如果我们可以省略源表的前向shuffle,则可以大大提高作业的整体执行效率。不幸的是,多输入算子无法涵盖源洗牌场景,因为源不同于任何其他算子,它没有任何输入。为此,Flink1.12在operatorchaining中加入了sourcechaining功能,将没有被shuffle阻塞的source合并到operatorchaining中,省去了source和下游operator之间的前向shuffle。目前只有FLIP-27source和multipleinputoperator可以利用sourcechaining功能,但这足以解决本文中的优化场景。结合多输入算子和源链接后,图7显示了本文优化案例的最终执行计划。图7-优化执行计划的TPC-DS测试结果Multipleinputoperator和sourcechaining对大多数作业,尤其是批处理作业都有显着的优化效果。我们使用TPC-DS测试集来测试Flink1.12的整体性能。与Flink1.10公布的12267s的总耗时相比,Flink1.12的总耗时仅为8708s,缩短了近30%的运行时间!Figure8-TPC-DStestsettotaltimecomparison图9-TPC-DS部分测试点时间对比Futureplans通过TPC-DS的测试结果,我们可以看到sourcechaining+multipleinput可以给我们带来很大的性能提升。目前整体框架已经完成,通用的批量算子已经支持消除冗余交换的推导逻辑。未来我们会支持更多的批量算子和更精细的推导算法。虽然streamjobs的datashuffle不需要像batchjobs一样把数据写到磁盘,但是把网络传输改成内存传输带来的性能提升也是非常可观的,所以streamjobs支持sourcechaining+multipleinput也很可观前进到优化。同时,在流式作业上支持这种优化还需要做大量的工作。例如流式算子上消除冗余交换的推导逻辑,还有一些算子需要重构来消除输入数据为二进制的要求等等,这也是为什么Flink1.12还没有推出的原因流作业中的这种优化尚未。我们会在后续版本中逐步完成这些工作,也希望有更多的社区力量加入我们,尽快实现更多的优化。作者:何晓玲、翁才智原文链接本文为阿里云原创内容,未经允许不得转载
