要点分布式消息系统支持两种类型的语义:流和队列。每一种都最适合特定类型的用例。ApachePulsar的独特之处在于它支持流和队列用例。Pulsar的多层架构允许用户比其他消息系统更容易地扩展主题的数量和大小。Pulsar在可扩展性、可靠性和功能之间提供了适当的平衡,以取代Iterable的RabbitMQ,并最终取代其他消息系统,如Kafka和AmazonSQS。在Iterable,我们每天代表客户发送大量营销信息。这些包括电子邮件、推送、SMS和应用内消息。Iterable每天还处理更多的用户更新、事件和自定义工作流状态,其中许多可以触发系统中的其他操作。这导致系统不仅对我们的客户非常有用,而且非常复杂。随着我们客户群的增长,管理这种复杂性变得更加重要。Iterable管理复杂性的一种方法是在其架构的多个部分使用分布式消息系统。分布式消息系统的主要目的是存储需要由消费者处理的消息,并在处理这些消息时跟踪这些消费者的状态。这样,消费者可以专注于处理每条消息的任务。Iterable使用工作队列方法来执行客户指定的营销工作流、webhook和其他类型的作业调度和处理。其他组件(例如用户和事件摄取)使用流模型来处理有序的消息流。一般来说,分布式消息系统支持两种类型的语义:流和队列。每一种都最适合特定类型的用例。流式传输和排队在流式消息传递系统中,生产者将数据附加到一组仅附加消息流中。在每个流中,消息必须按特定顺序处理,并且消费者标记它们在流中的位置。可以使用某种策略(例如散列用户ID)对消息进行分区以允许更大的并行性,每个分区充当单独的数据流。因为每个流中的数据是不可变的并且只存储偏移条目,所以消息可能不会被跳过。流式处理在消息顺序很重要的情况下效果很好,例如数据摄取。Kafka和AmazonKinesis是使用流语义来消费消息的消息传递系统的示例。在队列消息系统中,生产者将消息发送到可由多个消费者共享的队列。消费者在收到消息时处理消息,并在处理每条消息时向排队系统发送确认。因为多个消费者可能共享一个队列并且消息顺序并不重要,所以通常更容易扩展基于队列的系统的消费者端。排队系统非常适合不需要按特定顺序执行任务的工作队列——例如,向多个收件人发送电子邮件。RabbitMQ和AmazonSQS是流行的基于队列的消息传递系统的示例。队列系统通常包括简化处理消息级错误任务的功能。例如,在发生错误后,RabbitMQ可以轻松地将一条消息传输到一个特殊的队列中,将其保留在那里指定的时间,然后返回到原始队列中进行重试。它还可以否定确认一条消息,以便在失败后可以重新发送。由于大多数消息队列在确认消息后通常不会将消息存储在积压中,因此调试和灾难恢复更加困难,因为没有消息可供检查。Kafka等基于流的系统可用于排队用例,但有一些注意事项。事实上,许多用户选择此选项是因为这些系统通常提供出色的性能。然而,这种解决方案可能是一个挑战,因为它给开发人员带来了不应有的负担来处理严格的流排序所施加的限制。如果某个消费者消费消息速度较慢,或者在临时失败后需要重试处理,则可能会延迟同一流上其他消息的处理。一个常见的解决方案是通过将消息重新发布到另一个主题来重试处理,但这会带来复杂性,因为应用程序逻辑必须管理额外的状态。为什么Iterable需要一个新的消息传递平台我们一直大量使用RabbitMQ并依赖其功能来处理内部消息传递。我们大量使用生存时间(TTL)值,不仅用于固定长度的重试,而且还用于在消息处理中实现显式延迟。例如,我们可能会延迟发送营销电子邮件,以便在每个收件人最有可能打开它们的时候将营销信息发送给他们。我们还依赖否定确认来重试排队的消息。这是我们架构的简化版本:当我们开始评估Pulsar时,上面提到的所有队列都在RabbitMQ上,除了使用Kafka的摄取。Kafka非常适合摄取,因为它提供了必要的性能和顺序保证。Kafka不适合其他用例,因为它缺少必要的工作队列语义。事实上,我们使用了许多RabbitMQ特定的功能,例如延迟,这也使得寻找替代方案更具挑战性。当我们扩展我们的系统时,RabbitMQ开始受到以下限制:在高负载下,RabbitMQ经常遇到流量控制问题。流量控制是一种机制,用于在消息代理跟不上(通常是由于内存和其他资源限制)时减慢发布者的速度。这阻碍了生产者的发布能力,导致其他域的服务延迟和请求失败。具体来说,我们注意到当大量消息的TTL同时过期时,流量控制会更频繁地发生。在这些情况下,RabbitMQ会尝试一次性将过期消息传送到它们的目标队列。这超出了RabbitMQ实例的内存容量,触发了正常生产者的流量控制机制,阻止了他们的发布尝试。调试变得更加困难,因为RabbitMQ的代理在确认后不存储消息。换句话说,无法为消息设置保留时间。复制很难实现,因为RabbitMQ中的复制组件对于我们的用例来说不够健壮,导致RabbitMQ成为我们消息状态的单点故障。RabbitMQ难以处理大型队列。由于我们有很多需要专用队列的用例,我们通常一次需要超过10,000个队列。在这个级别,RabbitMQ遇到性能问题,通常首先出现在管理接口和API中。评估ApachePulsar总的来说,ApachePulsar似乎提供了我们需要的所有功能。虽然我们看到很多关于Pulsar的炒作将其与Kafka相比较以处理流式工作负载,但我们也发现Pulsar非常适合我们的队列需求。Pulsar的共享订阅功能允许将主题用作队列,可能为同一主题的不同订阅者提供多个虚拟队列。Pulsar原生支持延迟和计划的消息,尽管在我们开始考虑Pulsar时这些功能是非常新的。除了提供丰富的功能集外,Pulsar的多层架构使我们能够比其他消息系统更轻松地扩展主题的数量和大小。Pulsar的顶层由接受来自生产者的消息并将其发送给消费者的代理组成,但不存储数据。单个代理处理每个主题分区,但代理可以轻松交换主题所有权,因为它们不存储主题状态。这使得添加代理以增加吞吐量和立即利用新代理变得容易。这也使Pulsar能够处理代理故障。Pulsar的底层BookKeeper将主题数据存储在段中,这些段分布在整个集群中。如果需要额外的存储,我们可以轻松地将BookKeeper节点(bookies)添加到集群中,并使用它们来存储新的主题片段。经纪人与博彩公司协调,在每个主题发生变化时更新其状态。Pulsar将BookKeeper用于主题数据也有助于它支持大量主题,这对于Iterable当前的许多用例至关重要。在评估了多个消息系统之后,我们认为Pulsar在可扩展性、可靠性和功能之间提供了适当的平衡,可以取代Iterable的RabbitMQ,并最终取代其他消息系统,如Kafka和AmazonSQS。第一个Pulsar用例:消息传递Iterable平台最重要的功能之一是代表Iterable的客户安排和发送营销电子邮件。为此,我们将消息发布到特定于客户端的队列,然后让另一个服务处理消息的最终呈现和发送。这些队列是我们决定从RabbitMQ迁移到Pulsar的第一件事。我们选择营销消息作为我们的第一个Pulsar用例有两个原因。首先,因为发送包含一些更复杂的RabbitMQ用例。其次,因为它代表了我们RabbitMQ使用的很大一部分。这不是风险最低的用例;然而,经过广泛的性能和可扩展性测试,我们相信这是Pulsar可以增加最大价值的地方。以下是在Iterable平台上创建的三种常见活动类型:同时向所有收件人发送营销信息的爆炸式活动。假设客户想要向过去一个月活跃的用户发送电子邮件简报。在这种情况下,我们可以在计划活动时向ElasticSearch查询用户列表,并将它们发布到该客户的Pulsar主题。为每个收件人提供自定义发送时间的Blast活动。发送时间可以是固定的(例如,“收件人当地时区上午9点”),也可以通过我们的发送时间优化功能计算。在每种情况下,我们都希望将排队消息的处理延迟到指定时间。用户触发的活动。这些可以由自定义工作流或用户发起的交易(例如在线购买)触发。用户触发的营销发送是根据需要单独完成的。在上述每种情况下,在任何给定时间执行的发送数量可能会有很大差异,因此我们还需要能够上下扩展消费者以适应不断变化的负载。迁移到ApachePulsar尽管Pulsar在负载测试中表现良好,但我们不确定它是否能够在生产环境中维持高负载水平。这是一个特别值得关注的问题,因为我们计划利用Pulsar的几个新功能,包括否定确认和计划消息传递。为了建立我们的信心,我们实现了一个并行管道,我们将消息发布到RabbitMQ和Pulsar;在这种情况下,我们在这些主题上设置消费者以确认排队的消息而不实际处理它们。我们还模拟消费延迟。这有助于我们了解Pulsar在特定生产环境中的行为方式。我们正在为测试和实际生产主题使用客户级功能标志,因此我们可以为测试和最终生产使用一个接一个地迁移客户。在测试过程中,我们发现了Pulsar中的一些错误。例如,我们发现了与延迟消息相关的竞争条件,Pulsar开发人员帮助识别并修复了该竞争条件。这是我们发现的最严重的问题,因为它导致消费者卡住,造成未消费消息的积压。我们还注意到一些与Pulsar消息批处理相关的有趣问题,它在Pulsar生产者中默认启用。例如,我们注意到Pulsar的积压指标报告的是批处理计数而不是实际消息计数,这使得为消息积压设置警报阈值更具挑战性。后来我们在否定确认和批处理之间的交互中发现了一个更严重的错误,最近已修复。最终,我们认为批处理不值得麻烦。幸运的是,在Pulsar生产者中禁用批处理很容易,没有批处理的性能足以满足我们的需求。这些问题也可能在即将发布的版本中得到修复。延迟确认和否定确认在当时是相对较新的功能,因此我们预计我们可能会发现一些问题。这就是为什么我们选择在几个月内慢慢迁移到Pulsar,最初仅针对测试主题发布,然后逐渐迁移到真实发送。这种方法使我们能够在问题成为客户的问题之前发现问题。虽然花了大约六个月的时间才完全确信Pulsar正在按预期工作,但结果是值得的。在大约六个月内,我们将整个营销发送操作迁移到了Pulsar。迁移完成后,我们发现Pulsar将我们的运营成本降低了近一半,并且随着我们增加新客户而有增长空间。成本降低非常显着,部分原因是我们的RabbitMQ实例被过度配置以补偿性能问题。到目前为止,我们的Pulsar集群已经顺利运行了6个多月,没有出现任何问题。实现和工具Iterable主要在后端使用Scala,因此为Pulsar提供良好的Scala工具对我们来说很重要。我们使用了优秀的pulsar4s库,并为支持延迟消息等新功能做出了很多贡献。我们还提供了一个基于AkkaStreams的连接器,用于将消息用作具有单独确认支持的源。例如,我们可以像这样消费命名空间中的所有主题://在此命名空间中的所有主题上创建一个消费者valcreateConsumer=()=>client.consumer(ConsumerConfig(topicPattern="persistent://email/project-123/.*".r,subscription=Subscription("email-service")))//为此消费者创建一个Akka流`Source`阶段pulsarSource=committableSource(createConsumer,Some(MessageId.earliest))//实现源和取回一个control以稍后将其关闭。valcontrol=pulsarSource.mapAsync(parallelism)(handleMessage).to(Sink.ignore).run()我们喜欢为消费者使用正则表达式订阅。它们使在创建新主题时自动订阅变得容易,并使消费者不必了解特定的主题划分策略。同时,我们也在利用Pulsar支持大量主题的能力。由于Pulsar在发布时会自动创建新主题,因此为新消息类型甚至单个事件创建新主题非常简单。这也使得对不同的客户端和消息类型实施速率限制变得更加容易。我们学到了什么由于Pulsar是一个快速发展的开源项目,我们遇到了挑战——主要是在跟上速度和学习它的怪癖方面——我们在其他更成熟的技术中可能看不到这些挑战。文档并不总是完整的,我们经常需要向社区寻求帮助。也就是说,社区非常热情和乐于助人,我们很高兴能更多地参与Pulsar的开发并参与有关新功能的讨论。Pulsar的独特之处在于它支持流和队列用例,同时还支持广泛的功能集,使其成为我们架构中当前使用的许多其他分布式消息传递技术的可行替代方案。Pulsar涵盖了我们所有的Kafka、RabbitMQ和SQS用例。这使我们能够专注于围绕单个统一系统构建专业知识和工具。自2019年初开始使用Pulsar以来,我们对Pulsar的发展进步感到鼓舞,尤其是在初学者入门门槛方面。工具得到了很大改进:例如,PulsarManager现在提供了一个非常方便的GUI来管理集群。我们还看到许多公司提供托管和托管的Pulsar服务,这使得初创公司和小型团队更容易上手使用Pulsar。总的来说,从Iterable到Pulsar的转变很有趣,有时也很有挑战性,但到目前为止相当成功。在许多方面,我们的用例代表了一条尚未被广泛采用的新路径。我们确实预计会遇到一些问题,但我们的测试过程有助于最大限度地减少它们对客户的影响。我们现在对使用Pulsar充满信心,并将继续将Pulsar的使用扩展到Iterable平台中的其他现有组件和新组件。
