作者:海源、世路、肖恩等Kafka承担了美团数据平台上数据统一缓存和分发的角色。随着数据量的增长和集群规模的扩大,Kafka面临的挑战也越来越严峻。一、现状与挑战1.1现状Kafka是一个开源的流处理平台。我们先了解一下Kafka在美团数据平台上的现状。图1-1美团数据平台Kafka现状如图1-1所示。蓝色部分描述了Kafka在数据平台上作为流式存储层的定位。主要职责是缓存和分发数据。它将收集的日志分发到不同的数据系统。这些日志来自系统日志、客户端日志和业务数据库。下游数据消费系统包括通过ODS入库进行离线计算、直接实时计算、通过DataLink同步到日志中心、OLAP分析等。美团Kafka集群总数已超过15000+台机器,单集群最大机器数已达到2000+台机器。在数据规模上,天空级消息量已经超过30+P,天空级消息峰值达到每秒4+亿条。但是,随着集群规模的增大和数据量的增加,Kafka面临的挑战也越来越严峻。我来说说具体的挑战。1.2挑战图1-2Kafka在美团数据平台上面临的挑战如图1-2所示。具体挑战可以概括为两部分:第一部分是慢节点影响读写。这里的慢节点是指HDFS的一个概念,具体定义是指读写延时TP99大于300ms的Broker。节点慢的原因有以下三种:集群负载不均衡会导致局部热点,即整个集群的磁盘空间充裕或者ioutil低,但是部分磁盘快满了或者ioutil满了。PageCache容量,例如80GB的PageCache在170MB/s的写入速率下只能缓存8分钟的数据。那么如果消费的是8分钟前的数据,可能会触发磁盘访问慢。消费者客户端线程模型的缺陷会导致端到端延迟指标失真。比如Consumer消费的多个partition在同一个Broker时,TP90可能小于100ms,但多个partition在不同Broker时,TP90可能大于1000ms。第二部分是大规模集群管理的复杂性。问题有四类:不同topic会相互影响,个别topic流量突然增加,或者个别消费者回溯影响集群整体稳定性。Kafka原生的Broker粒度指标不够完善,难以定位问题和分析根源。故障检测不及时,处理成本高。Rack层面的故障会导致部分分区不可用。2.读写延迟优化接下来介绍一下美团数据平台在解决读写延迟问题上所做的优化。首先从宏观层面,我们将影响因素分为应用层和系统层,然后详细介绍应用层和系统层存在的问题,并给出相应的解决方案,包括pipeline加速,Fetcher隔离,迁移取消和Cgroup资源隔离等,下面会详细介绍各种优化方案的实现。2.1概览图2-1Kafka读写延迟优化概览图2-1是对读写延迟遇到的问题及相应优化方案的概览。我们将影响因素分为应用层和系统层。应用层主要包括三类问题:1)Broker端负载不均衡,如diskusage不均衡,ioutil不均衡。单个磁盘负载的增加会影响整个Broker的请求。2)Broker的数据迁移存在效率问题和资源竞争问题。具体包括以下三个层次:迁移只能串行批量提交。每批次中,可能会有少量分区迁移缓慢,无法提交下一批次,影响迁移效率。迁移一般在夜间进行。如果迁移到下午高峰才完成,读写请求可能会受到很大影响。迁移请求和实时抓取存在共享Fetcher线程的问题。因此,分区迁移请求可能会影响实时消费请求。3)消费端单线程模型缺陷导致运维指标失真,单个消费者消费分区数不限。如果消费能力不足,将无法跟上最新的实时数据。当消耗的分区数量增加时,可能会导致回溯。.系统层也主要包括三类问题:1)PageCache污染。Kafka使用内核层提供的ZeroCopy技术来提升性能,但是内核层无法区分实时读写请求和回溯读请求,导致磁盘读取可能会污染PageCache,影响实时读写。2)HDD在随机读写负载下表现不佳。HDD对顺序读写比较友好,但是面对混合负载场景下的随机读写,性能下降明显。3)混合部署场景下对CPU、内存等系统资源的资源竞争。在美团大数据平台上,为了提高资源利用率,IO密集型服务(如Kafka)会与CPU密集型服务(如实时计算作业)混合使用。混合分布会造成资源竞争,影响读写延迟。针对上述问题,我们采取了有针对性的策略。比如应用层的磁盘均衡、迁移流水线加速、支持迁移取消、消费者异步等。系统层Raid卡加速、Cgroup隔离优化等。此外,我们还设计并实现了基于SSD的缓存架构,解决HDD随机读写性能不足的问题。2.2应用层①磁盘均衡图2-2Kafka应用层磁盘均衡磁盘热点导致两个问题:实时读写延迟变高,例如TP99请求处理时间超过300ms可能导致实时作业消费延迟,以及数据采集拥塞问题等。集群整体利用率不足。虽然集群容量很充足,但有些磁盘已经满了。这个时候,有些分区甚至会停止服务。针对这两个问题,我们采用了基于空闲磁盘优先级的分区迁移方案。整个计划分为三个步骤,由Rebalancer组件管理:生成迁移计划。Rebalancer根据目标磁盘使用情况和当前磁盘使用情况(通过KafkaMonitor报告)不断生成特定的分区迁移计划。提交迁移计划。Rebalancer将刚刚生成的迁移计划提交给Zookeeper的Reassign节点,Kafka的Controller收到Reassign事件后,将Reassign事件提交给整个KafkaBroker集群。检查迁移计划。KafkaBroker负责执行数据迁移任务,Rebalancer负责检查任务进度。如图2-2所示,每个Disk持有3个分区,是一个比较均衡的状态。如果有的Disk有4个分区,比如Broker1-Disk1和Broker4-Disk4;有些Disks持有2个分区,比如Broker2-Disk2、Broker3-Disk3,Reblanacer会将Broker1-Disk1和Broker4-Disk4上的冗余分区分别迁移到Broker2-Disk2和Broker3-Disk3,最终保证整体磁盘利用率为尽可能平衡。②迁移优化基于空闲磁盘优先级的分区迁移虽然实现了磁盘平衡,但是迁移本身仍然存在效率问题和资源竞争问题。接下来,我们将详细描述我们采用的目标定位策略。采用流水线加速策略,优化迁移慢带来的迁移效率问题。支持取消迁移,解决长尾分区迁移慢导致读写请求受影响的问题。Fetcher隔离用于缓解数据迁移请求和实时读写请求共享Fetcher线程的问题。优化1.流水线加速图2-3流水线加速如图2-3所示。箭头上方的原生Kafka版本只支持批量提交。例如,一次提交四个分区。当TP4分区卡住,无法完成时,后面的所有分区都无法继续。采用流水线加速后,即使TP4分区还没有完成,也可以继续提交新的分区。同时,原计划因TP4未完成而受阻,后续所有分区均无法完成。在新计划中,TP4分区已迁移到TP11分区。图中的虚线表示一个无序的时间窗口,主要用来控制并发。目的是保持与原来分组提交数的一致性,避免迁移过多影响读写请求服务。优化2、迁移取消图2-4-1迁移问题如图2-4-1所示。箭头左侧描述了受迁移影响的三种在线类型。一是因为迁移会触发最老读,同步大量数据。在这个过程中,数据会先被flush回PageCache,造成PageCache污染,导致实时读分区出现cachemiss,触发磁盘速度,影响读写请求;第二种是当一些异常的节点导致迁移挂掉,导致一些运维操作无法进行,比如流量增加触发的主题自动扩容。因为这样的运维操作在Kafka迁移过程中是被禁止的。第三种类似于第二种。它的主要问题是当目标节点崩溃时,主题分区的扩容无法完成,用户可能一直遭受读写请求的冲击。图2-4-2迁移取消针对以上三个问题,我们支持迁移取消功能。管理员可以调用迁移取消命令中断正在迁移的分区。对于第一种场景,PageCache不会被污染,可以保证实时读取;在第二种和第三种情况下,由于迁移取消,可以完成分区的扩展。迁移取消将删除尚未迁移的分区。删除可能会造成磁盘IO瓶颈,影响读写。因此,我们支持平滑删除,避免大量删除带来的性能问题。优化3、Fetcher隔离图2-5Fetcher隔离如图2-5所示,绿色代表实时读,红色代表延时读。当一个follower的实时读和延迟读共享同一个fetcher时,延迟读会影响实时读。因为每次延迟读取的数据量明显大于实时读取,延迟读取很可能会触发磁盘读取,数据可能不再在PageCache中,从而显着降低Fetcher的拉取效率。为了解决这个问题,我们实施了一种称为Fetcher隔离的策略。也就是说所有ISRFollower共享Fetcher,所有非ISRFollower共享Fetcher,这样所有ISR中的实时读取就不会受到非ISR回溯读取的影响。③Consumer异步图2-6Kafka-Broker阶段性延迟统计模型在描述Consumer异步之前,有必要先解释下图2-6所示的Kafka-Broker阶段性延迟统计模型。Kafka-Broker端是典型的事件驱动架构,各个组件之间通过队列进行通信。当请求流经不同的组件时,依次记录时间戳,最终可以统计出请求在不同阶段的执行时间。具体来说,当KafkaProducer或Consumer请求进入Kafka-Broker时,Processor组件将请求写入RequestQueue,RequestHandler从RequestQueue中拉取请求进行处理。RequestQueue中的等待时间为RequestQueueTime,RequestHandler具体执行时间为LocalTime。当RequestHandler执行完毕后,会将请求传递给DelayedPurgatory组件,这是一个延迟队列。当触发某个延迟条件时,请求将被写入ResponseQueue。DelayedPurgatory队列的持续时间是RemoteTime。Processor会不断的从ResponseQueue中拉取数据发送给客户端。红色的ResponseTime是可能的。会受到client的影响,因为如果client的接收能力不足,ResponseTime会不断增加。从Kafka-Broker的角度来看,每个请求的总耗时RequestTotalTime包含了刚才所有进程的阶段性计时之和。图2-7Consumer异步ResponseTime不断增加的主要问题是Kafka原生的Consumer基于NIO的单线程模型存在缺陷。如图2-7所示,在Phase1中,User首先发起Poll请求,Kafka-Client同时向Broker1、Broker2、Broker3发送请求。当Broker1的数据先准备好后,KafkaClient将数据写入CompleteQueue并立即返回。而不是继续拉取Broker2和Broker3的数据。后续的Poll请求会直接从CompleteQueue中读取数据,然后直接返回,直到CompleteQueue清空。在CompleteQueue清零之前,即使Broker2和Broker3末端的数据准备好了,也不会及时拉取。如图中Phase2所示,由于单线程模型的缺陷,导致WaitFetch持续时间变长,导致Kafka-Broker的RespnseTime延迟指标不断增加。问题是无法准确监控和细分服务器的处理瓶颈。.图2-8引入异步拉取线程为了解决这个问题,我们的改进是引入异步拉取线程。异步拉取线程会及时拉取就绪数据,避免服务器端延迟指标的影响,同时Kafka原生不限制同时拉取的分区数。我们这里限制速度,避免GC和OOM的发生。异步线程不断在后台拉取数据,放入CompleteQueue中。2.3系统层①Raid卡加速图2-9Raid卡加速硬盘存在随机写性能不足的问题,表现为延迟增加,吞吐量下降。为了解决这个问题,我们引入了Raid卡加速。Raid卡有自己的缓存,类似于PageCache。在Raid层,将数据合并成更大的块写入Disk,充分利用顺序写入硬盘的带宽。借助Raid卡保证随机写入性能。②Cgroup隔离优化图2-10Cgroup隔离为了提高资源利用率,美团数据平台将IO密集型应用和CPU密集型应用混合部署。这里的IO密集型应用指的是Kafka,这里的CPU密集型应用指的是Flink和Storm。但是,原有的隔离策略存在两个问题:第一,物理核心本身存在资源竞争。同一个物理核下,共享的L1Cache和L2Cache存在竞争。当实时平台的CPU飙升时,会导致Kafka读写延迟。做作的;其次,Kafka的HT跨越了NUMA,增加了内存访问时间。如图2-10所示,跨NUMA节点使用QPI进行远程访问,远程访问耗时40ns。针对这两个问题,我们改进了隔离策略。对于物理核的资源竞争,我们新的混合分布策略保证了Kafka独占物理核。也就是说,在新的隔离策略中,不存在同一个物理核心。Kafka和Flink同时使用;然后保证Kafka的所有超线程都在NUMA的同一侧,避免Kafka跨NUMA造成的访问延迟。通过新的隔离策略,Kafka的读写延迟不再受FlinkCPU暴涨的影响。2.4Hybridlayer-SSD新缓存架构图2-11Pagepollution导致性能问题的背景和挑战Kafka使用操作系统提供的ZeroCopy技术处理数据读请求。读取延迟。但实际上PageCache的容量往往是不够的,因为它不会超过一台机器的内存。当容量不足时,ZeroCopy会触发一次磁盘读取,不仅速度明显变慢,还会污染PageCache,影响其他读写。如图2-11左半部分所示,当一个延迟消费者去取数据,发现PageCache中没有自己想要的数据时,此时会触发一次磁盘读取。磁盘读完后,数据会回写到PageCache中,造成PageCache污染,延迟消费者消费,减慢消费延迟,也会影响另一个实时消费。因为对于实时消费,它总是读取最新的数据,而最新的数据应该不会正常触发磁盘读取。选择与决策针对这个问题,我们在做方案选择的时候给出了两种解决方案:方案一,读盘时不回写PageCache,比如使用DirectIO,但是Java不支持;方案2,在内存和HDD之间引入一个中间层,比如SSD。众所周知,SSD比HDD具有更好的随机读写能力,非常适合我们的使用场景。对于SSD方案我们也有两种选择:方案一,可以基于操作系统的内核来实现。该方案将SSD和HDD的存储空间按照固定大小划分为块,建立SSD和HDD的映射关系,同时会以数据局部性为原则,经过一个CacheMiss,数据会根据LRU和LFU替换SSD中的部分数据。业界典型的解决方案包括OpenCAS和FlashCache。其优点是数据路由对应用层透明,应用代码改动量小,社区活跃可用;但问题是局部性原则不满足Kafka的读写特性,缓存空间污染的问题也没有得到根本解决,因为它会按照LRU和LFU替换SSD中的部分数据。方案二基于Kafka应用层实现。具体来说,Kafka数据按照时间维度存储在不同的设备上。对于近实时数据,直接放在SSD上,对于较旧的数据,直接放在HDD上。Offset从对应的设备中读取数据。该方案的优势在于其缓存策略充分考虑了Kafka的读写特性,保证所有近实时的数据消费请求都落在SSD上,保证了这些请求在处理过程中的低延迟,同时,从HDD读取的数据不会返回Flashing到SSD可防止缓存污染。同时,由于每个日志段都有唯一清晰的状态,每个请求都有明确的目的,不存在CacheMiss带来的额外性能开销,同时缺点也很明显。需要在服务端代码上进行改进,涉及的开发和测试工作量比较大。图2-13KafkaSSD新缓存架构具体实现下面介绍新SSD缓存架构的具体实现。首先,新的缓存架构会将Log中的多个Segment按照时间维度存储在不同的存储设备上,如图2-14中红圈1所示。新的缓存架构数据会有三种典型状态,一种叫做OnlyCache,表示数据刚刚写入SSD,还没有同步到HDD;二是Cached,即数据已经同步到HDD,部分缓存到SSD;第三种叫WithoutCache,意思是已经同步到HDD上了,SSD里面已经没有cache了。然后后台异步线程不断将SSD数据同步到HDD。随着SSD的不断写入,当存储空间达到阈值时,会按照时间顺序删除最早的数据,因为SSD的数据空间是有限的。副本可以根据可用性要求灵活启用或不写入SSD。从HDD读取的数据不会闪回SSD以防止缓存污染。图2-14新SSD缓存架构细节优化细节优化介绍完具体实现,我们再来看细节优化。首先是关于logsegment同步,也就是刚才说的Segment,它只同步inactivelogsegment。Inactive是指当前没有被写入的日志段,以低成本解决数据一致性问题。二是优化同步限速。SSD同步到HDD时,需要限速,同时保护两个设备,不影响其他IO请求的处理。3.大规模集群管理优化3.1隔离策略美团大数据平台的Kafka服务于多个业务。如果这些业务的主题混在一起,很可能不同业务的不同主题会相互影响。另外,如果Controller节点同时承担数据读写请求,当负载明显变高时,Controller可能无法及时控制此类请求,比如元数据变更请求,最终可能导致整个集群失败。针对这些相互关联的问题,我们从业务、角色、优先级三个维度进行隔离和优化。图3-1隔离优化第一点是业务隔离,如图3-1所示,每个大的业务都会有一个独立的Kafka集群,比如外卖、门店配送、优化等。第二点是角色隔离。这里Kafka的Broker和Controller以及它们的依赖组件Zookeeper部署在不同的机器上,避免相互影响。第三点是优先级。有些业务topic的可用性级别特别高,我们可以把它们划分成VIP集群,给它们更多的资源冗余来保证它们的可用性。3.2全链路监控随着集群规模的增长,集群管理遇到了一系列问题,主要包括两个方面:Broker端延迟指标不能及时响应用户问题。随着请求数量的增加,目前Kafka在Broker端提供的TP99甚至TP999延迟指标可能无法反映长尾延迟。Broker端的延迟指标不是端到端的指标,可能不能反映用户真正的问题。故障感知和处理不及时。图3-2全链路监控针对这两个问题,我们采用的策略是全链路监控。全链路监控收集和监控Kafka核心组件的指标和日志。全链路监控架构如图3-2所示。当某个客户端的读写请求变慢时,我们可以通过全链路监控快速定位到具体的链路。全链路指标监控如图3-3所示。图3-3全链路指标监控图3-4是基于全链路指标定位请求瓶颈的示例。可以看出服务器上的远程时间占比最高,说明时间主要花在了数据复制上。日志和指标分析服务,实时自动检测故障和慢速节点。大多数故障(内存、磁盘、Raid卡、网卡等)和慢节点已经支持自动处理。另一类故障是计划外故障,如分区多副本挂起导致的不可用、迁移挂起和意外错误日志等,需要人工干预。图3-4全链路监控指标示例3.3服务生命周期管理图3-5服务生命周期管理美团在线Kafka服务器规模以万计。随着服务规模的增长,我们对服务和机器本身的管理,也在不断迭代。我们的自动化运维系统可以处理大部分机器故障和慢速服务节点,但是机器和服务的管理是分离的,导致两类问题:状态语义不明确,不能真实反映系统状态,往往需要依赖通过日志和指标了解真实系统是否健康或异常。状态不全面,异常情况需要人工干预,误操作风险极高。为了解决这两类问题,我们引入了生命周期管理机制,确保系统状态能够得到真实反映。生命周期管理是指机器报废时从服务开始到服务结束的全过程管理,实现服务状态与机器状态的联动,无需人工同步和更改。而且,新的生命周期管理机制的状态变更由特定的自动化运维触发,禁止人工变更。3.4TOR灾备图3-6TOR灾备挑战从工程实现的角度,我们总结了当前主流图神经网络模型的基本范式,实现了一套通用的框架来覆盖多种GNN模型。下面根据图的类型(同构图、异构图、动态图)分别进行讨论。图3-7TOR容灾TOR容灾保证同一分区的不同副本不在同一个Rack中,如图3-7所示,即使整个Rack1出现故障,也能保证所有分区可用。4未来展望在过去的一段时间里,我们围绕降低服务器的读写延迟做了很多优化,但是在服务的高可用方面还有一些工作要做。未来一段时间,我们将重点通过各种粒度的隔离机制来提高健壮性和减少故障域。比如让客户端主动避开一些故障节点,通过服务端多队列隔离异常请求,支持服务端热卸载,网络层主动反压限流等。另外,随着美团实战的全面发展,时间计算业务,实时计算引擎(典型的是Flink)和流存储引擎(典型的是Kafka)的混合部署模式越来越难以满足业务需求。因此,我们需要在保持当前成本不变的情况下独立部署Kafka。这意味着需要使用更少的机器(在我们的业务模型中,是原来机器的1/4)来承载持续的业务流量。如何用更少的机器来承载业务请求,同时保证服务的稳定性,也是我们面临的挑战之一。最后,随着云原生趋势的到来,我们也在探索流式存储服务上云的方式。5作者简介海源、石璐、肖恩、罗红、奇帆、胡蓉、李杰等均来自美团数据科学与平台部。
