本文转载自微信公众号《SH的全栈笔记》,作者SH。转载本文请联系SH全栈笔记公众号。这篇文章讲的是消息队列相关的东西。内容仅限于我们为什么使用消息队列,消息队列解决什么问题,以及消息队列的选择。为了更容易理解消息队列,我们??先切入一个开发场景。不使用消息队列的场景首先,我们假设A同学负责订单系统的开发,B同学和C同学负责积分系统和仓储系统的开发。我们知道,在一般的购物电商平台上,我们下单后,积分系统会为下单的用户进行积分,然后入库系统会根据信息将用户购买的商品寄出下单时填写。那么问题来了,积分系统和仓储系统如何感知用户的下单操作呢?你可能会说,订单系统当然是创建订单后调用积分系统和入库系统的接口。OK,直接调用接口的方式是目前为止没有问题。于是B同学和C同学找到A同学,让他在下单完成后调用自己的接口通知积分系统和仓储系统,从而为用户加分并发货。A同学心想,这两个系统应该没问题吧,好的,我给你加了。但是随着系统的迭代,需要感知订单操作的系统越来越多。从之前的积分系统和存储系统两个系统,增加到五个。每个系统负责的同学需要去找A同学,让其他人手动添加相应系统的通知接口。然后因为增加了这个接口,需要重新下单。这对于A同学来说其实是一件很痛苦的事情,因为A同学有自己的任务和日程,一旦有了新的系统,需要增加通知接口和发布服务,这样会打乱A的开发计划,增加A的开发量。发展。同时,还需要梳理新添加的代码在开发期间能否上线。一旦上线失败,但是又不检查,上线的时候直接炸了。而且,如果这五个系统有一个需要增加字段,或者接口更新了,还需要麻烦A同学修改。这样五个系统就和A系统强耦合了。另外,由于同步调用这五个系统的通知接口,拉长了整个创建订单的调用链,减慢了接口的响应速度,降低了用户侧的购物和下单体验。前者至少影响到内部员工,现在直接影响到用户,显然是不可取的方案。可以看到,整个调用环节都被加长了,更何况,在同步调用中,如果系统其余部分出现错误,或者调用其他系统时出现网络抖动,核心订单流程就会被阻塞,甚至会在下单界面提示用户出错,整个购物体验又降了一个档次。更何况在实际业务中,调用链远比这个长。可能有人会说,这不就是一个同步调用的问题吗?对于订单系统的核心逻辑我还是采用同步处理,但是对于后续的通知我采用异步的方式,使用线程池来处理,这样调用链路不容易恢复正常吗?简单的减少链接确实是可行的。但是,如果某个过程失败了怎么办?是失败了吗?下单成功不就加分了吗?该发给我的货竟然没有发货?这合理吗?如何处理?不然因为别的系统把整个主进程搞垮了,谁来找你买东西?那么有什么办法可以减少调用环节,出错时重试呢?归根结底,核心思想就是加分、返券等流程不要和主流程耦合,更不能影响主流程。试想一下,我们能否在订单系统完成核心逻辑后,将订单创建消息放入一个队列中,然后订单系统将下单成功的结果返回给用户。然后其他系统从这个队列中收到下单成功的消息,并进行自己的操作,比如加分,返券等等。以后如果有新的系统需要感知订单创建的消息,订阅这个队列,消费里面的消息就可以了。虽然这和真正的消息队列有些不同,但是思路是完全一致的。为什么需要消息队列通过上面的例子,我们可以大致理解为什么需要引入消息队列,这里简单总结一下。实时性不是很强的异步服务,比如给用户发送短信和邮件通知,调用第三方接口等,都可以放在消息队列中。因为相对于核心的下单流程,短信和邮件的发送时间较晚,对用户影响不大。同时也可以提高整个链路的响应时间。调峰假设我们有服务A,这是一个无状态服务。通过水平扩展,可以轻松抵抗1w并发,但是这N个服务实例底层都访问同一个数据库。数据库能承受的并发量是有限的。如果你的机器足够好,可能能承受5000并发。如果服务A的所有请求都发到数据库,数据会直接挂掉。解耦像上面的例子一样,订单系统在创建订单后需要通知所有其他系统,这使得订单系统与其余系统强耦合。后续的可维护性和可扩展性大大降低。通过消息队列将所有系统关联起来,可以达到解耦的目的。像上面的模型,后面如果有新的系统需要感知订单创建的消息,只需要消费“订单系统”发送给MQ的消息即可。同样,订单系统如果需要感知其他系统的一些事件,也只能从MQ消费。通过MQ实现了服务之间的松耦合和服务内部的高内聚,提高了服务的自治性。消息队列选择已知的消息队列包括Kafka、RocketMQ、RabbitMQ和ActiveMQ。不过由于ActiveMQ目前使用的公司比较少,这里就不展开讨论了。我们将重点关注前三个。KafkaKafka最初来自LinkedIn,是一个日志收集工具,用Java和Scala开发。其实ActiveMQ在那个时候就已经存在了,只是那个时候ActiveMQ不能满足LinkedIn的需求,于是Kafka应运而生。2010年底,Kakfa0.7.0在Github上开源。2011年,因为Kafka受到了很多关注,被收录到ApacheIncubator中。所有外部项目想要成为Apache的官方项目,都必须经过Incubator,翻译过来就是孵化器。旨在将一些项目孵化为完全成熟的Apache开源项目。您也可以将其视为一所学校。所有想成为Apache官方开源项目的外部项目,都必须进入孵化器学习,拿到文凭,才能进入社会。于是在2012年,Kafka顺利从ApacheIncubator毕业,正式成为Apache的一员。Kafka吞吐量非常高,单机可以承受十几瓦的并发,写入性能也非常高,达到毫秒级。但有优点也有缺点。能够做到这么高的并发,代价就是消息可能会丢失。至于具体的丢失场景,我们以后再说。所以Kafka一般用于大数据的日志收集,这种日志丢失一两条无伤大雅。而且Kafka的功能比较简单,就是简单的接收来自生产者的消息,消费者消费来自Kafka的消息。RabbitMQRabbitMQ是很多公司ActiveMQ的替代品,现在仍然有很多公司在使用。它的好处是可以保证消息不会丢失。像Kafka一样,天平向数据的可靠性倾斜,这必然导致其吞吐量下降。它的吞吐量只能达到几万,确实低于Kafka的十万吞吐量。如果遇到需要支持特别高并发的情况,RabbitMQ可能做不到。但是RabbitMQ拥有比Kafka更高级的特性,比如消息重试和死信队列,写入的延迟可以降低到细微的程度,这也是RabbitMQ的一大特性。但是RabbitMQ也有一个致命的弱点。它的开发语言是Erlang。国内精通Erlang的人并不多,社区也不是很活跃。这也导致了公司内没有人能读懂Erlang的源码,更谈不上根据其源码进行二次开发或排查问题的可能。所以如果RabbitMQ出了问题,公司里谁也负担不起,而且维护成本非常高。一些中小公司之所以还在用,是觉得不会面对高并发场景,RabbitMQ的功能完全够用。RocketMQRocketMQ来自阿里。和Kakfa一样,也是来自ApacheIncubator的顶级项目。它是用Java语言开发的。单机吞吐量和Kafka一样,也是10w量级。RocketMQ的前身是阿里的MetaQ项目,2012年在淘宝被广泛使用,阿里内部迭代到3.0版本后,抽取了MetaQ的核心功能,创建了RocketMQ。RocketMQ综合了Kafka和RabbitMQ的优点,例如高吞吐量,以及通过参数配置保证消息永不丢失的能力。其底层设计参考了Kafka,具有低延迟、高性能、高可用等特点。不同于Kafka单一的日志采集功能,RocketMQ广泛应用于订单、交易、计算、消息推送、binlog分发等场景。之所以能够适用于多种场景,得益于RocketMQ提供的丰富功能。例如延迟消息、事务性消息、消息回溯、死信队列等。延迟消息是不会立即被消费的消息,比如在事件开始前15分钟提醒用户。事务消息主要解决数据库事务和MQ消息之间的数据一致性。例如,用户下单时,先向MQ发送消息,并添加积分,但发送消息后订单系统挂了。这样一来,用户并没有下单成功,反而积分增加了,显然不符合预期的消息回溯。顾名思义,就是消费某个话题下某个时间段的历史消息。RocketMQ的重试机制重试。当达到最大重试次数后,如果仍然消费失败,RocketMQ不会立即丢弃消息,而是将消息放入死信队列。放入死信队列的消息会在3天后过期,所以需要及时处理消息队列。消息会丢失吗?消息是会丢失的,我们也可以配置参数让消息永不丢失。什么情况下消息丢失?消息队列中的角色可以分为三类,即生产者、MQ和消费者。一条消息在整个传输链路中需要经过以下过程。生产者将消息发送给MQ,MQ收到消息后将消息存入磁盘,消费者消费时将消息返回给消费者。先给出结论。在这三种情况下,消息都可能丢失。下面我们一步一步来分析。生产者向MQ发送消息。在发送消息的过程中,由于网络抖动等一些意外情况导致网络通信失败,消息没有发送到MQ。MQ存储消息当MQ收到生产者的消息,还没来得及处理,MQ突然关闭,此时消息就会丢失。即使MQ开始处理消息并将消息写入磁盘,消息仍然可能丢失。因为现代操作系统都会有自己的OSCache,因为和磁盘交互是一个代价高昂的事情,所以我们写文件的时候,数据会先写入OSCache,然后由OS根据策略进行调度触发真正的I/O操作以将数据刷新到磁盘。在刷入磁盘之前,如果MQ崩溃,OSCache中的所有数据都将丢失。Consumer消费消息当消息顺利经过producer和MQ,consumer拉取消息,还没来得及处理,consumer突然崩溃,消息丢失(当然如果不提交Offset,你重启后仍然可以消费这条消息)我们认为使用消息队列就可以万无一失了。没想到一步步分析下来,坑这么多。任何一步的错误都可能导致消息丢失。那么,上述的参数配置是怎样实现消息不丢失的呢?这里先不说具体的MQ是怎么实现的,先说说实现消息零丢失的思路吧消息最终一致性方案涉及的系统包括订单系统、MQ和积分系统。订单系统是生产者,积分系统是消费者。首先,订单系统向MQ发送订单创建消息。消息的状态是准备状态。Prepare状态的消息不会被消费者消费,所以可以放心发送。然后订单系统开始执行自己的核心逻辑。你可能会说,订单系统本身逻辑执行失败怎么办?刚才的preparemessage不就变成脏数据了吗?实际上,订单系统的事务失败后,会触发回滚操作,会向MQ发送消息,删除状态为Prepare的数据。订单系统核心事务成功后,会向MQ发送消息,并更新消息状态为preparetocommit。没错,这就是2PC,一种在分布式事务中保证数据一致性的协议。如果你眼尖的话,你可能已经发现问题了。发送prepare消息后,还没来得及执行本地交易,订单系统就挂了怎么办?此时订单系统即使重启也不会向MQ发送删除操作。这个prepare消息不是一直存在于MQ中吗?先给个结论,没有。如果订单系统在向MQ发送prepare消息后崩溃了,MQ确实会有一条数据不会被提交。为了解决这个问题,MQ会定时轮询所有的prepare消息,并与相应的系统通信,这条prepare消息是应该重试还是回滚。所以prepare消息不会一直存在于MQ中。这样就保证了生产者DB事务的消息和MQ中消息的数据一致性。让我们来看一个更极端的情况。假设订单系统在本地事务执行成功后向MQ发送commit消息。这时MQ突然挂了。结果MQ没有收到commit消息,而且消息在MQ中还处于prepare状态,怎么办?同样,依靠MQ的轮询机制与订单系统进行通信,订单系统会告诉MQ交易已经完成,MQ会将这条消息置为commit,消费者就可以消费这条消息了。接下来的过程就是消息被消费者消费了。消费者消费消息时,如果本地事务失败,会重试,再次尝试消费消息。
