当前位置: 首页 > 科技观察

携程基于BookKeeper的延迟消息架构实现

时间:2023-03-20 16:20:55 科技观察

本文作者magiccao和littleorca来自携程消息队列团队。目前主要从事消息中间件的开发和弹性架构的演进,同时关注网络/性能优化、应用监控和云原生等领域。一、背景QMQ延迟消息是一套以服务的形式独立存在的解决方案,不局限于消息厂商的实现。其架构如下图所示。QMQ延迟消息服务架构延迟消息从生产者投递到延迟服务后,会累积在服务器的本地磁盘中。当延迟消息调度时间到期时,将延迟服务转发给实时Broker,供消费者消费。延时服务采用主从架构,其中Zone代表可用区(一般可以理解为IDC)。为了保证单个可用区故障后历史投递消息的正常调度,会跨可用区部署master和slave。1.1痛点该架构主要存在以下问题:服务有状态,无法弹性伸缩;主节点故障后,需要进行主从切换(自动或手动);缺乏一致性协调器来确保数据的一致性。如果将消息的业务层和存储层分开,则各自协同演化发展,各自专注于擅长的领域。这样消息业务层可以做到无状态,轻松完成容器化改造,具备弹性伸缩的能力;存储层引入分布式文件存储服务,存储服务保证高可用和数据一致性。1.2分布式文件存储的选择对于存储服务的选择,除了高可用和数据一致性的基本特征外,还有一个很关键的点:高容错性和低运维成本。分布式系统最大的特点自然是它对部分节点故障的容忍度。毕竟,任何硬件或软件故障都是不可避免的。所以高容错和低运维成本会成为我们选型的重中之重。Pulsar于2016年由雅虎开源贡献给Apache,以其云原生、低延迟分布式消息队列和流处理平台等标签在开源社区引起了轰动和流行。对其进行相关研究后发现,Pulsar恰好是一个将消息服务与存储分离的架构,而存储层是另一个Apache开源基金会BookKeeper。2.BookKeeper作为一种可扩展、高容错、低延迟的分布式强一致性存储服务,BookKeeper已经被一些公司部署在生产环境中。最佳实践案例包括替换HDFSnamenode和Pulsar的消息存储和消费进度Persistence和对象存储。2.1基本架构BookKeeper基本架构Zookeeper集群用于存储节点发现和元信息存储,提供强一致性保证;Bookie存储节点,提供数据存储服务。在写入和读取过程中,Bookie节点之间不需要相互通信。Bookie启动时,会在Zookeeper集群中注册自己,暴露服务;Client是一种胖客户端类型,负责与Zookeeper集群和BookKeeper集群直接通信,根据元信息完成多副本的写入,保证数据可以重复读取。2.2基本特征a)基本概念Entry:数据载体的基本单元Ledger:条目集合的抽象,类似于文件Bookie:账本集合的抽象,物理存储节点Ensemble:账本bookie集合b)数据读写BookKeeper数据读写bookieclient端通过创建持有一个账本后,可以进行条目写入操作,条目以带状方式分布在enemble的bookie中。条目在客户端进行编号,每个条目会根据设置的份数(Qw)判断写入是否成功;bookie客户端通过打开一个已创建的账本来读取条目,条目的读取顺序与写入输入一致,默认从第一个副本开始读取。如果读取失败,将依次从下一个副本开始重试。c)数据一致性持有可写账本的bookie客户端称为Writer。分布式锁机制保证一个账本全局只有一个Writer。Writer的唯一性保证了数据写入的一致性。Writer内存中维护了一个LAC(LastAddConfirmed),当满足Qw要求时更新LAC。LAC与下一个请求或时间一起保存在bookie副本中。当账本关闭时,它被持久化在元数据存储(zookeeper或etcd)中;持有可读账本的bookie客户端称为reader,一个账本可以有多个Reader。LAC的强一致性保证了不同Reader看到的是统一的数据视图,可以重复读取,从而保证了数据读取的一致性。d)容错的典型故障场景:Writer崩溃或重启,Bookie崩溃。Writer故障,ledger可能没有关闭,导致LAC未知。通过账本恢复机制,关闭账本,修复LAC;Bookie故障,条目写入失败。通过集成替换机制,将新的入口路由信息更新到MetadataStore,保证新数据能够及时成功写入。历史数据通过bookierecover机制,满足Qw副本的要求,巩固了历史数据读取的可靠性。至于副本所在的所有bookie节点的故障场景,只能等待修复。e)将新扩展的bookie负载均衡到集群中。当一个新的账本被创建时,流量会自动平衡。2.3上海大区(region)存在多个可用区(az,availablezone)用于同城多中心容灾,每两个可用区或两个可用区的网络延迟小于2ms。在这种网络架构下,多个副本分散在不同的az可接受的高可用性解决方案中。BookKeeper的zone-awareensemblereplacementstrategy就是针对这种场景的解决方案。基于Zone-aware策略的同城多中心容灾启用Zone-aware策略有两个限制条件:a)E%Qw==0;b)Qw>minNumOfZones。其中E表示合奏的大小,Qw表示副本数,minNumOfZones表示合奏中的最小区域数。例如下面的例子:minNumOfZones=2desiredNumZones=3E=6Qw=3[z1,z2,z3,z1,z2,z3]故障前每条数据三份,分布在三个可用区;当z1失败时,将生成满足minNumOfZones约束的新系综:[z1,z2,z3,z1,z2,z3]->[z3,z2,z3,z3,z2,z3]。显然,三副本的每份数据仍然会分布在两个可用区中,仍然可以容忍一个可用区故障。DNSResolver客户端选择bookie组成ensemble时,需要通过IP解密对应的zone信息,用户需要自己实现一个resolver。考虑到zone之间的网段被认为是规划好的,不会重叠,所以我们在落地的时候,简单实现了一个子网解析器,可以动态配置生效。该示例显示了精确IP匹配的实现。publicclassConfigurableDNSToSwitchMappingextendsAbstractDNSToSwitchMapping{privatefinalMapmappings=Maps.newHashMap();publicConfigurableDNSToSwitchMapping(){super();mappings.put("192.168.0.1","/z1/192.168.0.1");///zone/升级域mappings.put("192.168.1.1","/z2/192.168.1.1");mappings.put("192.168.2.1","/z3/192.168.2.1");}@OverridepublicbooleanuseHostName(){returnfalse;}@OverridepublicListresolve(Listnames){ListrNames=Lists.newArrayList();names.forEach(name->{StringrName=mappings.getOrDefault(name,"/default-zone/default-upgradedomain");rNames.add(rName);});返回rNames;}}ConfigurableDNSresolversampledatareplicas由于某种原因分布在单个可用区中(例如,可用区故障演练)当只有一个可用区可用时,新写入的数据的所有副本将落在单个可用区中。当故障的可用区恢复后,一些历史数据仍然只存在于单个可用区中。不满足多可用区容灾的高可用需求。AutoRecovery机制中有PlacementPolicy检测机制,但缺少恢复机制。所以我们做了一个补丁来支持动态机制来打开和关闭这个功能。这样当可用区恢复失败时,可以自动发现并修复所有数据副本分布在一个可用区内,影响数据可用性的问题。3、灵活架构引入BookKeeper后,延迟消息服务的架构比较美观。消息业务层和存储层完全分离,延迟消息服务本身是无状态的,易于扩展。当可用区发生故障时,不再需要进行主从切换。延迟消息服务新架构3.1无状态改造存储层分离后,可以实现业务层的无状态。要实现这一目标,还需要解决一些问题。我们先来看一下使用BookKeeper的一些限制:BookKeeper不支持共享写入,即业务层多个节点写入数据,必须写入不同的账本;BookKeeper虽然允许多读,但是如果多个应用节点分别读,进度是相互独立的,应用必须自己解决进度协调问题。以上两个主要问题决定了我们在实现无状态弹性扩缩容时,必须自己解决读写资源分配的问题。为此,我们引入了任务协调器。我们首先管理分片中的存储资源。每个shard都支持读写操作,但是只有一个业务层节点可以同时读写。如果我们把分片看成资源,把业务层节点看成worker,那么taskcoordinator的主要职责就是:以sticky-priority的方式尽可能均匀地将资源分配给worker;监控资源,如果工人增加或减少,则重新执行职责1;当资源不够时,根据具体的策略配置添加和初始化新的资源。因为是分布式环境,协调器在完成上述职责的时候需要保证分布式的一致性,当然也要满足可用性的要求。我们选择了基于ZooKeeper的master-multiple-slave架构来进行master的选择。如图所示,协调器点对点部署在业务层的应用节点中。在运行时,协调器通过基于ZooKeeper的领导者选举机制确定领导者节点,领导者节点负责上述任务分配。协调器选举的实现可以参考ZooKeeper官方文档,这里不再赘述。3.2持久化数据原有架构,按照调度时间每隔10分钟将延迟消息存储在本地桶中,将接近时间的桶加载到内存中,使用HashedWheelTimer进行调度。这种设计有两个缺点:buckets较多(我们支持2年范围内的延迟,理论bucket数在10万以上);按调度时间排序,可能出现卸载部分包含调度时间较早的数据,加载时已经滞后。缺点1、单机100000+个本地文件问题不大,但经过改造后,这些bucket信息以元信息的形式存储在ZooKeeper上。我们的实施方案确定每个bucket至少占用3个ZooKeeper节点。假设我们要部署5个集群,每个集群平均有10个shard,每个shard有10万个bucket,那么使用的ZooKeeper节点数是1500万,这对于ZooKeeper来说是无法承受的。缺点2无论是新旧架构,都是一个潜在的问题。一旦某10分钟内消息多了,消息可能会延迟。加载到内存时,应该有更细的粒度。基于以上问题的分析,我们参考多级时间轮调度的思想稍作改动,设计了一套基于滑动时间分桶的多级调度方案。如上表所示,最大的桶是1周,其次是1天、1小时和1分钟。每一级覆盖的时间范围不同,合并覆盖2年的时间范围,理论上只需要286桶,比起原来的10万多桶,有了质的减少。同时,只有L0m级别的调度器需要加载数据到HashedWheelTimer,加载粒度降低到1分钟,大大减少了一个bucket加载不满带来的调度延迟。多级调度器以类似串联的方式协同工作。当每一级调度器收到写请求时,首先尝试将其委??托给其上级(更大粒度)的调度器进行处理。如果上级接受了,只需要返回上级的处理结果;如果上级不接受,则判断是否属于当前级别,如果是,则写入桶中,否则发回给下级。每一级调度器都会打开时间接近的桶,并将其中的数据发送给下一级调度器。比如L1h发现最小的bucket已经到了preloading时间,然后读取bucket的数据发送给L0m调度器,最后将小时的数据传给L0m并扩充为(最多)60分钟级桶。4.未来规划目前,bookie集群部署在物理机上。新建集群和扩容缩容相对麻烦。未来会考虑集成到k8s系统中;还需要考虑bookie治理和平台化;我们目前只有同城多中心容灾能力,跨地域容灾、公私混合云容灾等高可用架构还需要进一步加强。