当前位置: 首页 > 科技观察

马蜂窝消息总线——面向业务的消息服务设计

时间:2023-03-21 01:53:17 科技观察

为什么需要消息总线?在消息总线上线之前,马蜂窝业务的异步需求大部分是通过Redis队列来实现的。随着消息量的增加,经常会出现消息积压和不同消息之间相互影响的问题。为了解决这些问题,电商研发团队开始规划设计消息总线。为什么会有消息总线,而不是让业务系统直接使用PHP或者其他语言去连接RabbitMQ、Kafka等消息系统呢?“消息总线和直接使用消息系列的实际区别是什么?”太懂事的地方。如果只是将Redis换成性能更好的消息系统,消息总线的作用实在是大可不必。但是,当我们从实际业务的角度梳理公司整体的技术架构和开发场景时,发现将业务系统与消息系统直接对接并不是一个好的方式,至少会面临以下问题:系统是去中心化的。每个开发团队都需要维护自己的消息服务,彼此之间相对隔离。增加开发难度。用户需要注意具体消息所在消息服务的配置,不同业务的消息可能需要接入不同类型的消息系统。维护成本高。用户需要管理自己消费服务的稳定性,处理各种服务异常,保证消费的可靠性。特别是对于PHP,这个成本还是比较高的。管理困难。无法统一管理业务消息的创建和订阅关系,也不方便管理业务消息中敏感数据的权限。不易扩展。无法使用统一消息系统的扩展功能(路由、延迟、重试、消费确认等)。一般来说,直接使用消息系统可以算是一种面向技术的接入方式;而消息总线希望通过隐藏部署、分组、通信等细节,实现面向业务的访问方式。架构设计与技术实现1.架构设计消息总线隐藏了消息发送、路由、分组、存储、消费负载、通信、高可用等一系列问题。对于用户来说,只需要在发送端调用一个SDK消息发送方法,在消费端提供一个PHP消费方法即可。图1马蜂窝消息总线架构设计马蜂窝消息总线目前采用RabbitMQ作为消息引擎,并在发送端提供SDK,作为消息总线的Broker角色,包括消息路由和分组的功能,并负责消息的发布消息。消息的订阅关系目前持久化在MySQL中。消息发送时,会根据订阅关系将消息投递给对应的业务消费者。在消费端,不直接使用PHP访问RabbitMQ,而是使用Deliver服务集群(Golang服务)将AMQP协议转换为HTTP协议,然后通过PHPService执行PHP代码进行消费。本方案设计时,还考虑了未来系统规模扩展后的消息分组和关键环节的可替代性。SDK作为消息服务Broker,可以控制消息的路由和分组。以后在微服务体系中可以保持整体架构不变,只采用其他方案来实现Broker。可以根据业务场景对接不同的消息引擎。比如RabbitMQ可以用于对业务一致性要求高的业务,而Kafka可以用于对并发要求高的业务。对业务是无感知的。Deliver和ApplicationService之间可以扩展更多的通信协议,以支持更灵活的应用消费方式,包括支持未来微服务中的消费服务。2.技术实现1).降低流的复杂度为了保证消息能够被统一处理,同时降低消息总线中各个环节流的复杂度,将消息体设计为统一的结构。主要分为以下三部分:图2消息体参数定义——参数部分包括消息ID、来源、时间等参数。Conetent——消息的内容。在PHP中,用户可以将消费方法的输入参数放入Content中。Receiver——标记消息的接收者(PHP中consumer的方法)。2).在线服务异步点对点模式是业务中常用的一种异步模式。图3点对点消息模式业务应用把同步请求中不需要执行的逻辑放到异步执行中。发送消息的业务需要显式处理消息的接收者(消费的PHP方法)。发送消息时,需要指定一个唯一的Receiver。目前消息总线SDK提供的invoke方法可以指定消费申请方法。3).消息解耦的发布订阅模式是一种标准的业务解耦模式。图4发布和订阅(广播)App1的应用程序只负责发送消息。至于需要关注什么业务,下游业务应用自己订阅消息即可。大大降低了上下游业务的耦合度和开发调试成本。消息总线使用DB来存储消息订阅关系。上游业务消息在经过消息总线Broker时,会根据订阅关系拆分为订阅应用的Receiver的多条消息。这种消息裂变的方式使得消息在消息总线上传输时明确了消息的目标,在消费负载、消费确认、失败重试等场景下可以根据Receiver进行隔离。同样,调用者可以使用SDK提供的pub方法发送消息,订阅者可以通过消息管理系统申请消息订阅。4).抗消息干扰很多使用消息总线的同学比较关心的是不同的消息会不会互相干扰,比如某个消息在短时间内大量涌入会不会导致其他消息被阻塞。通过前面架构的介绍,可以看到所有的消息在经过Broker的时候都可以进行路由和分组。未来消息总线会根据业务和消息量做一些物理上的隔离,保证业务之间不会相互影响。在一个组中,消息总线也有一些机制来保证组中的不同消息不会相互影响。图5抗消息干扰机制消息经过Broker后默认会进入一个OnlineQueue队列,Deliver集群中会有多个Deliver监听OnlineQueue。在Deliver服务中,Dispatcher用于控制消息的总并发消费量和同类型消息的并发消费量。当某类消息的并发数超过阈值时,会被转发到OfflineQueue,避免消耗Worker被同类消息占用。OfflineQueue将被独立的Deliver服务监控和消费,而不会影响OnlineQueue的消费。5).消费服务的高可用为了保证消费的高可用,Deliverer组除了负责消费协议的转换,还制定了一些策略来保证消费的高可用。当一定时间内消息失败次数超过阈值时,熔断器将停止消费队列,避免服务抖动和上线失败导致的大规模消息。消费失败后,Deliver服务会监控后端应用服务的健康状况,服务恢复后会自动恢复消费。系统故障重试消息当总线服务出现故障时,可以使用重试策略对期间失败的消息进行重试,避免基础服务问题导致的消费失败。业务失败重试在消费业务应用时会产生业务异常,订阅消息时可以指定是否重试。消息总线会按照一定的时间段对需要失败的消息进行多次重试。GracefulRestartDeliver实现了Gracefulrestartandexit,保证当前消费的消息全部处理完后,进程才会退出。未来规划图7未来演进方向1.产品化目前消息总线的功能经过近一年的迭代已经基本稳定。但信息管理、监控、统计等环节对开发者不够友好。在接下来的一段时间里,我们将重点优化系统的易用性。目前正在规划以下几个方向的优化和改进:开发者可以通过消息管理系统进行添加新消息、订阅消息(审核权限)等操作,而不是目前手动提交issue的方式。开发者可以通过系统关注自己消息的消费情况,及时收到消息处理异常的告警。完善监控系统,提供更详细的系统监控数据。2.微服务还计划在微服务架构内提供消息总线服务。包括在微服务中发送消息和使用微服务消费消息。未来整个消息总线方案会演进到下图所示的架构,增加对多语言、不同架构服务的支持。适应更多业务发展场景,提供更稳定友好的消息总线服务。另外,对于消息引擎的技术选型,未来还会考虑接入Kafka、RocketMQ等消息队列服务。根据不同业务场景的消息特点,在发布时选择进入不同的消息队列服务。比如对可靠性和数据安全性要求高的消息会进入RabbitMQ,而吞吐量大的消息可以进入Kafka。但是消息的发送者和订阅者都不需要关心这些细节,仍然可以统一访问。马蜂窝消息总线服务目前还在迭代中,很多地方还有很多问题没有考虑到。欢迎大家多提宝贵意见。您可以扫描下方二维码订阅“马蜂窝科技”获取更多内容。本文作者:梁亮,马蜂窝电商研发团队技术专家。2004年毕业于西安邮电大学,曾就职于新浪、开心网、阿里巴巴等公司。先后从事搜索、社交、视频、电商等方向的研发工作。2017年加入马蜂窝,目前负责马蜂窝电商平台服务系统开发。【本文为专栏作者马蜂窝科技原创文章,作者微信公众号马蜂窝科技(ID:mfwtech)】点此查看作者更多好文

猜你喜欢