1.前言为了尽量减少自然灾害和人为灾害(如停电、灾难性软件故障和网络中断)对业务的影响,以及随着我行基于Kafka的实时业务的不断增长,Kafka的重要性越来越高日复一日,逐步优化我行跨IDC的Kafka持续建设成为急需解决的问题。本文从元数据同步、数据复制、消费位移同步、容灾模式等方面对现有容灾方案进行调研和比较。2、现有容灾方案说明。用户MirrorMaker1(简称MM1)的原理是启动消费者从源集群消费,然后发送到目标集群。功能比较简单。MirrorMaker2(简称MM2)或基于MM2的改进,是基于KafkaConnect框架实现,LinkedIn工程师贡献,修复了MM1的局限性,主题和分区可以自动感知,acl和配置可以自动同步,支持active-active,提供偏移量转换功能360ConfluentReplicatorConfluent付费版,相比MM2,active-active模式更加优雅,可以支持对单条消息的修改。Confluent的follower-based同步机制是利用Kafka的复制同步机制创建一个Fetcher线程来同步数据。需要对原始Kafka进行二次开发。字节跳动,滴滴uReplicator改进MM1,采用分布式任务管理框架ApacheHelix控制Partitions的分配,不需要fullrebalance。Uberbrooklin改进了MM1。实现思路和MM2类似。和uReplicator一样,为了减少rebalance,使用StickyAssignment来控制Partition的分配。除了支持Kafka集群之间的复制,还可以作为AzureEventHubs,AWSKinesisstreaming服务之间的通道,也可以作为CDCconnectorLinkedIn3.各方案主要设计要点对比分析3.1元数据同步元数据同步主要是指Topic、Partition、Configuration、ACL的同步,我们需要评估在添加主题、分区扩展、修改Configuration和ACL后是否可以自动感知到每个方案,以及是否选择了复制的主题评估方案是否灵活(比如是否支持白名单和黑名单机制,是否支持正则化),目标集群中的Topic名称是否变化(决定是否支持双向复制,是否发生循环复制)。在MM1方案中,选择复制的topic只支持白名单机制(通过--whitelist或--include参数指定),白名单支持正则写入。但是,当一个新主题被添加到源集群时,目标集群的auto.create.topics。当enable设置为true时,可以在目标集群中自动创建一个同名的topic(可以扩展messagehandler并重命名),否则必须重启MirrorMaker才能发现新添加的topic。关于目标集群上的topic分区数,MM1是基于默认值num.partitions配置的(其他方案没有这个问题),无法与源集群保持一致,无法同步ACL。与MM1相比,MM2弥补了上述不足。它主要依赖MirrorSourceConnector中的多个定时任务来实现这个功能。Topic/Partition、Configuration、ACL更新的时间间隔由三个参数指定,非常灵活。在MM2中,从版本3.0.0开始,支持两种复制策略。在默认的DefaultReplicationPolicy中,topic名称在目标集群复制后会发生变化,并且会在其前面加上源集群的前缀。为了兼容MM1,在3.0.0新增了IdentityReplicationPolicy,在目标集群复制后topic名称不会改变。ConfluentReplicator,根据官网描述,也有以上功能。原理和MM2类似,只是检测更新只由一个参数决定。Replicator可以定义复制后的topic名称,由参数topic.rename.format指定。默认值是保持主题名称不变。基于follower的同步机制的实现方式,网上资料不多,不详,但原理估计和MM2类似,复制后目标集群中topic名称不变。uReplicator的实现略有不同。复制哪些主题由参数enableAutoWhitelist和patternToExcludeTopics决定。当enableAutoWhitelist设置为true时,如果源集群和目标集群存在相同的topic,不需要其他设置就可以实现数据复制。如果设置为false,则需要将复制的topic名称等信息提交给控制分区分配的uReplicatorController。另外,黑名单参数patternToExcludeTopics控制哪些topic不需要复制;是否自动感知分区扩展由参数enableAutoTopicExpansion控制;关于Configuration和ACL无法同步。brooklin选择复制的topic只支持白名单机制,可以支持regularization。添加新主题和分区扩展后,可以自动感知。检测更新由参数partitionFetchIntervalMs决定。复制后,主题名可以加前缀,由参数DESTINATION_TOPIC_PFEFIX决定。总结如下:方案MM1MM2ConfluentReplicator是基于Follower的同步机制uReplicatorbrooklin。复制后,Topic名称将保持不变。它也可以定制并保持不变。也可以添加固定前缀保持不变。它也可以定制。变化,也可以定义一个前缀自动检测和复制新的topic部分支持(取决于目标集群是否开启自动创建topic)是否支持取决于二次开发功能不支持支持自动检测和复制新分区不支持supported支持依赖二次开发功能支持源集群和目标集群主题配置一致。不支持支持。支持支持。支持配置和ACL更新是否同步。不支持。:主题是否有白名单、黑名单和正则表达式部分支持支持的支持取决于二次开发的功能复制后消息偏移量是否对齐,是否能保证复制时的数据一致性,即是否会丢失数据或出现重复数据。首先说明一下,由于复制的延迟,所有这些容灾方案中的RPO都不等于0。基于follower的同步机制方案可以保持offset对齐。由于replica同步的延迟,当masterroom出现异常时,backuproom仍然可能会丢失一些数据,可以保持offset一致,不存在重复数据的可能。其他方案都不能保证偏移量对齐(除非复制时源主题的偏移量从0开始)。关于各个方案中消费者从源集群消费,再写入目标集群的逻辑,我们一一详细解释:首先从MM1开始,这是他的设计架构:在KIP-3MirrorMakerEnhancement中,上面-上面提到的架构设计,保证不丢数有以下几点:1.关闭消费者自动提交位移,在提交位移前调用生产者.flush()将缓存中的数据刷出2.在生产者端,通过设置这些参数max.in.flight.requests.per.connection=1(多个消费者共享一个生产者,这个生产者每次只向broker发送一个请求),retries=Int.MaxValue(return是可重试异常,无限次重试,直到buffer满),ack=-1(发送给所有副本)3.设置abortOnSendFail,当生产者收到不可重试异常后(比如消息等异常那太大了),停止MirrorMaker进程,否则会丢失一些发送失败的数据。另外,为了避免consumerrebalance时出现重复数据(rebalance时有些数据位移没有提交),定义一个新的consumerRebalancelistener,当partitionRevoke发生时,先刷新producer缓存中的数据,然后再提交位移。从上面的设计来看,MM1并没有丢失数据,但是仍然存在数据重复的可能,这是Kafka的非幂等Producer决定的。另外,MM1的设计存在很多缺陷,比如只有一个Producer,发送效率低。另外,这个Producer是轮询发送的,消息发送到的目标Topic上的分区不一定和源Topic上的分区相同。因为轮询,这个Producer会和集群中的每个broker建立连接。与uReplicator相比,位移也是在flush之后提交,避免数据丢失。改进了MM1的缺陷。每个WorkerInstance都有多个Fe??tcherThreads和多个ProducerThreads。从源集群中获取数据后,会将其放入队列中,ProducerThread从队列中取出数据发送到目标集群的topic,发送到目的topic的每条消息分区与源分区一致,可以保持语义一致性。在Brooklin中,可以在每个BrooklinInstance中设置多个Consumer和Producer,同时也可以保持语义的一致性。相对于uReplicator的一个优势是它提供了flushlessproducers(也可以提供flushProducers),这些消息只有发送成功后才会提交这些偏移量,因为调用Producer.flush()可以强制发送buffer中的数据,但代价高,发送线程会在缓冲区清空前被阻塞。consumer.poll()->producer.send(records)->producer.flush()->consumer.commit()优化为:consumer.poll()->producer.send(records)->consumer.commit(offsets)在MirrorMaker2中,使用KafkaConnect框架来拷贝数据。从源头消费数据后,存储在IdentityHashMap类型的内存结构outstandingMessages中。Producer发送到目的地成功后,会将消息从内存结构中删除。另外,从源端消费的进度会定时保存到KafkaTopic中。这种实现机制不会丢失数据,但是Producer发送成功后,进程在持久化之前进程异常挂掉,会产生重复消息。目前在KIP-656:MirrorMaker2Exactly-onceSemantics中提出了一种可以实现ExactlyOnlyOnce的方案。思路是在一次事务中提交消费位移和发送消息,但是相关的PatchKAFKA-10339还没有合并到主Branch中,最后一次更新停留在8月20日。根据ConfluentReplicator官网描述,复制不会丢失数据,但可能会重复。因此,和上面提到的MM2、uReplicator、brooklin一样,它提供了AtleastOnceDelivery消息传递语义。解决方案MM1MM2ConfluentReplicatorFollower-based同步机制uReplicatorbrooklin复制前后分区语义一致不支持支持支持支持偏移对齐不支持不支持不支持不支持不支持消息传递语义无数据丢失,可能重复AtleastOnce无数据丢失,可能至少重复一次将在未来无损地提供EOS语义。根据二次开发功能的不同,可能会重复AtleastOnce。从Kafka副本同步的原理来看,当参数设置合理时,副本间同步过程中数据可以保持一致而不丢失数据,可以重复至少一次而不丢失数据,可以重复至少一次3.3消费位移同步在容灾方案中,除了数据复制,消费位移的同步也很重要。容灾切换后,消费者能否在新集群恢复消费,取决于能否同步消费者offset。在MM1的设计中,如果要同步消费offset,只能将__consumer_offsets作为一个普通topic来同步。但是无法进行偏移量转换,因为源簇和目标簇的偏移量可能不对齐。在MM2设计中,解决了上述MM1问题。设计思路是在目标集群的checkpointtopic中定时记录消费位移,包括源端和目标端的提交位移。消息包括以下字段:消费者组id(String)消费者组主题(String)–包括源集群前缀主题名称分区(int)分区名称上游偏移量(int):源集群中最新提交的偏移量消费偏移量下游偏移量(int)):latestcommittedoffsettranslatedtotargetclustertargetclusterConsumptiondisplacementmetadata(String)partitionmetadatatimestamp此外,还设计了一个offsetsyncTopic来记录source和destinationoffset之间的映射。同时,MM2还提供了位移转换的MirrorClient接口://从特定的上游消费者组中查找最新的checkpoint对应的本地offsets.Map
