说到消息队列,心里还是有些波澜。消息队列、缓存和分库分表是高并发解决方案的三把剑,而消息队列是我最喜欢的,也是我思考最多的技术。下面分享一下我在消息队列这四个阶段的故事,也是对自己技术成长经历的回顾。初识:ActiveMQ进阶:Redis&RabbitMQ升华:MetaQ挚爱:RocketMQ1初识ActiveMQ1.1异步&解耦2011年初,我在一家互联网彩票公司做研发。我负责用户中心系统,提供用户注册、查询、修改等基本功能。用户注册成功后,需要给用户发送一条短信。因为本来就是面向过程的编程,所以我把新建用户模块和短信发送模块组合在一起。一开始还好,但是慢慢的问题就出现了。短信通道不够稳定,发送一条短信需要5秒左右,用户注册界面耗时较长,影响前端用户体验;如果短信通道接口发生变化,则需要修改用户中心代码。但用户中心是核心系统。每次上网,都需要谨慎。这种感觉很别扭,非核心功能影响核心系统。第一个问题,我可以采取线程池的方式来做,主要是异步的。但是第二个问题让我很困惑。于是向技术经理请教,他让我引入消息队列来解决这个问题。将短信发送功能拆分成一个独立的Job服务;用户中心用户注册成功后,向消息队列发送消息,Job服务收到消息后可以调用短信服务发送短信。这时候我才明白,消息队列的核心功能就是异步和解耦。1.2调度中心摇号系统业务相对复杂。在彩票订单的生命周期中,有创建、子订单拆分、出票、中奖计算等多个环节。每个环节需要不同的业务处理,每个系统都有自己独立的表,业务功能相对独立。如果每个应用程序都修改订单主表中的信息,那将是相当混乱的。该公司的建筑师设计了调度中心的服务。调度中心的职责是维护订单核心状态机、订单奖励流程、彩票核心数据的生成。调度中心通过消息队列、票务网关、奖品计算服务等系统进行信息传递和交换。在我当时幼小的眼里,这种设计就像是水滴对战人类舰队,降维打击。随着对业务理解的不断加深,我隐约感觉到:“好的架构是简洁的,应该是易于维护的”。当彩票业务日均交易量几千万的时候,调度中心只有两个研发和维护人员。调度中心源码中的业务逻辑、日志、代码规范都很优秀。在以后的编程生活中,我也会下意识地模仿调度中心的编码方式,“别耍花样,代码是给人看的”。1.3重启大法随着彩票业务的爆发式增长,日消息量从30万条增长到150-200万条。一切似乎都很稳定。某一天,双色球投注结束,调度中心无法消费消息队列中的数据。消息总线处于发送状态,没有接收状态。整个技术团队都处于极度焦急的状态,“如果不能出票,那将是几百万的损失,如果用户中了两个双色球?那就是几千万了。”每个人都急得像热锅上的蚂蚁。这也是整个技术团队第一次遇到消费积累,大家都没有经验。首先想到的是多部署几个调度中心服务。部署完成后,调度中心消费了上千条消息还是挂了。这时架构师只能采用重启策略。你没看错,就是重启大法。说起来真的很惭愧,但当时真的是只能这样了。调度中心重启后,花了1万到2万元又挂了。只能重新开始。来回20多次,像挤牙膏一样。而随着开票期限的临近,这种精神上的紧张和恐惧也变得更加强烈。最后,经过1个小时的手动重启,消息终于被消费了。当时刚好在看毕轩老师的《分布式java应用基础与实践》,心想是不是线程阻塞了,于是用Jstack命令查看堆栈状态。果然不出所料,线程阻塞在提交数据的方法上。我们第一时间与DBA沟通,发现oracle数据库执行了很多大事务,每个大事务执行需要30多分钟,导致调度中心的调度票线程阻塞。为了避免堆积问题,技术部门后来采用了以下解决方案:生产者发送消息时,将超大消息拆分成多批消息,以降低调度中心执行大事务的概率;数据源配置参数,如果事务执行超过一定时间后,会自动抛出异常并回滚。1.4回顾Spring封装的ActiveMQAPI非常简单易用,用起来真的很舒服。受限于当时彩票技术团队的技术水平和眼界,我们在使用ActiveMQ的过程中遇到了一些问题。高吞吐量下,消息量累积到一定程度很容易挂掉。技术团队发现,在吞吐量特别高的场景下,如果消息堆积越大,ActiveMQ挂掉的几率就越低。票务网关消息量很大,有些消息不需要立即消费,但是为了避免消息队列Hang的问题,票务网关在消费数据时,先将消息持久化到本地磁盘,生成本地XML文件,然后异步和周期性地执行消息。这样,我们大大提高了票务网关的消费速度,基本消除了票务网关排队的堆积。但是这个方法感觉很奇怪。消费新闻时,必须在本地存储另一份数据。新闻存储在本地。如果磁盘出现故障,也有丢失消息的风险。高可用机制有待完善。我们采用master/slave部署方式,一主一从,服务器配置为4核8G。这种部署方式可以同时运行两个ActiveMQ,并且只允许一个slave连接Master,也就是说一个集群只能有2个MQ,两个服务之间有数据备份通道.单向数据备份。该方案在实际生产线上使用不便,因为当Master挂掉后,Slave无法自动接收Client发送的邀请,需要人工干预,必须先停止Slave再重启恢复负载集群。还有一些非常奇怪的消息丢失事件。producer发送消息成功,但是master控制台查询不到,但是slave控制台确实可以查询到消息。但是消费者没有办法消费slave上的消息,只能手动处理。2Redis&RabbitMQ进阶2014年在艺龙从事红包系统和优惠券系统优化工作。2.1Redis可以做消息队列吗?酒店优惠券计算服务采用第一代流计算框架Storm。Storm这里就不详细介绍了,大家可以参考下面的逻辑图:这里我们Storm集群的水源(数据源)是redis集群,消息队列的push/pop功能是通过使用列表数据结构。流式计算的整体流程:酒店信息服务向Redis集群A/B发送酒店信息;Storm的spout组件从Redis集群A/B获取数据,获取成功后向Bolt组件发送元组消息;Bolt组件接收到消息后,通过操作配置规则清理数据;最后,Storm将处理后的数据发送到Redis集群C;存储服务从Redis集群C获取数据,存储到数据库中;搜索团队扫描数据库表并生成索引。风暴描述这个流计算服务每天处理数千万条数据,处理起来比较流畅。但团队内部对解决方案仍有不同意见:升级storm拓扑,或重启优惠券服务时,偶尔会出现消息丢失的情况。但是消息的丢失对业务没有那么敏感,我们也提供了手动刷新的功能,也在业务的承受范围内;团队需要密切关注Redis的缓存使用情况,担心Redis队列堆积导致内存不足;架构师认为搜索团队直接扫描数据库解耦不够。建议将Redis集群C换成Kafka,搜索团队直接从Kafka消费消息生成索引;我认为使用Redis作为消息队列应该满足以下条件:容忍小概率消息丢失,通过定时任务/手动触发实现最终一致的业务场景;消息堆积概率低,有相关告警监控;消费者的消费模式应该足够简单。2.2RabbitMQ是管道而不是池RabbitMQ是用erlang语言编写的。RabbitMQ满足了我的两个需求:高可用机制。艺龙内部采用的是镜像高可用模式,这种模式在艺龙内部已经使用了很长时间,其稳定性也得到了一定程度的验证。在我负责的红包系统中,RabbitMQ的日吞吐量在百万条消息左右,消息的发送和消费都相当完善。优惠券服务最初使用的是SqlServer。由于数据量大,技术团队决定采用分库分表的策略,使用公司自研的分布式数据库DDA。因为是第一次使用分布式数据库,为了测试DDA的稳定性,我们模拟发送1000万条消息到RabbitMQ,然后优惠券重构服务消费消息后,根据hash到不同的mysql库用户号码。RabbitMQ集群模式是镜像高可用,3台服务器,每台配置4核8G。我们以每小时300万条消息的速度发送消息。第一个小时生产者和消费者的表现都很好,但是由于消费者的速度跟不上生产者的速度,消息队列出现了积压。第三个小时,消息队列已经积累了超过500万条消息,生产者发送消息的速度从最初的2毫秒提升到500毫秒左右。RabbitMQ的控制台当场有血溅,红标告警。这是一个无意的测试。从测试情况来看,RabbitMQ是优秀的,但是RabbitMQ对消息堆积的支持不是很好。当大量消息积压时,RabbitMQ的性能会急剧下降。有朋友跟我说:“RabbitMQ明明是个管道,你非要把它当成一个池子?”随着互联网数据量的激增,很多业务场景允许适当的积累。只要消费者能够顺畅消费,整个生意就不会出现大的波动。我心里越来越相信:消息队列既可以作为管道,也可以作为池。3SublimationMetaQMetamorphosis的由来是我从学习linkedin的开源MQ开始-现在转入apache的kafka。这是一个设计独特的MQ系统。它使用拉机制而不是一般的MQ推模型。它有大量的我使用zookeeper来进行服务发现和偏移量存储。我很欣赏也很认同它的设计理念。我强烈建议您阅读它的设计文档。总的来说,蜕变的设计是完全符合它的。---MetaQ3.1惊艳消费模型作者庄晓丹2015年主要从事中国专车订单的研发工作。MetaQ满足了我对消息队列的幻想:“分布式、高吞吐、高积累”。MetaQ支持两种消费模型:集群消费和广播消费,因为之前使用的消费模型都是使用队列模型。刚开始接触这种发布-订阅模式的时候,我还是很惊叹的。集群消费订单创建成功后,发送消息给MetaQ。此消息可由调度服务或BI服务使用。当广播消费者调度服务将订单分配给司机时,它会向司机发送一条推送消息。推送是使用广播消费的方式实现的。大体流程是:driver端的push服务是一个TCP服务。启动后,以广播方式消费MetaQ的PushTopic;驱动端会定时向推送服务发送TCP请求。认证成功后,推送服务会保存司机号和通道调度服务向MetaQ发送推送消息;推送服务的每台机器都会收到消息,然后判断内存中是否有驱动程序的通道引用,如果有则推送消息。这是一个非常经典的广播消费案例。我曾经研究过京麦TCP网关的设计,它的推送也是类似的方式。3.2激进减峰2015年是网约车大战硝烟弥漫的一年。就神州专车而言,随着订单的不断增长,业绩压力与日俱增。早晚高峰时段,用户打车时,往往点击下单,往往没有反应。在系统层面,私家车API网关发现大范围超时,订单服务性能急剧下降。数据库层面的压力就更大了,高峰期插入一条记录需要8秒。整个技术团队需要尽快提升私家车系统的性能,之前已经按照模块字段拆分了数据库。但是系统的瓶颈还是很明显的。我们设计了现在看来有点激进的设计:设计订单缓存。如果大家对缓存的方案感兴趣,我们以后再说,有很多点可以详细讨论;在订单的生命周期中,订单的修改操作首先修改缓存,然后发送消息给MetaQ,下单服务消费消息,判断订单信息是否正常(如是否乱序),如果订单数据是正确的,它将被存储在数据库中。这里有两个细节:消费者在消费的时候,需要顺序消费,实现方式是根据订单号路由到不同的分区,每次将相同订单号的消息发送到同一个分区;[图片上传中...(image-5c6295-1668478881924-1)]一个守护任务,定时轮询当前订单,当缓存与数据不一致时,修复数据,并发出告警。本次优化大大提升了订单服务的整体性能,也为后续的订单服务分库分表、异构化打下了坚实的基础。根据我们的统计数据,缓存和数据库之间基本没有最后的不一致。但是这个方案对缓存的高可用要求比较高,有点激进。3.3MessageSDK封装做过基础架构的同学可能会有这样的体会:“三方组件都会封装在一层”,中国架构团队也将metaq-client封装在一层。在我看来,封装一层可以减少研发人员使用第三方组件的心智投入,统一技术栈,仅此而已。直到一场意外发生,我的思维升级了。那是一个下午,整个乘车服务崩溃了很长一段时间。技术团队发现:“专车使用zookeeper进行服务发现,zk集群的leader机器挂了,一直在选举leader。”经过临时解决,我们发现MetaQ和服务发现都使用了同一套zk集群,提交的是consumer的offset。而负载均衡会对zk集群进行大量的写操作。为了减少MetaQ对zk集群的影响,我们的目标是:“MetaQ使用独立的zk集群”。需要部署一个新的zk集群;MetaQ的zk数据需要同步到新集群;保证切换到新集群,应用服务基本无感知。好奇的问了建筑系的同学。他说新集群已经部署好了,但是zk数据需要同步到新集群。他在客户端添加了双写操作。也就是说:除了在原来的zk集群中写一份数据,我们还要在新的zk集群中写一份数据。几周后,MetaQ使用单独的zk集群的任务已经完成。这次经历给我带来了很多感慨:“我还能这样玩吗?”这也让我想到:三方组件的封装并没有想象的那么简单。我们可以看看快手消息的SDK封装策略:对外只提供了最基本的API,所有的访问都必须通过SDK提供的接口。简洁的API就像冰山一角。除了简单的外部接口外,以下所有内容都可以在不破坏兼容性的情况下进行升级和替换;业务开发也很简单,只要提供Topic(全局唯一)和Group即可生产和消费,无需提供环境、NameServer地址等,SDK内部会根据解析集群NameServer的地址Topic,然后连接到相应的集群。生产环境和测试环境会解析不同的地址,从而实现隔离;上图分为3层,第二层是通用的,第三层对应具体的MQ实现。因此理论上可以用其他Message中间件替代,客户端程序不需要修改;SDK集成了热改机制,可以在不重启Client的情况下动态配置,比如下发路由策略(更换集群NameServer的地址,或者连接到另一个集群Go),Client的线程数,超时时间期等。通过Maven的强制更新机制,可以保证业务使用的SDK基本是最新的。3.4重构自包含系统MetaQ我有一个习惯:“经常找运维、DBA、架构师,了解当前系统有没有问题,解决问题的思路。这样,我换个角度看待公司的制度运作”。MetaQ也有他的缺点。MetaQ的基础通信框架是gecko,MetaQ偶尔会出现rpc无响应,应用卡顿的情况,不易定位问题;MetaQ的运维能力较弱,只有简单的Dashboard界面,无法实现自动主题申请、消息追踪等功能。有一天,我发现测试环境的一台消费者服务器启动后,一直报链接异常,CPU占用率高。我立即用netstat命令查看,发现已经创建了数百个链接。出于好奇,打开源码,发现网络通信框架gecko已经被netty取代了。我们会第一时间联系建筑系的同学。那时我才意识到:他们已经开始重构MetaQ。我从来没有想过重构一个开源软件,因为它离我太远了。或者那个时候觉得自己能力不行。后来中国自研的消息队列成为了自己的系统,在生产环境中一直运行的很好。时至今日,我仍然很佩服中国建筑团队。他们自己开发了消息队列、DataLink(数据异构中间件)、分库分表中间件等。他们乐于创新,勇于做出更好的技术产品。我从他们身上学到了很多。可能是看到他们重构MetaQ的那一刻,心里就种下了一颗种子。4热爱RocketMQ4.1开源盛宴2014年收集了很多关于淘宝消息队列的资料。我知道MetaQ的版本已经升级到MetaQ3.0,但是开源版本还没有发布。大约在秋天,我加入了RocketMQ技术组。石佳(RocketMQ创始人)在群里说:“最近开源要发布了,发布了大家抓紧fork一下。”他这句话发到群里后,群里炸开了锅。心里更加高兴了,期待早日看到阿里自己的内部消息中间件。最后,RocketMQ终于开源了。我迫不及待地想看他一眼。因为想学网络编程,而RocketMQ的通讯模块remoting底层也是Netty写的。因此,RocketMQ的通信层是我学习的起点。我模仿了RocketMQ的remoting,写了一个玩具rpc,大大提升了自信心。巧合的是,艺龙举办了科技创新活动。我想了想,不如试试用Netty重写Cobar的通信模块。于是参考Cobar的源码,花了两周的时间写了一个netty版的proxy,其实很粗糙,很多功能还不完善。后来这次活动给了我一个鼓励奖,现在想想都觉得有趣。因为在UCAR中使用了MetaQ,所以我在学习RocketMQ方面也比较得心应手。为了真正理解源码,我经常参考RocketMQ的源码,写一些轮子来验证自己的学习效果。虽然自己也做过一些练习,但是从来没有在商业环境中使用过。2018年是我真正使用RocketMQ的一年,也是收获的一年。短信服务短信服务应用广泛,如用户注册登录验证码、营销短信、下单成功短信通知等。当初设计短信服务的时候,想了解一下业界是怎么做的。于是目标就锁定在了腾讯云的短信服务上。腾讯云短信服务有以下特点:统一SDK,后台入口为http/https服务,分配appId/appSecret进行鉴权;简单的API设计:单发、群发、营销单发、营销群发、模板单发、模板群发。所以,我参考了这个设计思路。模仿腾讯云SDK设计,提供简单易用的短信接口;设计短信服务API,接收短信请求,发送短信信息到消息队列;worker服务消费消息,根据负载均衡算法Interface调用不同通道提供者的SMS消息;Dashboard可以查看短信发送记录,配置渠道商信息。[图片上传中...(image-8e9d3f-1668478881924-0)]短信服务是我第一次真正意义上在生产环境中使用RocketMQ。在MQ控制台使用过RocketMQ的朋友一定对上图中的控制台不陌生。当时团队有多个RocketMQ集群,每个集群需要部署一个单独的控制台。所以我想:我能不能稍微修改一下控制台来支持多集群。所以,我卷起袖子开始工作。在开源版本的基础上修改了可以支持多组集群的版本,用了大概20天。做完之后,虽然能满足我最初的想法,但是很粗糙。而且,搜狐还开源了自己的MQCloud。看了他们的设计,感觉离一个消息管理平台还差得很远。后来又看了《网易云音乐的消息队列改造之路》和《今日头条在消息服务平台和容灾体系建设方面的实践与思考》这两篇文章,越是心痒痒,越是想搭建一个真正的消息管理平台。可惜一直没有场景和机会。最近在你好看到自行车建筑专家梁勇的一篇文章《哈啰在分布式消息治理和微服务治理中的实践》,推荐大家看看。https://mp.weixin.qq.com/s/N-...一个窗口,开始自己开发组件后,尝试进一步使用RocketMQ。以ONS风格封装消息SDK;运维端消息队列平滑扩容;生产环境试试DefaultMQPullConsumer的消费模式。在设计这些系统的时候,我从RocketMQ的源码中汲取了很多营养。虽然看起来设计上还有很多不完善的地方,代码质量也有待提高,但是在做完这些系统之后,我的自信心得到了很大的提升。RocketMQ为我打开了一扇窗,让我看到了更广阔的Java世界。对我来说,这是开源的盛宴。4.2Kafka:大数据生态中不可或缺的一部分Kafka是一个分布式消息流处理中间件,具有高吞吐、持久、水平扩展、支持流式数据处理等特点。它采用分布式消息发布和订阅机制,广泛应用于日志采集、流式数据传输、在线/离线系统分析、实时监控等领域。日志同步在大型业务系统设计中,为了快速定位问题,全链路跟踪日志,及时监控故障,通常需要对各个系统应用的日志进行统一的分析处理。集中的方式。Kafka设计的初衷是为了应对大量的日志传输场景。应用程序将日志消息以可靠和异步的方式同步到消息服务,然后使用其他组件实时或离线分析日志。它还可以用于收集关键日志信息以进行应用程序监控。日志同步主要有三个关键部分:日志采集客户端、Kafka消息队列、后端日志处理应用。日志采集客户端负责用户各种应用服务的日志数据采集,将日志以消息的形式“批量”、“异步”发送给Kafka客户端。Kafka客户端批量提交和压缩消息,对应用服务的性能影响很小。Kafka将日志存储在消息文件中以提供持久性。一个日志处理应用程序,如Logstash,在Kafka中订阅和消费日志消息,最后提供文件搜索服务来检索日志,或者Kafka将消息传递给其他大数据应用程序,如Hadoop进行系统存储和分析。日志同步示意图:流计算处理应用于很多领域,比如股市走势分析、气象数据测控、网站用户行为分析等。由于数据生成速度快、实时性强、体量大,您很难统一收集这些数据并存储到数据库中。然后再做处理,导致传统的数据处理架构不能满足需求。Kafka、Storm、Samza、Spark等流计算引擎的出现,就是为了更好的解决这类数据处理过程中遇到的问题。流计算模型可以在数据流动过程中实现对数据的实时捕获和处理。处理,并根据业务需求进行计算分析,最后将结果保存或分发到需要的组件中。作为10多年的数据中转枢纽,KV存储(HBase)、搜索(ElasticSearch)、流处理(Storm、Spark、Samza)、时序数据库(OpenTSDB)等专用系统应运而生。这些系统诞生时心中只有一个目标,因为它们的简单性使得在商品硬件上构建分布式系统变得更加容易和更具成本效益。通常,需要将相同的数据集注入多个专用系统。例如,当应用日志用于离线日志分析时,搜索单独的日志记录也是必不可少的,建立独立的工作流来收集每一类数据,然后将它们导入到自己的专用系统中显然是不切实际的。使用消息队列Kafka版本作为数据传输枢纽,可以将相同的数据导入不同的专用系统。下图是美团MySQL数据实时同步到Hive的架构图,也是一个非常经典的案例。4.3如何选择技术2018年去哪儿QMQ开源,2019年腾讯TubeMQ开源,2020年Pulsar如火如荼,消息队列生态如此繁荣,那么我们该如何选择模型呢?我认为我们不必局限于消息队列,我们??可以扩展它。只是谈谈我的看法。数据库在专业化——“一刀切”的方式不再适用-----MongoDB设计理念的第一点:先有场景,再有适应这个场景的技术。什么样的场景选择什么样的技术。第二点:现实往往很复杂。当我们真正进行技术选型并需要实施时,技术储备和成本是我们需要重点关注的两个因素。技术储备技术团队是否有使用该技术的经验,是否踩过生产环境的坑,是否有针对这些坑的完整解决方案;架构团队是否有成熟的SDK、工具链,甚至是技术产品。成本研发、测试、运维投入人工成本;服务器资源成本;招聘成本等最后一点是人的因素,尤其是管理者的因素。每一次重大的技术选型,都考验着技术管理者的眼光、布局和管理智慧。