当前位置: 首页 > 后端技术 > Node.js

快手基于Flink的持续优化与实践

时间:2023-04-03 18:03:08 Node.js

介绍:快手基于Flink的持续优化与实践介绍。1.Flink稳定性的持续优化第一部分是Flink稳定性的持续优化。这部分包括两个方面。第一个方面主要介绍了FlinkKafkaConnector中的快手一些基于内部双机房读或者双机房写的高可用以及一些容错策略。第二部分是关于Flink任务的故障恢复。我们做了一些优化工作来加速故障恢复。首先介绍一下Source的高可用。公司内部重要数据写入Kafka时,Kafka层面一般会创建一个双集群topic来保证高可用。双集群的主题共享所有流量。如果单个集群出现故障,上游会自动分流。这样Kafka层面就实现了双集群的高可用。但是,Flink任务在消费双集群topic时,无法做到高可用。Flink任务是通过联合两个源来消费的。源知道上游主题故障,单个集群故障需要手动删除源。这种方式的缺点是出现故障时需要人工干预,需要人工修改代码的逻辑。程序本身不能高可用。这是做双机房阅读的背景。为了解决以上问题,我们封装了一个KafkaClusterSource,在API上支持双集群阅读topic。同时可以容忍单个集群故障,当集群故障恢复时自动重新加入故障集群。接下来是Sink的高可用。Flink在写双集群的KafkaTopic时,会定义不同的集群Sink,在逻辑内部控制流的拆分。这种方式灵活性差,不能容忍单个机房出现故障。如果单个集群出现故障,仍然需要手动移除对应的sink。同样的,我们也为sink定制了一个ClusterSink,支持在API上编写双集群topic。具体的写策略可以支持轮询和主从写。如果单个集群出现故障,逻辑会自动将流量切换到正常的集群主题。如果单个集群恢复失败,它还可以感知集群的恢复情况,自动恢复对应的集群。另外,基于Kafka的connector,我们也实现了一些容错策略。这里有三点。第一点是KafkaSink容忍丢失。这个问题的背景是如果kafka服务异常导致任务失败,业务可以容忍少量数据丢失,但是任务不希望挂掉。对于这个问题,我们的优化是设置KafkaSink在M时间内容忍X%的丢失。具体实现上,Sink会统计单个任务的失败频率,只有失败频率超过阈值时,任务才会失败。第二点是KafkaSource一键丢掉lag。这个问题产生的背景是,一旦任务滞后时间长,没有及时发现,或者任务调试环节,需要丢失历史校验。以前,延迟只能通过重新启动任务来消除。任务重启代码比较好,耗时较长。我们优化之后,可以热更新,不需要重启任务就可以丢弃卡顿。实现逻辑是动态向source发送一个操作命令,source收到命令后寻找到最新的位置。第三点是Kafkabroker列表的动态获取。这个问题产生的背景是生产环境的kafkabroker机器可能出现故障下线。一旦向离线机器发起请求,就会出现元数据获取超时,任务会频繁失败。我们优化后启动Source任务,可以获取集群信息,可以动态重新获取Kafkabrokerlist,避免频繁重启。第二部分是Flink任务的故障恢复优化,分为两个过程。一是故障发现,二是故障恢复。在实际生产环境中,一些不稳定的因素会导致恢复时间特别长,用户的感知会比较差。同时,里面还有一些质量比较高的任务,对稳定性的要求比较高。我们希望做一些事情来尽可能缩短整个恢复时间。我们设定了一个优化目标,实现20秒内自动恢复。故障发现阶段的优化包括三点:一是内部自研Hawk系统,5s发现宕机。其次,Yarn集成了Hawk以快速检测停机时间。第三,Flink感知宕机容器释放。故障恢复阶段的优化包括:首先,允许Container的部分冗余。第二,适当调整取消任务的超时时间。第三,为合适的任务开启RegionFailover。2.Flink任务启动优化第二部分是任务启动优化。当一个Flink任务启动时,通常会涉及到更多的角色和多个实例。如下图所示,其启动包括初始化客户端、构建jobGraph、上传Flinklib和jobjar、在客户端申请AM。在JobMaster之后,AM启动、初始化、构建ExectutionGraph、申请并启动Container、调度JobTasks。在TaskManager这边,申请到容器后,开始下载jar包资源,然后初始化TaskManager服务,然后在收到任务后进行部署。我们发现在线上启动任务时,基本都是分钟级别,耗时较长。如果有些任务需要升级,比如改变一些简单的逻辑,需要停止原来的任务,然后需要重新启动一个新的任务,这种场景可能会比较慢。所以我在任务启动的时候做了一些优化,尽可能的缩短任务启动的时间,进一步缩短业务中断的时间。在Flink新任务启动优化方面,我们发现IO交互比较耗时。客户端的IO包括将Flink引擎lib包上传到HDFS,用户将jar包上传到HDFS。JobMaster中包含Container下载启动资源,TaskManagerconf上传HDFS。包含在TaskManager中,Container下载启动资源,Conf文件下载。所以,我想尽量减少这样的10次操作。Flinkenginelib包,设置Public权限,在App之间共享。对于用户jar包,提供工具提前发布到集群机器。对于conf文件,通过环境变量传递。针对JobMaster启动TM频繁的文件判断,增加cache缓存。以上是新任务启动场景,下面介绍任务升级场景。以前是同步升级。比如任务A正在运行,然后我想停止任务A,开始新的任务B,如下图,不可用时间包括停止任务A,开始新的任务B,是否可以开始任务B不等待任务A完全停止?针对这个想法,我们做了一个异步升级的策略。新任务提早启动,初始化到JobMaster阶段。旧任务停止后,完成新任务的后续工作,实现新旧任务的无缝切换。通过内部提交平台将步骤串联起来,目标是在20s内完成异步升级。3.FlinkSQL实践与优化第三部分将介绍我们在FlinkSQL中使用的一些实践与优化。首先介绍一下快手中FlinkSQL的现状。目前,我们内部的FlinkSQL任务约占30%。FlinkSQL的任务数超过了360。那么它处理的峰值条目数还是比较高的,大约每秒4亿条。在内部一些重要活动的实时大屏场景中,目前也使用FlinkSQL作为链接参与相关指标的计算。下面介绍一下我们在使用FlinkSQL时遇到的一些问题,以及我们所做的一些优化。首先,关于FlinkSQL的倾斜问题,在UnBoundedAgg场景下已经有比较全面的解决倾斜问题的思路,可以归纳为三点。首先是MiniBatchAggregation,思路是将batch数据缓存在内存中,然后进行聚合,减少状态访问次数。二、LocalGlobalAggregation,思路是将聚合操作拆分成两个阶段,local阶段预聚合减少数据条数,global解决全局聚合。三、SplitDistinctAggregation,思路是针对countdistinct场景,先对groupingkey进行bucketing和pre-aggregation,然后全局聚合bucketing结果。所以我们解决的第一个问题就是BoundedAgg的倾斜问题。如下图,以左边的SQL为例,按用户分组,假设一天的窗口,然后选择每个用户的交易总金额。右图中假设有些用户的交易量很大,会导致某些WindowAgg的数据量特别大。解决方案分为两点。一、两阶段聚合,分为LocalwindowAgg和GlobalwindowAgg。LocalwindowAgg:预聚合窗口大小与全局stage一致。结果在检查点期间写出,并且不保存状态。全局窗口Agg:全聚合。第二,添加小批量。优点是localstage的mini-batch避免了过多的数据缓存,globalstage的mini-batch减少了state的访问次数。我们解决的第二个问题是FlinkSQL下UDF函数的复用。如下图,以左边的SQL为例,可以看到有两个UDF函数,并且这两个函数在SQL中重复出现。优化前:同一个UDF被多次执行,性能下降。优化后:UDF结果在同一条数据下复用,避免多次调用执行,节省资源,提升性能。比如SQL,性能提升2倍。4.未来的工作第四部分介绍了我们未来的一些计划,分为三个部分。首先,关于资源利用。目标是提高集群整体的资源利用平衡,Flink任务内部的调度平衡,以及Flink任务资源使用的合理性。二、关于FlinkSQL。我们会继续做推广。我们希望提高SQL任务的稳定性和SQL任务资源的利用率。第三,探索流批统一,这也是行业的一个方向。我们希望我们可以用一套代码解决问题,而不是重复开发两套任务。作者:阿里云实时计算Flink原文链接本文为阿里云原创内容,未经允许不得转载