当前位置: 首页 > 科技观察

阿里云EMRRemoteShuffleService在小米的实践

时间:2023-03-21 00:11:13 科技观察

阿里云EMR自2020年推出RemoteShuffleService(RSS)以来,帮助众多客户解决了Spark作业的性能和稳定性问题,实现了存储计算分离架构同时,RSS也在与合作伙伴小米的共建中不断进化。本文将介绍RSS的最新架构、在小米的实践以及开源。1问题回顾Shuffle是大数据计算中最重要的算子。首先,覆盖率高,超过50%的作业包含至少一个Shuffle[2]。其次,资源消耗大。阿里巴巴内部平台Shuffle占用CPU超过20%,LinkedIn内部ShuffleRead造成资源浪费高达15%[1],单次Shuffle数据量超过100T[2]。三、不稳定,硬件资源的稳定性CPU>内存>磁盘≈网络,与Shuffle的资源消耗顺序相反。OutOfMemory和FetchFailure可能是Spark作业中最常见的两个错误。前者可以通过调参来解决,后者则需要对Shuffle进行系统重构。传统的Shuffle如下图所示。Mapper根据PartitionId对Shuffle数据进行排序并写入磁盘,然后交给ExternalShuffleService(ESS)进行管理。Reducer从每个MapperOutput中读取自己的Block。传统的Shuffle存在以下问题。本地磁盘依赖限制了存储和计算的分离。存储计算分离是近几年兴起的一种新架构。将计算和存储解耦,模型设计更灵活:计算节点强CPU弱磁盘,存储节点强磁盘强网络弱CPU。计算节点是无状态的,可以根据负载进行弹性扩展。在存储端,随着对象存储(OSS、S3)+数据湖格式(Delta、Iceberg、Hudi)+本地/近地缓存等解决方案的成熟,可以作为无限容量的存储服务。用户通过计算弹性+存储即付即用的方式获得成本节约。但是,Shuffle对本地磁盘的依赖限制了存储计算分离。写放大倍率。当MapperOutput数据量超过内存时,会触发efflux,从而引入额外的磁盘IO。大量随机读取。MapperOutput属于某个Reducer的数据量较小,比如Output128M,Reducer并发2000,那么每个Reducer只能读取64K,导致大量的小粒度随机读取。对于HDD,随机读取性能极差;对于SSD来说,SSD的寿命会很快消耗掉。网络连接数过高导致线程池CPU消耗过多,导致性能和稳定性问题。Shuffle数据单副本,大规模集群场景下坏盘/节点很常见。Shuffle数据丢失导致的Stage重新计算带来性能和稳定性问题。2.RSS发展历程针对Shuffle问题,业界尝试了各种方法,近两年逐渐趋同于PushShuffle方案。1SailfishSailfish[3](2012)首先提出了PushShuffle+Partition数据聚合方式,可以将大型作业的性能提升20%-5倍。Sailfish神奇的改变了分布式文件系统KFS[4],不支持多副本。2DataflowGoolgeBigQueryandCloudDataflow[5](2018)实现了Shuffle和计算的解耦,采用多层存储(内存+磁盘),但没有透露更多的技术细节。3RiffleFacebookRiffle[2](2018)在Mapper端采用了Merge的方式。部署在物理节点上的Riffle服务负责根据PartitionId合并本节点上的Shuffle数据,从而在一定程度上将小粒度的随机读合并成更小的大粒度。4CoscoFacebookCosco[6][7](2019)采用了Sailfish的方法并重新设计,保留了PushShuffle+Parititon数据聚合的核心方法,但使用了独立的服务。服务器采用Master-Worker架构,使用内存两副本,使用DFS进行持久化。Cosco基本定义了RSS的标准架构,但是由于DFS的拖累,性能并没有明显提升。5ZeusUberZeus[8][9](2020)也是采用去中心化的服务架构,但是没有类似etcd的角色来维护Worker的状态,所以很难做状态管理。Zeus通过客户端双推的方式制作多份副本,使用本地存储。6RPMPIntelRPMP[10](2020)依赖RDMA和PMEM新硬件加速Shuffle,不做数据聚合。7MagnetLinkedInMagnet[1](2021)集成了localShuffle+PushShuffle,其设计理念是“besteffort”。Mapper的Output写到本地后,Push线程会将数据推送到远端ESS进行聚合,并不能保证所有数据都会聚合。得益于本地Shuffle,Magnet在容错和AE支持方面表现更好(直接Fallback到传统Shuffle)。Magnet的局限性包括依赖本地磁盘,不支持存储计算分离;数据合并依赖ESS,这给NodeManager带来了额外的压力;ShuffleWrite既写本地又写远程,它的性能不是最优的。Magnet方案已经被ApacheSpark接受,成为默认的开源方案。8FireStormFireStorm[11](2021)结合了Cosco和Zeus的设计。服务器端采用Master-Worker架构,通过Client的多次写入实现多副本。FireStorm采用本地磁盘+对象存储的多层存储,采用更大的PushBlock(默认3M)。FireStorm将PushBlock的元信息保存在存储端,记录在索引文件中。FireStorm的Client缓存数据的内存由SparkMemoryManager管理,通过细粒度的内存分配(默认3K)尽可能避免内存浪费。从上面的描述可以看出,目前的方案基本都趋同于PushShuffle,但是一些关键设计的选择因公司而异,主要体现在:是集成到Spark中还是独立服务。RSS服务端架构,选项包括:Master-Worker、去中心化与轻量级状态管理、完全去中心化。Shuffle数据存储选项包括:内存、本地磁盘、DFS、对象存储。对于多副本的实现,方案包括:Client多推送,Server做Replication。阿里云RSS[12][13]于2020年上线,核心设计参考了Sailfish和Cosco,在架构和实现层面做了改进,下面会详细介绍。3阿里云RSS核心架构基于上一节的关键设计,阿里云RSS的选型如下:独立服务。考虑到RSS集成到Spark中无法满足存储计算分离架构,阿里云RSS将Shuffle服务作为独立服务提供。主从架构。通过Master节点管理服务状态是非常必要的,基于etcd的状态管理能力有限。多种存储方式。目前支持本地磁盘/DFS等存储方式,主要以本地磁盘为主,未来将向分层存储方向发展。服务器进行复制。推送更多的客户端会额外消耗计算节点的网络和计算资源,对于独立部署或面向服务场景下的计算集群并不友好。下图展示了阿里云RSS的关键架构,包括Client(RSSClient、MetaService)、Master(ResourceManager)和Worker三个角色。Shuffle流程如下:Mapper在第一次推送Data时请求Master分配Worker资源,Worker记录自己需要服务的Partition列表。Mapper在内存中缓存Shuffle数据,超过阈值时触发Push。属于同一个Partition的数据被推送到同一个Worker进行合并。主Worker内存接收到数据后,立即向从Worker发起复制。数据到达内存两份后,向Client发送ACK,Flusher后台线程负责刷盘。MapperStage运行后,MetaService向Worker发起CommitFiles命令,将内存中剩余的数据全部flush,返回文件列表。Reducer从对应的文件列表中读取Shuffle数据。阿里云RSS的核心架构和容错的详细介绍参见[13]。接下来,本文将介绍阿里云RSS在过去一年的架构演进以及区别于其他系统的特点。1状态下沉RSS采用Master-Worker架构。在最初的设计中,Master负责集群状态管理和Shuffle生命周期管理。集群状态包括Worker的健康和负载;生命周期包括每个Shuffle有哪些Worker服务,每个Worker服务的Partition列表,Shuffle的状态(ShuffleWrite,CommitFile,ShuffleRead),是否有数据丢失等。维护Shuffle生命周期需要大量的数据和复杂的数据结构,阻碍了MasterHA的实现。同时,大量的生命周期管理服务调用使得Master容易成为性能瓶颈,限制了RSS的可扩展性。为了减轻Master的压力,我们将生命周期状态管理下沉到Driver,Application自己管理Shuffle。Master只需要维护RSS集群自身的状态即可。本次优化大大降低了Master的负载,使MasterHA的顺利实施。2AdaptivePusher在最初的设计中,阿里云RSS和其他系统一样采用Hash-BasedPusher,即Client为每个Partition维护一个(或多个[11])内存缓冲区,当缓冲区超过阈值时触发推送。这种设计在中等并发的情况下没有问题,但是在大并发的情况下就会造成OOM。比如Reducer的并发为5W,那么在Buffer[13](64K)小的系统中,极限内存消耗是64K*5W=3G,在Buffer[11](3M)大的系统中,极限内存消耗为3M*5W=146G。这是无法接受的。为了解决这个问题,我们开发了Sort-BasedPusher,在缓存数据时不区分Partition。当数据总量超过阈值(即64M)时,根据PartitionId对当前数据进行排序,然后分批推送数据,解决内存消耗过大的问题。问题。Sort-BasedPusher会引入一种额外的排序,其性能比Hash-BasedPusher稍差。我们在ShuffleWriter初始化阶段根据Reducer的并发度自动选择合适的Pusher。3磁盘容错出于性能考虑,阿里云RSS推荐本地磁盘存储,所以处理坏盘/慢盘是保证服务可靠性的前提。Worker节点的DeviceMonitor线程定时检查磁盘,检查项包括IOHang、使用情况、读写异常等。此外,Worker会捕获并报告所有磁盘操作(创建文件、刷盘)的异常。IOHang和读/写异常被认为是CriticalErrors,磁盘将被隔离,磁盘上的存储服务将被终止。磁盘慢、使用率超过警戒线等异常仅隔离磁盘,不再接受新的Partition存储请求,现有Partition服务保持正常。磁盘隔离后,worker的容量和负载会发生变化,这些信息会通过heartbeat发送给master。4滚动升级RSS作为常驻服务,有永不停止的需求,而系统本身也一直在进化,所以滚动升级是必备功能。虽然可以绕过子集群部署方式,即部署多个子集群,对子集群进行灰度化,对灰度集群暂停服务,但这种方式依赖于调度系统对灰度集群的感知和动态修改作业配置。我们认为RSS应该关闭滚动升级循环,核心设计如下图所示。Client向Master节点的Leader角色发起滚动升级请求(Master实现HA,见上文),并上传更新包给Leader。Leader通过Raft协议将状态改为滚动升级,开始第一阶段升级:升级Master节点。Leader先升级所有Follower,然后替换本地包并重启。当Leader节点发生变化时,升级过程不会中断或异常。主节点升级完成后,进入第二阶段:工作节点升级。RSS使用滑动窗口进行升级。window中的worker尽可能优雅地下线,即拒绝新的Partition请求,等待本地shuffle结束。为了避免等待太久,设置了超时。另外,窗口中的Worker选择会尽量避免同时包含master和slave两个副本,以减少数据丢失的概率。5混沌测试框架对于服务,单纯依靠UT、集成测试、端到端测试等无法保证服务可靠性,因为这些测试无法覆盖线上复杂环境,如坏盘、CPU过载、网络过载、机器挂起等。当这些复杂情况发生时,RSS需要保持服务稳定。为了模拟线上环境,我们开发了模拟(混沌)测试框架,在测试环境中模拟线上可能出现的异常情况,同时保证满足RSS运行的最低运行环境。即至少有3个Master节点和2个Worker节点可用,每个Worker节点至少有一块磁盘。我们继续做这种RSS的压力测试。仿真测试框架结构如下图所示。首先定义测试计划,描述事件类型、事件触发的顺序和持续时间。事件类型包括节点异常、磁盘异常、IO异常、CPU过载等。客户端将Plan提交给Scheduler,Scheduler根据Plan的描述将具体的Operation发送给各个节点的Runner,Runner负责具体的执行,并报告当前节点的状态。在触发Operation之前,Scheduler会推断事件发生的后果,如果不满足RSS的最低运行环境,就会拒绝该事件。我们认为模拟测试框架的思路是一个通用的设计,可以扩展到更多的业务测试。6多引擎支持Shuffle是通用操作,不绑定引擎,所以我们尝试了多引擎支持。目前我们支持Hive+RSS,我们也在探索与流计算引擎(Flink)和MPP引擎(Presto)结合的可能性。虽然Hive和Spark都是批计算引擎,但是Shuffle的行为是不一致的。最大的区别是Hive在Mapper端排序,Reducer只做Merge,而Spark在Reducer端排序。由于RSS还不支持计算,需要修改Tez以支持Reducer排序。另外,Spark有干净的Shuffle插件接口,RSS只需要在外围做扩展,而Tez没有类似的抽象,在这方面也有些侵入性。目前大部分引擎都没有Shuffle插件抽象,需要对引擎进行一定程度的改造。另外,流计算和MPP都是上游即时推送到下游的模式,而RSS是上游推送下游拉取的模式。如何将两者结合起来也需要探索。7测试我们对比了阿里云RSS、Magent和开源系统X。由于我们的系统还在不断演进,测试结果仅代表当前。测试环境Header*1:ecs.g6e.4xlarge,16*2.5GHz/3.2GHz,64GiB,10GbpsWorker*3:ecs.g6e.8xlarge,32*2.5GHz/3.2GHz,128GiB,10GbpsAlibabaCloudRSSvs.Magnet5TTerasortMagent的性能测试如下图所示。如上所述,Magent的ShuffleWrite有额外的开销,比RSS和传统方法差。Magent的ShuffleRead有所改进,但不如RSS。在这个benchmark下,RSS明显优于其他两者,而Magent的e2e时间略优于传统的Shuffle。阿里云RSS与开源系统XRSS和开源系统X在TPCDS-3T中的性能对比如下,总时间RSS快20%。稳定性在稳定性方面,我们测试了Reducers大规模并发的场景。Magnet可以跑通,但是时间比RSS慢好几倍。SystemX在ShuffleWrite阶段报告错误。4.阿里云RSS在小米的实践1现状及痛点小米的离线集群主要基于Yarn+HDFS,NodeManager和DataNode混合部署。Spark是支撑核心计算任务的主要离线引擎。目前Spark作业最大的痛点集中在Shuffle导致的稳定性和性能差,以及存储计算分离架构的局限性。经过resourceguarantee和jobtuning后,job失败的主要原因是FetchFailure,如下图所示。由于大部分集群使用HDD,传统Shuffle的高随机读和高网络连接导致性能不佳,稳定性低导致的stage重计算将进一步加剧性能退化。此外,小米一直试图利用存储计算分离架构的计算灵活性来降低成本,但Shuffle对本地磁盘的依赖阻碍了它的发展。2RSS在小米的实现小米一直在关注Shuffle优化相关技术。1月21日,与阿里云EMR团队就RSS项目建立共创关系。3月,第一个生产集群上线并开始接入运营。100+节点规模的HA集群上线。首批300+节点将于9月上线。默认情况下,集群将启用RSS。后续计划将进一步扩大RSS的灰度。在落地的过程中,小米主导了磁盘容错的研发,大大提升了RSS的服务稳定性。技术细节如上所述。另外,在RSS前期还没有完全稳定的情况下,小米针对多个环节的RSS操作实现了容错。在调度端,如果启用RSS的Spark作业由于Shuffle报错,Yarn会回退到ESS进行下一次重试。在ShuffleWriter初始化阶段,小米主导了自适应回退机制,根据当前RSS集群的负载和作业特点(比如reducer并发量是否过大),自动选择RSS或ESS,从而提高稳定性。3效果接入RSS后,Spark作业的稳定性和性能都有明显提升。因FetchFailure而失败的作业几乎不再失败,性能平均提升了20%。下图是访问RSS前后的工作稳定性对比。ESS:RSS:下图是接入RSS前后的作业运行时间对比。ESS:RSS:在存储计算分离方面,小米海外集群接入RSS后成功启动了1600+Core的弹性集群,运行稳定。在阿里云EMR团队和小米Spark团队的共同努力下,RSS带来的稳定性和性能提升得到了充分验证。未来,小米将继续扩大RSS集群和运营规模,在资源弹性伸缩场景中发挥更大作用。5、开源重要的事情说三遍:“阿里云RSS开源!”X3git地址:https://github.com/alibaba/RemoteShuffleService开源代码包含核心功能和容错能力,满足生产需求。计划中的重要特性:AESpark多版本支持更好的流控更好的监控更好的HA多引擎支持欢迎各界开发者共建!6参考文献[1]MinShen,YeZhou,ChandniSingh.Magnet:用于大规模数据处理的基于推送的Shuffle服务。VLDB2020.[2]HaoyuZhang、BrianCho、ErginSeyfe、AveryChing、MichaelJ.Freedman。Riffle:针对大规模数据分析优化的Shuffle服务。EuroSys2018.[3]SriramRao、RaghuRamakrishnan、AdamSilberstein。Sailfish:大规模数据处理框架。SoCC2012.[4]KFS。http://code.google.com/p/kosmosfs/[5]谷歌数据流随机播放。https://cloud.google.com/blog/products/data-analytics/how-distributed-shuffle-improves-scalability-and-performance-cloud-dataflow-pipelines[6]Cosco:一种高效的Facebook规模洗牌服务。https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service[7]FlashforApacheSparkShufflewithCosco。https://databricks.com/session_na20/flash-for-apache-spark-shuffle-with-cosco[8]优步宙斯。https://databricks.com/session_na20/zeus-ubers-highly-scalable-and-distributed-shuffle-as-a-service[9]UberZeus。https://github.com/uber/RemoteShuffleService[10]英特尔RPMP。https://databricks.com/session_na20/accelerating-apache-spark-shuffle-for-data-analytics-on-the-cloud-with-remote-persistent-memory-pools[11]腾讯FireStorm。https://github.com/Tencent/Firestorm[12]阿里云RSS趣头条实践。https://developer.aliyun.com/article/779686[13]阿里云RSS架构。https://developer.aliyun.com/article/772329