当前位置: 首页 > 科技观察

Kafka各种跨IDC容灾方案的研究与比较

时间:2023-03-15 14:02:08 科技观察

1.前言为了尽量减少自然灾害和人为灾害(如停电、灾难性软件故障和网络中断)对业务的影响,以及随着我行基于Kafka的实时业务的不断增长,Kafka的重要性越来越高日复一日,逐步优化我行跨IDC的Kafka持续建设成为急需解决的问题。本文从元数据同步、数据复制、消费位移同步、容灾模式等方面对现有容灾方案进行调研和比较。2、现有容灾方案说明。用户MirrorMaker1(简称MM1)的原理是启动消费者从源集群消费,然后发送到目标集群。功能比较简单。MirrorMaker2(简称MM2)或基于MM2的改进,是基于KafkaConnect框架实现,LinkedIn工程师贡献,修复了MM1的局限性,主题和分区可以自动感知,acl和配置可以自动同步,支持active-active,提供偏移量转换功能360ConfluentReplicatorConfluent付费版,相比MM2,active-active模式更加优雅,可以支持对单条消息的修改。Confluent的follower-based同步机制是利用Kafka的复制同步机制创建一个Fetcher线程来同步数据。需要对原始Kafka进行二次开发。字节跳动,滴滴uReplicator改进MM1,采用分布式任务管理框架ApacheHelix控制Partitions的分配,不需要fullrebalance。Uberbrooklin改进了MM1。实现思路和MM2类似。和uReplicator一样,为了减少rebalance,使用StickyAssignment来控制Partition的分配。除了支持Kafka集群之间的复制,还可以作为AzureEventHubs,AWSKinesisstreaming服务之间的通道,也可以作为CDCconnectorLinkedIn3.各方案主要设计要点对比分析3.1元数据同步元数据同步主要是指Topic、Partition、Configuration、ACL的同步,我们需要评估在添加主题、分区扩展、修改Configuration和ACL后是否可以自动感知到每个方案,以及是否选择了复制的主题评估方案是否灵活(比如是否支持白名单和黑名单机制,是否支持正则化),目标集群中的Topic名称是否变化(决定是否支持双向复制,是否发生循环复制)。在MM1方案中,选择复制的topic只支持白名单机制(通过--whitelist或--include参数指定),白名单支持正则写入。但是,当一个新主题被添加到源集群时,目标集群的auto.create.topics。当enable设置为true时,可以在目标集群中自动创建一个同名的topic(可以扩展messagehandler并重命名),否则必须重启MirrorMaker才能发现新添加的topic。关于目标集群上的topic分区数,MM1是基于默认值num.partitions配置的(其他方案没有这个问题),无法与源集群保持一致,无法同步ACL。与MM1相比,MM2弥补了上述不足。它主要依赖MirrorSourceConnector中的多个定时任务来实现这个功能。Topic/Partition、Configuration、ACL更新的时间间隔由三个参数指定,非常灵活。在MM2中,从版本3.0.0开始,支持两种复制策略。在默认的DefaultReplicationPolicy中,topic名称在目标集群复制后会发生变化,并且会在其前面加上源集群的前缀。为了兼容MM1,在3.0.0新增了IdentityReplicationPolicy,在目标集群复制后topic名称不会改变。ConfluentReplicator,根据官网描述,也有以上功能。原理和MM2类似,只是检测更新只由一个参数决定。Replicator可以定义复制后的topic名称,由参数topic.rename.format指定。默认值是保持主题名称不变。基于follower的同步机制的实现方式,网上资料不多,不详,但原理估计和MM2类似,复制后目标集群中topic名称不变。uReplicator的实现略有不同。复制哪些主题由参数enableAutoWhitelist和patternToExcludeTopics决定。当enableAutoWhitelist设置为true时,如果源集群和目标集群存在相同的topic,不需要其他设置就可以实现数据复制。如果设置为false,则需要将复制的topic名称等信息提交给控制分区分配的uReplicatorController。另外,黑名单参数patternToExcludeTopics控制哪些topic不需要复制;是否自动感知分区扩展由参数enableAutoTopicExpansion控制;关于Configuration和ACL无法同步。brooklin选择复制的topic只支持白名单机制,可以支持regularization。添加新主题和分区扩展后,可以自动感知。检测更新由参数partitionFetchIntervalMs决定。复制后,主题名可以加前缀,由参数DESTINATION_TOPIC_PFEFIX决定。总结如下:方案MM1MM2ConfluentReplicator是基于Follower的同步机制uReplicatorbrooklin。复制后,Topic名称将保持不变。它也可以定制并保持不变。也可以添加固定前缀保持不变。它也可以定制。变化,也可以定义一个前缀自动检测和复制新的topic部分支持(取决于目标集群是否开启自动创建topic)是否支持取决于二次开发功能不支持支持自动检测和复制新分区不支持supported支持依赖二次开发功能支持源集群和目标集群主题配置一致。不支持支持。支持支持。支持配置和ACL更新是否同步。不支持。:主题是否有白名单、黑名单和正则表达式部分支持支持的支持取决于二次开发的功能复制后消息偏移量是否对齐,是否能保证复制时的数据一致性,即是否会丢失数据或出现重复数据。首先说明一下,由于复制的延迟,所有这些容灾方案中的RPO都不等于0。基于follower的同步机制方案可以保持offset对齐。由于replica同步的延迟,当masterroom出现异常时,backuproom仍然可能会丢失一些数据,可以保持offset一致,不存在重复数据的可能。其他方案都不能保证偏移量对齐(除非复制时源主题的偏移量从0开始)。关于各个方案中消费者从源集群消费,再写入目标集群的逻辑,我们一一详细解释:首先从MM1开始,这是他的设计架构:在KIP-3MirrorMakerEnhancement中,上面-上面提到的架构设计,保证不丢数有以下几点:1.关闭消费者自动提交位移,在提交位移前调用生产者.flush()将缓存中的数据刷出2.在生产者端,通过设置这些参数max.in.flight.requests.per.connection=1(多个消费者共享一个生产者,这个生产者每次只向broker发送一个请求),retries=Int.MaxValue(return是可重试异常,无限次重试,直到buffer满),ack=-1(发送给所有副本)3.设置abortOnSendFail,当生产者收到不可重试异常后(比如消息等异常那太大了),停止MirrorMaker进程,否则会丢失一些发送失败的数据。另外,为了避免consumerrebalance时出现重复数据(rebalance时有些数据位移没有提交),定义一个新的consumerRebalancelistener,当partitionRevoke发生时,先刷新producer缓存中的数据,然后再提交位移。从上面的设计来看,MM1并没有丢失数据,但是仍然存在数据重复的可能,这是Kafka的非幂等Producer决定的。另外,MM1的设计存在很多缺陷,比如只有一个Producer,发送效率低。另外,这个Producer是轮询发送的,消息发送到的目标Topic上的分区不一定和源Topic上的分区相同。因为轮询,这个Producer会和集群中的每个broker建立连接。与uReplicator相比,位移也是在flush之后提交,避免数据丢失。改进了MM1的缺陷。每个WorkerInstance都有多个Fe??tcherThreads和多个ProducerThreads。从源集群中获取数据后,会将其放入队列中,ProducerThread从队列中取出数据发送到目标集群的topic,发送到目的topic的每条消息分区与源分区一致,可以保持语义一致性。在Brooklin中,可以在每个BrooklinInstance中设置多个Consumer和Producer,同时也可以保持语义的一致性。相对于uReplicator的一个优势是它提供了flushlessproducers(也可以提供flushProducers),这些消息只有发送成功后才会提交这些偏移量,因为调用Producer.flush()可以强制发送buffer中的数据,但代价高,发送线程会在缓冲区清空前被阻塞。consumer.poll()->producer.send(records)->producer.flush()->consumer.commit()优化为:consumer.poll()->producer.send(records)->consumer.commit(offsets)在MirrorMaker2中,使用KafkaConnect框架来拷贝数据。从源头消费数据后,存储在IdentityHashMap类型的内存结构outstandingMessages中。Producer发送到目的地成功后,会将消息从内存结构中删除。另外,从源端消费的进度会定时保存到KafkaTopic中。这种实现机制不会丢失数据,但是Producer发送成功后,进程在持久化之前进程异常挂掉,会产生重复消息。目前在KIP-656:MirrorMaker2Exactly-onceSemantics中提出了一种可以实现ExactlyOnlyOnce的方案。思路是在一次事务中提交消费位移和发送消息,但是相关的PatchKAFKA-10339还没有合并到主Branch中,最后一次更新停留在8月20日。根据ConfluentReplicator官网描述,复制不会丢失数据,但可能会重复。因此,和上面提到的MM2、uReplicator、brooklin一样,它提供了AtleastOnceDelivery消息传递语义。解决方案MM1MM2ConfluentReplicatorFollower-based同步机制uReplicatorbrooklin复制前后分区语义一致不支持支持支持支持偏移对齐不支持不支持不支持不支持不支持消息传递语义无数据丢失,可能重复AtleastOnce无数据丢失,可能至少重复一次将在未来无损地提供EOS语义。根据二次开发功能的不同,可能会重复AtleastOnce。从Kafka副本同步的原理来看,当参数设置合理时,副本间同步过程中数据可以保持一致而不丢失数据,可以重复至少一次而不丢失数据,可以重复至少一次3.3消费位移同步在容灾方案中,除了数据复制,消费位移的同步也很重要。容灾切换后,消费者能否在新集群恢复消费,取决于能否同步消费者offset。在MM1的设计中,如果要同步消费offset,只能将__consumer_offsets作为一个普通topic来同步。但是无法进行偏移量转换,因为源簇和目标簇的偏移量可能不对齐。在MM2设计中,解决了上述MM1问题。设计思路是在目标集群的checkpointtopic中定时记录消费位移,包括源端和目标端的提交位移。消息包括以下字段:消费者组id(String)消费者组主题(String)–包括源集群前缀主题名称分区(int)分区名称上游偏移量(int):源集群中最新提交的偏移量消费偏移量下游偏移量(int)):latestcommittedoffsettranslatedtotargetclustertargetclusterConsumptiondisplacementmetadata(String)partitionmetadatatimestamp此外,还设计了一个offsetsyncTopic来记录source和destinationoffset之间的映射。同时,MM2还提供了位移转换的MirrorClient接口://从特定的上游消费者组中查找最新的checkpoint对应的本地offsets.MapremoteConsumerOffsets(StringconsumerGroupId,``StringremoteClusterAlias,Duration暂停)。..在uReplicator中,设计了另外一个offsetSyncservice,和MM2类似(可能MM2参考了uReplicator的设计)。该服务可以实时收集不同集群偏移量的映射关系,并计算从一个DC切换到另一个DC后从哪个偏移量读取。在brooklin中,没有类似uReplicator的offsetSync服务,需要自己实现。在ConfluentReplicator中,用另一种思路来解决这个问题。不同DC的时间是一致的,Kafka消息中包含时间戳。5.0版本引入了一个新功能,使用时间戳自动转换偏移量,这样消费者就可以故障转移到不同的数据中心,并开始在源集群中被中断的目标集群中消费数据。使用该功能需要在Consumer中设置ConsumerTimestampsInterceptor拦截器,它保留了消费消息的元数据,包括:?ConsumergroupID?Topicname?Partition?Committedoffset?Timestamp这个consumer时间戳信息保存在位于源集群中的名为__consumer_timestamps的Kafka主题中。然后Replicator通过以下步骤进行offset转换:从源集群的consumer_timestamps主题中读取consumeroffset和timestamp信息,获取consumergroup的进度将源数据中心committedoffset转换为目标数据中对应的offsetcenter将转换后的offset写入目标集群中的__consumer_offsetstopic中,consumer切换到目标center的集群后可以继续消费。基于Follower的同步机制方案,Topic是完全一致的。只要__consumer_offsets也同步了,consumer在故障转移后就可以继续消费。消费位移同步方面,各方案总结如下:方案MM1MM2ConfluentReplicator基于Follower的同步机制uReplicatorbrooklin复制消费位移部分支持支持支持部分支持部分支持Offset转换不支持支持支持不支持不支持客户端切换客户端定制寻求offset通过接口获取目标簇的偏移量,然后seek不需要做额外的转换。启动后,无需额外转换。启动后可以通过synctopicservice查看目标集群的offset,然后seekclient自定义seekoffset3.4是否支持双活,为了提高资源利用率,容灾模式的选择也是一个重要的考虑因素。MM1不支持双活模式,无法配置两个集群互相复制(“Active/Active”),主要是如果两个集群中存在同名主题,主题循环复制问题无法解决得到解决。MM1中可能存在循环复制的问题在MM2中得到了解决。解决办法是复制的topic和原来的topic名称不一致,会加上源集群的名称作为前缀。例如,在下面的例子中,集群A中的topic1正在被复制到集群B之后,名称被更改为A.topic1。但是MM2的DefaultReplicationPolicy是在复制后更改主题名称,这会增加客户端的切换成本。可以考虑改成IdentityReplicationPolicy。这种复制策略只能支持单向复制。主集群提供业务服务,即Active/Standy模式。在ConfluentReplicator5.0.1中,为了避免循环复制,使用了KIP-82AddRecordHeaders的特性,在消息的头部添加消息的来源。如果目标集群的集群ID与标头中的源集群ID匹配,并且如果目标Topic名称与标头的Topic名称匹配,则Replicator不会将消息复制到目标集群。如下图所示:DC-1的m1被复制后,DC-2在报文的头部添加了一个标记。这条消息是从DC-1复制过来的,所以Replicator不会把DC-2的m1复制到DC-1,同理,DC-1的m2也不会复制到DC-2。因此,ConfluentReplicator可以支持Active/Active模式。在uReplicator中,区域级别的故障转移是通过数据冗余来提供的。在本次设计中,除了在每个区域部署一个本地Kafka集群外,还会部署一组聚合集群。这个聚合集群存储了所有region的数据。当区域集群A和B存在相同主题时,聚合后区域A和B的消息偏移量可能不一致。uReplicator设计了一个偏移量管理服务来记录这个对应关系。示例如下:在此设计中,您可以支持消费者的Active/Active和Active/Standy模式。前者是每个地区的消费者消费聚合集群数据。只有一个区域是主区域,只有主区域的数据才能更新数据到后端数据库。当主区失效时,指定一个新的主区,新的主区继续消耗和计算。在Active/Standy模式下,所有区域只有一个消费者。区域失效后,在其他区域启动一个消费者,根据偏移量管理服务中记录的偏移量对应关系,从每个区域的区域集群中找到所有最新的检查点,然后根据检查点中的检查点找到最小的偏移量Standyregion中的聚合集群,从Standyregion中的offset开始消费。在Brooklin,也可以通过类似uReplicator的设计,利用数据冗余来实现Active/Active容灾模式。在字节推出的容灾方案中,Producer只能写入主集群(主备集群中的信息都保存在配置中心,客户端需要先到配置中心查询),Producer可以部署在双中心,但是通过配置中心路由到主集群,消费者也可以部署在双中心。如果采用Active/Standy模式,各自消费本地机房的数据,但只有主集群consumer的消费位移才能生效。在Active/Active模式下,消费者只能从主集群消费。在这两种模式下,双中心所有消费者的消费位移都存储在一个存储中。容灾模式方面,各方案总结如下:方案MM1MM2ConfluentReplicatorFollower-based同步机制uReplicatorbrooklin双集群是否可以相互复制不支持支持是不支持支持,依赖聚合集群支持,依赖聚合集群ProducerActive/Active不支持SupportSupport支持,但实际上只写入主集群。支持的。不支持ConsumerActive/Standy。支持的。支持的。不支持消费者活动/活动。支持的。支持的。分为三种:1、Kafka社区的设计路由方案,从源集群消费,然后写入目标集群,包括MM1、MM2、uReplicator、brooklin。MM2指的是uReplicator的设计、实现计划和brooklin类似,在这四种方案中,MM2可以认为是优先级方案。2、ConfluentReplicator的商用收费方案也是使用KafkaConnect框架进行消费和写入。很好的避免了主题循环复制和消费位移转换,客户端切换成本很低。3、以字节、滴滴为代表的基于follower同步机制的解决方案。在此解决方案中,复制的主题是源主题的镜像。客户端不需要做偏移量转换,但是需要修改Kafka代码,考虑到后续和原版Kafka代码的融合,对技术要求很高。目前还没有完美的解决方案,各公司可以根据自己的实际需要制定。