消息队列逐渐成为企业应用系统内部通信的核心手段。它具有低耦合、可靠传递、广播、流量控制、最终一致性等一系列功能。图片来自Pexels。目前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,Redis、MySQL、PhxSQL等一些数据库也可以实现消息队列的功能。消息队列概述消息队列是指使用一种高效、可靠的消息传递机制进行与平台无关的数据交换,以及基于数据通信的分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量调峰、异步通信、数据同步等功能。作为分布式系统架构的一个重要组成部分,起着举足轻重的作用。消息队列的特点是采用异步处理方式。消息发送者可以发送消息而无需等待响应。消息发送者将消息发送到虚拟通道(主题或队列),消息接收者订阅或监听该通道。一条消息最终可能会被转发给一个或多个消息接收者,他们都不需要同步响应消息发送者。整个过程是异步的。应用系统之间的解耦主要体现在以下两点:发送方和接收方不需要相互认识,只需要确认消息即可。发送者和接收者不必同时在线。例如,为了保证数据的最终一致性,在线交易系统会在支付系统处理后将支付结果放入消息中间件,通知订单系统修改订单支付状态。两个系统通过消息中间件解耦。消息队列的投递服务模型消息队列的投递服务模型如下图所示:消息队列的传输方式点对点模型点对点模型用于点对点消息生产者和消息消费者之间的点通信。消息生产者将消息发送给由名称标识的特定消费者。这个名字实际上指的是消费者服务中的一个队列(Queue),在消息传递给消费者之前就存储在这个队列中。队列消息可以放在内存中,也可以持久化,以保证在消息服务失败时,消息仍然可以传递。传统的点对点消息中间件通常由消息队列服务、消息传递服务、消息队列和消息应用程序编程接口API组成。其典型结构如下图所示:特点如下:每条消息只使用一个消费者。发送者和接收者之间没有时间依赖性。接收方确认消息已成功接收和处理。示意图如下:发布/订阅模型(Pub/Sub)Publisher/Subscriber模型支持向特定的消息主题生产消息。0个或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这个模型中,发布者和订阅者彼此不认识,就像一个匿名的公告板。这种模式概括为:多个消费者可以获得消息,发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(Subscription),这样消费者才能进行订阅。订阅者必须保持持续活跃并接收消息。在这种情况下,当订阅者未连接时,当订阅者重新连接时,发布的消息将被重新发布,如下图所示:特点如下:每个消息可以有多个订阅者。客户端只有订阅后才能接收消息。持久和非持久订阅。注意以下三点:发布者和订阅者有时间依赖性:接收者和发布者只有建立订阅关系才能接收消息。持久订阅:订阅关系建立后,无论订阅者是否在线,消息都不会消失。非持久订阅:订阅者必须始终在线才能接收消息。当只有一个用户时,近似等于点对点方式。消息队列应用场景当你需要使用消息队列时,首先需要考虑它的必要性。可以使用消息队列的场景有很多,最常用的有应用松耦合、异步处理模式、发布订阅、最终一致性、非高峰流量控制、日志缓冲等。反之,如果需要强一致性,关注业务逻辑的处理结果,使用RPC更合适。异步处理非核心进程是异步的,减少系统响应时间,提高吞吐量。例如:短信通知、终端状态推送、App推送、用户注册等。消息队列一般都内置了高效的通信机制,所以也可以用于简单的消息通信,比如点对点的消息队列或者聊天室。应用案例:网站用户注册,注册成功后,稍后会发送确认邮件或短信。系统解耦系统不是强耦合的,可以随意添加消息接收者,无需修改消息发送者的代码。消息发送者的成功不依赖于消息接收者(例如:一些银行接口不稳定,但调用者不需要依赖这些接口)。不强依赖系统的非核心进程。对于非核心进程,可以放在消息队列中供消息消费者按需消费,不影响核心主进程。FinalConsistency最终一致性不是消息队列必须具备的特性,但是确实可以依赖消息队列来做最终一致性:先写消息再操作,保证操作完成后修改消息状态。定时任务补偿机制实现了消息的可靠发送和接收,以及业务操作的可靠执行。需要注意消息重复和幂等设计。所有不保证100%消息丢失的消息队列,理论上都无法实现最终一致性。像Kafka这样的设计,在设计层面就存在消息丢失的可能(比如磁盘定时刷新,掉电就会丢失消息)。即使只有千分之一的消息丢失,业务也必须使用其他手段来保证正确的结果。广播生产者/消费者模型只需要关心消息是否投递到队列中。至于谁要订阅,谁需要消费,是下游的事情,这无疑大大减少了开发联调的工作量。流量调峰与限流当上下游系统的处理能力存在差距时,利用消息队列做一个通用的“漏斗”,进行限流控制。当下游有能力处理时,再进行分发。例如:用户在支付系统结账成功后,订单系统会通过短信系统向用户推送扣款通知。短信系统的速度可能会因为短板效应(每秒几百个请求)卡在网关上,跟前端并发不是一个数量级。从而造成支付系统和短信系统的处理能力差异化。但是,如果用户在晚上半分钟左右收到一条短信,一般不会有什么大问题。如果没有消息队列,两个系统之间实现协商、滑动窗口等复杂的解决方案也不是没有可能。但是系统的复杂度呈指数增长,存储必须做上游或者下游,还要处理时序、拥塞等一系列问题。而每当处理能力出现差距时,就需要单独开发一套逻辑来维护这个逻辑。因此,使用中间系统转储两个系统的通信内容,然后在下游系统有能力处理这些消息时,再处理这些消息是一种比较常见的方式。应用案例:消息队列作为可靠的消息暂存处,进行一定程度的消息堆积。定期进行消息投递,如模拟用户秒杀访问、系统性能压力测试等。日志处理在日志处理中使用消息队列,比如Kafka的应用,解决海量日志传输和缓冲的问题。应用案例:集中收集日志,用于PV计算、用户行为分析等。点消息队列或聊天室。消息队列的推拉模型推送消息模型消息生产者向消息队列发送消息,消息队列将消息推送给消息消费者。Pull消息模型消费者请求消息队列接受消息,消息生产者从消息队列中拉取消息。两种类型的区别两种类型的区别如下:消息队列技术比较这部分主要介绍四种常用的消息队列(ActiveMQ/RabbitMQ/RocketMQ/Kafka)的主要特点和优缺点。ActiveMQActiveMQ是由Apache出品,ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。它速度非常快,支持多种语言的客户端和协议,可以很容易地嵌入到企业应用环境中,并具有许多高级功能。主要特点:符合JMS规范:JMS规范提供了良好的标准和保证,包括:同步或异步消息分发、一次性和一次性消息分发、消息接收和订阅等。符合JMS规范的好处是无论使用何种JMS实现提供者,这些基本功能都是可用的。连接灵活性:ActiveMQ提供了广泛的连接协议,支持的协议有:HTTP/S、IP多播、SSL、TCP、UDP等。对许多协议的支持为ActiveMQ提供了极大的灵活性。支持的协议类型很多:OpenWire、STOMP、REST、XMPP、AMQP。持久性插件和安全性插件:ActiveMQ提供了多种持久性选项。而且,ActiveMQ的安全性还可以根据用户的需要进行全面的认证和授权定制。支持的客户端语言种类很多:除了Java,还有C/C++、.Net、Perl、PHP、Python、Ruby。BrokerCluster:多个ActiveMQbroker可以组成一个集群来提供服务。极其简单的管理:ActiveMQ的设计考虑到了开发人员。因此,它不需要专门的管理员,因为它提供了简单实用的管理功能。在ActiveMQ的不同层级监控数据的方式有很多种,包括使用JConsole或者在ActiveMQ的WebConsole中使用JMX。通过处理JMX报警消息,通过使用命令行脚本,甚至通过监控各种类型的日志。部署环境:ActiveMQ可以运行在Java语言支持的平台上。使用ActiveMQ需要:JavaJDKActiveMQ安装包有以下优点:跨平台(Java编写与平台无关,ActiveMQ几乎可以运行在任何JVM上)。可以使用JDBC:可以将数据持久化到数据库中。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直是开发者最熟悉的存储介质。支持JMS规范:支持JMS规范提供的统一接口。支持自动重连和错误重试机制。具有安全机制:支持基于Shiro、JAAS等多种安全配置机制,可以对Queue/Topic进行认证和授权。完善的监控:拥有完善的监控,包括WebConsole、JMX、Shell命令行、Jolokia的RESTfulAPI。界面友好:提供的WebConsole可以满足大部分情况,还有很多第三方组件可以使用,比如Hawtio。缺点如下:社区活跃度没有RabbitMQ高。根据其他用户反馈,会出现莫名其妙的问题,消息会丢失。目前重点放在ActiveMQ6.0产品Apollo上,5.x的维护较少。不适合千队列的应用场景。RabbitMQRabbitMQ发布于2007年,是一个基于AMQP(高级消息队列协议)的可重用企业消息系统,是目前最主流的消息中间件之一。主要特点如下:可靠性:提供多种技术,让您在性能和可靠性之间做出权衡。这些技术包括持久性机制、交付确认、发布者证明和高可用性机制。灵活的路由:消息在到达队列之前通过交换路由。RabbitMQ为典型的路由逻辑提供了几种内置的开关类型。如果你有更复杂的路由需求,你可以组合这些开关,你甚至可以实现你自己的开关类型,并将其作为RabbitMQ插件使用。消息集群:同一局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理。队列高可用:队列可以镜像到集群中的机器上,即使在硬件问题下也能保证消息安全。支持多种协议:支持多种消息队列协议。多语言支持:用Erlang编写,它支持你能想到的每一种编程语言。管理界面:RabbitMQ有一个易于使用的用户界面,允许用户监控和管理消息代理的许多方面。跟踪机制:如果消息异常,RabbitMQ提供了消息跟踪机制,用户可以及时发现发生了什么。插件机制:提供多种插件,可以进行多方面的扩展,也可以自己编写插件。部署环境:RabbitMQ可以运行在Erlang语言支持的平台上,包括Solaris、BSD、Linux、MacOSX、TRU64、Windows等。使用RabbitMQ需要:ErLang语言包RabbitMQ安装包的优点如下:由于Erlang语言的特点,消息队列具有更好的性能,支持高并发。健壮、稳定、易用、跨平台、支持多国语言、文档齐全。具有消息确认机制和持久化机制,可靠性高。高度可定制的路由。管理界面比较丰富,在互联网公司也有大规模应用,社区活跃度高。缺点如下:虽然结合Erlang语言本身的并发优势,性能较好,但不利于二次开发和维护。实施了代理架构,这意味着消息可以在发送给客户端之前在中央节点上排队。此功能使RabbitMQ易于使用和部署,但由于封装后中心节点的延迟增加和更大的消息而使其速度变慢。需要学习比较复杂的接口和协议,学习和维护成本比较高。RocketMQRocketMQ是阿里的开源产品,用Java语言实现,设计时参考了Kafka,并做了一些自身的改进,消息可靠性优于Kafka。RocketMQ广泛应用于阿里内部订单、交易、充值、流计算、消息推送、日志流处理、Binglog分发等场景。主要特点如下:基于队列的模型:具有高性能、高可靠、高实时、分布式等特点。Producer和Consumer队列都支持分布式。Producer依次向一些队列发送消息,队列集合称为Topic。如果Consumer做广播消费,一个Consumer实例消费这个Topic对应的所有队列。如果使用集群消费,则多个Consumer实例平均消费该Topic对应的队列集。可以保证严格的消息顺序。提供丰富的消息拉取方式。高效的用户水平扩展能力。实时消息订阅机制。亿级消息积累能力。更少的外部依赖。部署环境:RocketMQ可以运行在Java语言支持的平台上。使用RocketMQ需要:JavaJDK安装Git、MavenRocketMQ安装包的优点如下:单机支持10000多个持久化队列。所有的RocketMQ消息都是持久化的,先写入系统PAGECACHE,然后flush掉,保证内存和磁盘都有一份数据,访问的时候,直接从内存中读取。模型简单,接口易用(JMS的接口在很多场合不是很实用)。性能非常好,允许大量消息堆积在Broker中。支持多种消费模式,包括集群消费、广播消费等,各链路分布式扩展设计,支持主从,高可用。发展程度比较活跃,版本更新很快。缺点如下:支持的客户端语言不多,目前有Java和C++,其中C++不成熟。RocketMQ社区的关注度和成熟度都不如前两者。没有Web管理界面,但是提供了CLI(命令行界面)管理工具来查询、管理和诊断各种问题。JMS等接口未在MQ核心中实现。KafkaApacheKafka是一个分布式消息发布和订阅系统。它最初由LinkedIn基于独特的设计实现为分布式日志提交系统(distributedcommitlog),后来成为Apache项目的一部分。Kafka高效、可扩展且持久。它的分区特性、可复制性和容错性都是很好的特性。主要特点如下:快速持久化:消息持久化可以在O(1)的系统开销下进行。高吞吐量:在普通服务器上可以达到10W/S的吞吐率。完全分布式系统:Broker、Producer、Consumer都原生支持分布式,自动实现负载均衡。支持同步和异步复制两种高可用性机制。支持数据批量发送和拉取。零拷贝技术(zero-copy):减少IO操作步骤,提高系统吞吐量。数据迁移和扩容对用户透明。无需停机即可扩展机器。其他特点:丰富的消息拉取模型、高效的订阅者水平扩展、实时消息订阅、亿级消息积累能力、周期删除机制。部署环境,使用Kafka需要:JavaJDKKafka安装包具有以下优势:丰富的客户端语言:支持Java、.Net、PHP、Ruby、Python、Go等语言。高性能:单机TPS约为100万条/秒,消息大小为10字节。提供全分布式架构,具有Replica机制,高可用可靠,理论上支持消息无限堆积。支持批量操作。消费者使用Pull方法获取消息。消息是有序的,通过控制可以保证所有的消息都被消费,并且只被消费一次。有一个优秀的第三方Kafkaweb管理界面Kafka-Manager。在日志领域比较成熟,被很多公司和多个开源项目使用。缺点如下:当单台Kafka机器超过64个队列/分区时,Load会出现明显的尖峰。队列越多,负载越高,发送消息的响应时间越长。对于短轮询,实时性能取决于轮询间隔。消费失败不支持重试。支持消息顺序,但是当代理宕机时,消息将乱序。社区更新缓慢。几种消息队列的比较这里比较一下上面四种消息队列的区别:Kafka是基于分布式架构,RabbitMQ是基于AMQP协议实现的,而RocketMQ的思想来自于Kafka,改成了master-从结构。已经优化。广义上讲,电子商务、金融等对事务一致性要求高的可以考虑RabbitMQ、RocketMQ,对性能要求高的可以考虑Kafka。总结本文介绍了消息队列的特点、消息队列的投递服务模型、消息的传输方式、消息的推拉模式。然后介绍了ActiveMQ、RabbitMQ、RocketMQ和Kafka等几种常见的消息队列,阐述了各种消息队列的主要特点和优缺点。通过本文,相信您会对消息队列及相关技术选型有更深入的了解和认识。更多的细节和原理,需要在实践中见分晓!作者:陈林简介:五年研发和架构经验。曾任SAP中国研发中心后端研发、上海冰鉴科技信息技术有限公司架构师助理,现任成都ThoughtWorks有限公司高级顾问、研究员。熟悉大数据、高并发、负载均衡、缓存、数据库、消息中间件、搜索引擎、容器和自动化。个人学习能力强,对技术热情高,热爱开源和撰写技术博客,善于交流和分享。
