当前位置: 首页 > 科技观察

京东Flink在K8s上的持续优化实践

时间:2023-03-13 06:16:21 科技观察

1.基本介绍K8s是业界非常流行的容器编排和管理平台。它可以非常简单高效地管理云平台中多台主机上的容器化应用。2017年前后,我们的实时计算出现了多种引擎并存的情况,包括Storm、SparkStreaming,以及引入的新一代计算引擎Flink。Storm集群运行在物理机上,SparkStreaming运行在YARN上。不同的运行环境导致部署和运行成本特别高,资源利用浪费,因此迫切需要一个统一的集群资源管理和调度系统来解决这个问题。而K8s可以很好的解决这些问题:可以轻松管理上千个容器化应用,易于部署和维护;易于实现混合部署,将在线服务、机器学习、流式计算、批计算等不同负载的服务混合在一起,获得更好的资源利用率;此外,它还具有天然的容器隔离和原生的弹性自愈能力,可以提供更好的隔离和安全性。经过一系列的尝试、优化和性能对比,我们选择了K8s。2018年初,实时计算平台开始全面容器化;到2018年6月,20%的任务已经在K8s上运行。从运行结果来看,无论是资源共享能力、业务处理能力、敏捷性和效率都有了很大的提升,初步达到了预期的效果;到2019年2月,所有实时计算都已容器化;此后,我们一直在K8s环境下进行优化和实践,比如弹性伸缩、服务混合部门、任务快速恢复能力建设等方面的实践。毕竟在K8s上,好处还是比较明显的:首先,混合部署服务和资源共享能力得到了提升,节省了30%的机器资源;资源的弹性伸缩保证了业务的稳定性;最后,一致的开发、测试和生产环境,避免了环境对整个开发过程造成的问题。的代价。京东Flink在K8s上的平台架构如上图所示。最下面是物理机和云主机,最上面是K8s。它使用京东开发的JDOS平台。在标准的K8s的基础上,做了很多的定制和优化,让它更适合我们的生产环境。实际情况。JDOS大部分运行在物理机上,小部分运行在云主机上。再往上是基于社区版Flink深度定制的Flink引擎。最上面是京东的实时计算平台JRC,支持SQL作业和jar包作业,提供高吞吐、低延迟、高可用、弹性自愈、易用的一站式海量流批量数据计算能力,并支持丰富的数据源和目标源,具有完善的作业管理、配置、部署、日志监控、自运维功能,并提供备份回滚和一键迁移功能。我们的实时计算平台服务于京东内部的许多业务线。主要应用场景包括实时数仓、实时大屏、实时推荐、实时报表、实时风控、实时监控等应用场景。目前我们的实时K8s集群由7000多台机器组成,在线Flink任务5000多个,数据处理峰值可达每秒10亿以上。2.生产实践最初的容器化方案采用基于K8sdeployment部署的standalonesessioncluster,是一种静态分配资源的模式。如上图所示,用户在创建时需要确定所需的管理节点JobManager的个数和规格(包括CPU核数、内存和磁盘大小等),运行节点TaskManager的个数和规格(包括CPU、内存和磁盘大小等),以及Taskmanager中包含的槽数。集群创建完成后,JRC平台通过K8s客户端向K8smaster发送请求创建Jobmanager的deployment。这里使用ZK来保证高可用,HDFS和OSS用于状态存储。集群创建完成后,即可提交任务。但是,在我们实践的过程中,我们发现这个方案存在一些不足之处。需要业务提前预估所需资源,对业务不是很友好,不能满足灵活多变的业务场景。例如,对于一些复杂的拓扑或集群运行多个任务的场景,业务很难提前准确确定所需的资源。在这种情况下,通常会先创建一个更大的集群,这会造成一定的资源浪费。在任务运行过程中,无法根据任务的运行状态动态伸缩资源。所以我们升级了容器化方案来支持弹性资源模型。这是一种按需分配资源的方法。如上图,需要用户在创建时指定需要的管理节点Jobmanager的个数和规格以及运行节点Taskmanager的规格,可以不指定Taskmanager的个数。点击CreateCluster后,JRC平台会通过K8s客户端向K8smaster发送请求创建Jobmanagerdeployment,并可选择预先创建指定数量的Taskmanagerpod。平台提交任务后,JobMaster通过JDResourceManager向JRC平台发送rest请求申请资源,平台再动态向K8smaster申请资源,创建运行Taskmanager的pod。配置资源的动态释放。这里通过平台和K8s的交互来创建和销毁资源,主要是为了保证计算平台对资源的管理和控制,同时避免集群配置和逻辑变化对镜像的影响;通过支持用户配置TaskManager的数量来预分配资源,可以达到和静态分配资源一样快的任务提交速度;同时,通过自定义资源分配策略,实现兼容原有slot分散分布的均衡调度。FlinkonK8s环境中,日志和监控指标非常重要。它们可以帮助我们观察整个集群、容器和任务的运行状态,并根据日志和监控快速定位问题并及时处理。这里的监控指标包括物理机指标(如CPU、内存、负载、网络、连通性、磁盘等指标)、容器指标(如CPU、内存、网络等指标)、JVM指标和Flink指标(集群指标)和任务指标)。其中物理机指标和容器指标通过metricagent采集上报给Origin系统,JVM指标和Flink指标通过Jobmanager和Taskmanager中自定义的metricreporter上报给Baize系统,然后统一到监控、查看和报警的计算平台。日志采集采用京东的Logbook服务。它的基本机制是在每个Node上运行一个日志代理,收集指定路径的日志;然后Jobmanager或者Taskmanager会按照指定的规则将日志输出到指定的目录,然后日志会被自动收集到Logbook系统中;最后,可以通过计算平台检索和查询实时日志和历史日志。接下来是容器网络的性能问题。一般来说,虚拟化会带来一定的性能损失。容器网络作为容器虚拟化的重要组成部分,与物理机网络相比,不可避免地会遭受一些性能损失。性能下降的程度取决于网络插件、协议类型和数据包大小。如上图所示,是针对跨主机容器网络通信的性能评估。参考基线是服务端和客户端在同一台主机上通信。从图中可以看出,host模式实现了接近参考基线的吞吐量和延迟,而NAT和Calico有较大的性能损失,这是地址转换和网络包路由的开销造成的;而所有的覆盖网络都有非常大的性能损失。一般来说,网络数据包的封装和解封装比地址转换和路由的开销更大,所以使用哪个网络需要权衡。比如overlay网络,由于网络包的封装和解封装,会造成很大的开销,性能会比较差,但它允许更灵活和安全的网络管理;NAT和主机模式网络更容易获得良好的性能,但安全性较差;路由网络性能也不错但需要额外支持。此外,网络丢失对检查点的速度也有很大的影响。根据我们的对比测试,在网络模式不同的情况下,在相同的环境下运行相同的任务,容器网络任务的检查点时间是宿主网络的两倍以上。那么如何解决这个容器网络的性能问题呢?一是根据机房的环境选择合适的网络方式:比如我们一些老机房,容器网络的性能下降特别严重,网络架构无法升级。使用host网络(如上图所示,在podyaml文件中配置hostNetwork=true)避免丢包的问题。虽然这不符合K8s的风格,但是需要根据情况做一个trade-off;对于新建机房,由于基础网络的性能提升,采用了全新的高性能网络插件,与宿主机网络相比,性能损失非常小,因此采用了容器网络;二是尽量避免使用异构网络环境,避免K8s跨机房,同时适当调整集群网络的相关参数,增加网络的容错能力。比如akka.ask.timeout和taskmanager.network.request-backoff.max这两个参数可以适当调整。再说说磁盘的性能。容器中的存储空间由两部分组成。如上图所示,底层是只读镜像层,顶层是读写容器层。容器运行时,所有涉及文件的写操作都在容器层完成。这里需要存储驱动提供联合文件系统进行管理。存储驱动一般针对空间效率进行了优化,额外的抽象会带来一些性能损失(取决于具体的存储驱动),写入速度会比本地文件系统慢,尤其是在使用copy-on-write存储驱动时,损失更大。这对写密集型应用程序有更大的性能影响。在Flink中,很多地方都涉及到本地磁盘的读写,比如日志输出、RocksDB读写、批任务shuffle等,那么如何处理才能减少影响呢?第一,可以考虑使用外部卷,使用本地存储卷,直接将数据写入宿主文件系统,提高性能;此外,还可以调优磁盘IO相关参数,比如调优RocksDB参数,提高磁盘访问性能;最后,也可以考虑使用一些存储和计算分离的方案,比如使用remoteshuffle,来提高localshuffle的性能和稳定性。在实践中,经常会发现很多业务的计算任务配置不合理,占用资源过多,造成资源浪费。此外,流量也有高峰和低谷。如何在洪峰自动扩容,在洪低谷自动缩容,减少人工干预,保证业务稳定的同时提高资源利用率,都涉及到资源弹性伸缩的问题。为此,我们开发了弹性伸缩服务,根据作业的运行动态调整任务的并行度和TaskManager的规格,解决作业吞吐量不足、资源浪费等问题。如上图所示,大致流程如下:首先,在JRC平台上进行任务的伸缩配置,主要包括运行度调整的上下限和一些伸缩策略的阈值。这些配置将被发送到缩放服务;在伸缩服务运行过程中,实时监控集群和任务的运行指标(主要是部分CPU的使用率和算子的繁忙程度等),结合伸缩配置和调整策略生成任务调整结果,并发送至JRC平台;随任务调整。目前,伸缩服务可以较好地解决部分场景下的资源浪费问题,以及在任务吞吐量和算子并行度呈线性关系的情况下的性能问题。但是它仍然存在一定的局限性,比如外部系统瓶颈、数据倾斜、任务本身的性能瓶颈,以及无法通过扩展并行度来改善的场景,所以不能很好地应对。另外,结合弹性伸缩,我们也尝试了一些实时流式任务和离线批处理任务。如上图右侧所示,凌晨前后,流式任务相对空闲,通过缩容的方式释放部分资源给批式任务;这些释放出来的资源可以用来晚上跑批任务;白天运行批处理任务后,释放的资源可以再次使用。返回到流式任务进行扩容以应对流量高峰,从而提高资源的整体利用率。相比物理机或者YARN环境,FlinkonK8s出现问题后排查难度相对更大,因为涉及到K8s的很多组件,比如容器网络、DNS解析、K8s调度等门槛。为了解决这个问题,我们开发了智能诊断服务,将与作业相关的各个维度的监控指标(包括物理机、容器、集群和任务指标)与任务拓扑结合,并与K8s打通,结合与pod日志和任务日志联合分析,总结日常人工运维的一些方法,应用于分析策略,诊断作业问题,给出优化建议。目前支持任务重启、任务反压、检查点失效、集群资源利用率低等一些常见问题的诊断,未来会不断丰富和完善。3.优化改进在实践过程中,采用静态资源分配方式时,一般会根据TaskManager分散槽位,根据TaskManager分散消耗资源的算子,实现作业的均衡调度,提高工作表现。如右上图,有2个Taskmanager,每个Taskmanager有4个slot,1个job有2个operator(分别用绿色和红色表示),每个operator有2个并行度。在使用默认调度策略(sequentialscheduling)的情况下,这个job的所有operator都会集中在一个Taskmanager中;而如果采用平衡调度,这个作业的所有算子都会按照TaskManager横向分散,每个TaskManager会以1个并行度分布给两个算子(绿色和红色)。但是当采用资源动态分配方式(原生K8s)时,资源是为每个pod单独申请创建的,那么此时如何实现均衡调度呢?我们通过在任务调度之前预先分配资源来解决这个问题。具体过程如下:用户提交job后,如果开启了资源预分配,JobMaster不会立即调度task,而是一次性向ResourceManager预申请job需要的资源.所需资源到位后,将通知JobMaster。此时,调度任务可以实现与静态资源分配方式相同的均衡调度。在这里你还可以为JobMaster配置一个超时时间。超时后会按照正常的任务调度流程进行,不会无限期地等待资源。我们进行了真实场景的性能对比。如上图右侧所示,使用顺序调度时作业吞吐量为5700万/分钟,启用资源预分配和均衡调度后作业吞吐量为8947万/分钟。增加了57%,还是比较明显的效果。我们平台上很多业务都是用一个集群来运行多个任务,所以会有一个TaskManager来分发不同作业的任务,导致不同作业之间相互影响。那么如何解决这个问题呢?我们定制了插槽分配策略。当JobManager向ResourceManager申请一个slot时,如果开启了任务资源隔离,SlotManager会将分配了slot的Taskmanager标记为job,那么Taskmanager空闲的slot只能用于job的slot.问。通过将TaskManager分组为作业,实现集群多任务的资源隔离。如上图右侧所示,一个Taskmanager提供了3个slot,3个job,每个job有一个operator,并行度为3(分别用绿色、蓝色和红色表示)。打开插槽平铺和分散。在隔离之前,三个作业会共享三个Taskmanager,每个Taskmanager为每个作业分配一定的并行度。开启任务资源隔离后,各工作部门独享一个TaskManager,互不影响。容器环境复杂多变,pod可能被驱逐或重启:例如机器硬件故障、docker故障、节点负载过高等都会导致pod被驱逐;进程不健康、进程异常退出、docker异常重启也会导致pod重启。此时任务会重启恢复,对业务造成影响。那么,如何才能减少对您的业务的影响呢?一方面是加快对容器环境的pod异常(被驱逐或重启)的感知,并快速恢复运行。在官方默认的实现中,如果一个pod异常,可能会从两个途径感知到:一是故障pod的下游operator可能感知到网络连接断开,从而触发异常触发failover;另一个是JobManager会先感知TaskManager心跳超时,此时也会触发Failover。无论通过哪条路径,所需要的时间都会比超时时间长。在我们默认的系统配置下,需要的时间是60多秒。这里我们优化了pod异常感知的速度。当Pod异常停止时,默认会有30秒的优雅停止时间。此时容器主进程启动脚本会收到来自K8s的TERM信号。除了做必要的清理动作外,我们还添加了一个通知Jobmanager异常的Taskmanager的链接;当容器中工作进程的taskmanager异常退出时,主进程(这里是启动脚本)也会感知到,也会通知jobmanager是哪个taskmanager异常。这样在pod异常的时候可以第一时间通知Jobmanager,及时恢复job。通过本次优化,在典型测试场景下,当集群有空闲资源时,任务故障切换时间从原来的60秒缩短到几秒;视频时长也缩短了30多秒,效果还是比较明显的。另一个方面是减少pod异常对作业的影响。虽然社区版本在1.9之后提供了基于region的部分恢复策略,但是当任务失败时,只重启与失败任务关联的region中的任务,在某些场景下可以降低影响。但是很多时候一个job的算子是通过rebalance或者hash全连接的,region策略并没有起到很大的作用。为此,我们在1.10和1.12版本开发了基于Task的故障单点故障恢复策略。当Task失败时,只恢复故障Task,不影响非故障Task。如上图所示,这个作业有source、map、sink三个算子。其中source和map都是1个并行度,sink是2个并行度。map的一级并行度map(1/1)和sink的二级并行度sink(2/2)分布在pod_B上。当pod_B被evicted时,Jobmanager会检测到pod_B异常,然后会在new的pod_D上重新部署这两个Task,记录为map(1/1)'和sink(2/2)';部署完成后,会通知故障的Taskmap(1/1)的下游sink(1/1)新的上游Taskmap(1/1)'准备好了,然后sink(1/1)会与upstreammap(1/1)'重新建立连接进行通信。具体实现中需要注意以下几点:首先,故障恢复前,故障Task的上游如何处理待发送数据,下游如何处理接收到的残差数据?这里我们会直接丢弃上游输出到故障任务的数据,如果下游收集到不完整的数据,也会被丢弃;第二,当上下游无法感知对方异常时,恢复后如何处理?这里可能需要一个强制更新过程;三是多个任务分布在一个pod上的情况。如果pod异常,有多个故障任务,如果这些故障任务之间存在依赖关系,如何正确处理?这里需要根据依赖依次部署。通过单点恢复策略,线上应用取得了很好的效果,对运行的影响范围大大缩小(根据具体操作,可以降低到原来的零点几到百分之几),避免业务中断。同时,恢复时间也大大缩短(从典型场景的一分多钟缩短到几秒——几十秒)。当然,这种策略也是有代价的。恢复时会带来少量数据丢失,适用于流量服务等对少量数据丢失不敏感的业务场景。4.未来规划未来我们将在以下几个方面继续探索:一是K8s层面的资源调度优化,更高效地管理大数据的在线服务和离线作业,提高K8s集群的利用率和运行效率;二是对Flink作业调度进行了优化,支持更丰富、更细粒度的调度策略,提高Flink作业资源的利用率和稳定性,满足不同业务场景的需求。一是调度优化:二是服务混合:将不同负载的服务混合在一起,在保证服务稳定性的同时,实现资源利用率最大化,实现服务器价值最大化;智能运维:支持任务进行智能诊断,自适应调整运行参数,实现作业合格,降低用户调优和平台运维成本;最后,FlinkAI支持:在人工智能应用场景中,Flink包括特征工程、在线学习、资源预测等方面都有一些独特的优势,未来我们也会从平台层面在这些场景进行探索和实践。