当前位置: 首页 > 科技观察

消息中间件应用常见问题及解决方法

时间:2023-03-22 17:14:48 科技观察

1.简介消息队列(MQ)中间件已经流行了很多年。在互联网应用中,通常是稍微大一点的应用,我们都能看到MQ的身影。市场上有很多中间件中间件,包括但不限于RabbitMQ、RocketMQ、ActiveMQ、Kafka(流处理中间件)等。许多开发人员已经熟练掌握了一种或多种消息中间件的使用。不过还是有一些小伙伴对消息中间件不是特别熟悉。由于种种原因,无法深入研究和理解其中的原理和细节,导致使用时出现各种问题。在这里,我们分析了消息队列中间件使用中的典型问题(包括顺序消息、可靠性保证、消息幂等性、延迟消息等),并给出了一些解决方案。2.消息中间件的应用背景2.1消息中间件的基本思想在单个系统中,一些业务流程是可以顺序进行的。当涉及到跨系统(有时是系统内部)时,会需要更复杂的数据交互(也可以理解为消息传递)。这些数据的交互传输可以是同步的也可以是异步的。在异步数据传输的情况下,往往需要一个载体来临时存储和分发消息。在此基础上,专门为消息的接收、存储、转发而设计开发的专业应用,可以理解为消息队列中间件。延伸一下:如果我们简单的用一个数据库表来记录数据,然后接受数据存入数据表,通过定时任务把数据表中的数据分发出去,那么我们就实现了最简单的消息系统(这是本地消息表)。我们可以认为消息中间件的基本思想是利用高效可靠的消息传递机制进行异步数据传输。在这一基本思想的指导下,不同的消息由于侧重于不同的场景,具有不同的功能、表现和整体设计理念。消息队列(MQ)本身实现了一种从生产者到消费者的单向通信模型。常用的RabbitMQ、RocketMQ、Kafka等MQ都是指实现该模型的消息中间件。目前最常用的消息中间件主要有RabbitMQ、RocketMQ、Kafka(分布式流处理平台)、Pulsar(分布式消息流平台)。这里我包含了两个流处理平台,前面的其他一些消息中间件也逐渐淡出了人们的视线。在选择业务模型时,我们遵循两个主要原则:最大熟悉度原则(易运维、可靠使用)和业务契合度原则(中间件性能能够支撑业务量,满足业务功能需求)。这些常用的消息中间件的选择和比较很容易找到,这里就不详细介绍了。粗略的说:Pulsar目前使用的还不如RabbitMQ、RocketMQ、Kafka。RabbitMQ主打高可靠消息,RocketMQ主打性能和功能,Kafka主打大数据处理(Pulsar类似)。2.2引入消息中间件的意义下面举例简单介绍下异步、解耦、调峰的意义和价值(参考下面流程图):对于一个用户注册接口,假设有两个业务点,即注册,它处理新人福利发放逻辑需要50ms。如果我们把这两个业务流程耦合到一个接口上,总共需要100ms来完成处理。不过在这个过程中,用户在注册的时候,并不需要关心自己的福利是否马上发放,只要注册成功,尽快返回数据,后续的新人福利业务就可以了在主进程之外处理。如果我们分离出来,在接口主进程只处理登录逻辑,通过MQ推送一条消息,后续发新人福利的逻辑异步处理,这样在50ms左右可以得到结果注册界面。发放新人福利的业务通过异步任务慢慢处理。通过拆分业务点,我们实现了解耦,在注册的子业务中增加或减少功能点不会影响到主流程。另外,如果业务主进程在某一点请求并发比较高,可以通过异步的方式将压力分散到更长的时间段,达到降低固定时间段处理压力的目的。这就是流量削峰。另外,单线程模型的语言通常对消息中间件的需求更强烈。虽然多线程模型的语言,或者说协程类型的语言,可以通过自身的多线程(或者协程)机制实现业务内部的异步处理,但是考虑到持久化问题和管理难度,成熟的中间件更好.适用于异步数据通信,中间件也可以实现分布式系统之间的异步数据通信。2.3消息中间件的应用场景消息中间件的应用场景主要包括:异步通信:可用于业务系统内部的异步通信,也可用于分布式系统中的信息交互。系统解耦:将不同性质的业务进行隔离和切分以提升性能,将主进程和附属进程分层,按重要性隔离,减少异常影响。流量削峰:间歇性支流分散处理,降低系统压力,提高系统可用性。分布式事务一致性:RocketMQ提供的事务消息功能可以处理分布式事务一致性(比如电商订单场景)。当然也可以使用分布式事务中间件。消息按顺序发送和接收:这是最基本的功能,先进先出,需要一个消息队列。延迟消息:延迟触发的业务场景,比如下单后延迟取消未付款订单等。大数据处理:日志处理,kafka。分布式缓存同步:消费MySQLbinlog日志进行缓存同步,或者业务变更直接推送到MQ消费。所以,如果你的业务有上面列举的场景,或者类似的功能和性能需求,那么赶快引入“消息中间件”来提升你的业务性能吧。3、消息中间件的引入带来的一系列问题虽然消息中间件的引入有这么多的好处,但是在使用的过程中还是存在着很多的问题。例如:消息中间件的引入增加了系统的复杂度,如何使用和维护;消息发送失败怎么办(消息丢失);为了保证消息能够发送成功,如果消息重复发送(messageduplication)怎么办;中间件中的消息流发生异常如何处理;消费消息时,如果消费过程失败,如何处理,能否从中间件重新获取消息;如果消费失败,如果还能获取到,是否会在失败的情况下重复消费同一条消息?,使进程卡住;如果消费失败,无法再获取,那么如何保证这条消息能够被再次处理;同一个消息流程重复消费后如何处理,会不会导致业务异常;那么如何保证消费过程只成功执行一次;对于那些有顺序的消息,我们应该如何保证发送和消费的顺序是一致的;消息过多,如何保证消费脚本的消费速度,更好的满足业务的处理需求,避免消息无限积压;我想如果要发送的消息等待几秒才被消费怎么办;当然,我们可以针对业务开发者提炼上述问题,得到以下关键问题:消息顺序的保证避免消息丢失、消息重复问题、消息积压处理、延迟消息处理4.问题解决4.1消息顺序保证常规消息中间件和流处理中间件在自己的设计中一般都可以支持顺序消息,但是根据中间件本身的不同设计目标和不同的原理架构,导致我们在业务中使用中间件时的处理方式不同。以下常见消息或流中间件的顺序消息设计及使用中的乱序问题分析:RabbitMQ:RabbitMQ的单队列(queue)本身可以保证消息的先进先出。在设计上,RabbitMQData提供的单个队列存储在单个代理节点上。启用镜像队列后,镜像队列仅作为消息的副本存在,服务仍由主队列提供。在这种情况下,单个队列上的消费自然是顺序的。但是由于单个队列支持多个消费者同时消费,当我们让多个消费者消费统一队列上的数据时,消息会分发给多个消费者。并发度高时,多个消费者无法保证消息的处理。顺序。解决方法是对需要强制排序的消息使用同一个MQ队列,单个队列只开启一个消费者消费(保证并发处理的顺序,多线程也一样)。针对由此导致的单个队列吞吐量下降的问题,可以采用Kafka的设计思想,为单个任务开启一组多个队列,将需要排序的消息按照其固定标识进行路由(例如:ID)并分散到这个组队列中,相同ID的消息进入同一个队列,单个队列使用单个消费者消费,这样可以保证消息的顺序和吞吐量。如图:Kafka:Kafka是一个流处理中间件。在它的设计中,没有队列的概念。消息的发送和接收依赖于Topic。单个topic可以有多个分区(partitions),这些分区可以分布到多个broker上。在节点上,分区也可以设置副本备份,保证其高可用。Kafka可以有多个消费者甚至同一个主题的消费者组。Kafka中的消息消费一般使用消费者组(消费者组可以消费同一个topic下的消息,互不干扰)进行消费,一个消费者组中可以有多个消费者。当同一个消费组在单个topic下消费多个partition时,Kafka会调整消费组中consumer和partitions的消费进度和平衡。但有一点可以保证:即单个partition只能被同一个consumergroup中的一个consumer消费。在上述设计理念下,Kafka内部保证了同一个partition下消息的顺序,但不保证topic下消息的顺序。Kafka的消息生产者在发送消息时,可以选择将消息发送到哪个分区。我们只需要将需要顺序处理的消息发送到主题下的同一个分区即可,保证消息消费的顺序。(多线程语言使用的是单消费者,多线程处理数据时需要自己保证处理顺序,这里略过)。RocketMQ:RocketMQ的一些基本概念和原理可以通过阿里云官网了解:消息队列RocketMQ版本是多少?-RocketMQ的消息队列-阿里云[1]。RocketMQ的消息收发也是基于Topic的。Topic下有多个Queue,分布在一个或多个Broker上,保证消息的高性能收发(有点类似Kafka的Topic-Partition机制,但内部实现原理不同,相同)。RocketMQ支持部分顺序消息消费,即保证消息在同一个消息队列上的顺序消费。不支持消息的全局顺序消费。如果想实现某个主题的消息全局顺序消费,可以将主题的队列数设置为1,牺牲高可用。具体图解可参考阿里云文档:SequentialMessage2.0-消息队列RocketMQ版-阿里云[2]4.2避免消息丢失消息丢失需要分为三个部分:从消息中间件向消息中间件发送消息的过程消息生产者不会产生消息Lost,消息在消息中间件中从接收到存储到被消费的过程中不会丢失,并且保证中间件发送的消息在消息消费过程中被消费,不会丢失。生产者发送的消息不会丢失:消息中间件一般都有消息发送确认机制(ACK)。对于客户端来说,只要消息发送需要ACK确认,就可以根据返回结果判断消息是否成功发送给中间人。在文件中。这一步通常与中间件消息接受存储过程的设计有关。根据中间件的设计,我们通常采取以下措施:开启MQ的ACK(或confirm)机制,直接知道消息发送结果,开启消息队列的持久化机制(位移,如果需要特殊设置),中间件本身做得很好。高可用部署消息发送失败补偿设计(重试等)在具体的业务设计中,如果消息发送失败,我们可以根据业务的重要程度进行相应的补偿,例如:消息失败重试机制(sendfailed,继续重试如果还是失败,根据消息的重要性选择降级方案:直接丢弃或降级到其他中间件或载体(同时需要相应的降级补偿推送或消费设计)消息中间件消息不会丢失:几种消息中间件的消息接收和存储机制不同,但会根据其特点进行设计,最大限度保证消息不会丢失:RabbitMQ消息接收和保存:RabbitMQ消息发送可以开启发件人的确认模式,所有消息都会发送成功。通知发件人;你需要开启equeue消息持久化,保证消息落盘;RabbitMQ通过镜像队列保证消息队列的高可用,但是只有Master为镜像队列提供服务,其他slave只提供备份服务;master在宕机时会从slave中选出一个成为新的master提供服务;master生产消费的最新状态会广播给slave;RocketMQ消息接收和存储:RocketMQ发送普通消息有三种方式:同步(Sync)发送、异步(Async)发送和单向(Oneway))发送,区别和准确性保证可以参考《发送普通消息(三种方法)——消息队列RocketMQ版-阿里云》[3]RocketMQ内部设计的具体HA机制是一种主从同步机制,消息发送到Topic下载连接到具体消息队列的MasterBroker后,该消息将被同步到从站。只有MasterBroker可以接收生产者发送的消息。消费者可以从Master或Slave拉取和消费消息。Kafka从消息接收到存储的设计包括:分区副本方式的设计保证了消息的高可用,分区副本的数量可以在创建主题时设置;生产者可以选择接收不同类型的确认(ACK),例如消息完全提交(写入所有同步副本)时的确认,或者消息写入领导副本时的确认,或者消息发送到网络时的确认;Kafka的消息在写入分区的时候只是保存在几个分区副本的文件系统的内存中,并没有直接刷到磁盘,所以当出现宕机的时候,单个副本还是有可能丢失数据的。Kafka无法保证单个分区副本的数据不会丢失,而是依赖分区副本机制来保证消息的完整性(分发到不同的broker)。Backlog消息存储时效性问题Kafka对于topic下的数据有容量上限和时间上限两种消息存储上限规则。如果触发了这些规则中的任何一条,则将删除淘汰前的消息。这一点尤其重要。在RocketMQ中,消息在服务端的存储时间也是有上限的,达到上限的消息会被删除。也需要做出相应的考虑。受持久化磁盘容量影响,积压数据不能超过磁盘上限。如果业务消费出现异常,需要提供足够的冗余,避免消费不及时导致数据丢失。消费者消费消息不丢失:消费消息时,也必须开启相应的ACK机制,消息消费成功,即ACK(对于Kafka来说,是更新消费的offset);对于RocketMQ,有消息重消费设计,需要设置最大消费次数,失败尝试重复消费。消息ACK带来两个问题:如果消息消费失败,如果无法确认ACK,消息消费可能会无限阻塞在某条消息;消息失败重消费会导致消息重复消费。对于无限阻塞的问题,可以参考RocketMQ消费失败的重试机制,对消息重试做一定的设计:在消息体上设计重试次数属性,增加消费失败消息的重试次数,并将它们重新发送到中间件。等待下一次消费,本次消费成功回传消息,直接ACK。消息重试次数达到上限后,如果仍然失败,则启用降级方案,将消息存储在DB等异常信息的持久化载体中。手动或计划任务补偿失败的消息。消息重复消费的问题请参考下一节。4.3消息重复(消费幂等性)在分析常见的中间件时,我们经常会发现中间件设计者将这个问题的处理委托给了中间件的使用者,也就是业务开发者。诚然,业务消费处理的逻辑要比消息生产者复杂得多。生产者只需要保证消息成功发送给中间件,而消费者则需要在消费脚本中处理各种复杂的业务逻辑。解决消息重复消费的问题,核心是用一个唯一标识来标记某条消息是否被处理过。有很多具体方案可供选择,例如:使用数据库自??增主键或唯一键保证数据不会重复变化;通过中间状态和状态变化的顺序来判断业务是否处理完毕;使用一个日志表来记录已经成功处理的消息的ID。如果新到达的消息ID已经在日志表中,则不处理该消息;或者消息是唯一标识的,在Redis等NoSQL中维护一个处理缓存,判断是否处理过;如果消费者业务流程比较长,开发者需要保证整个业务消费逻辑中数据处理的事务性。4.4消息积压处理通常我们在引入消息中间件的时候,就已经对消息消费的生产率和消费率进行了评估和测试,试图达到一个平衡。但业务中也存在一些不可预知的突发事件,可能会造成消息大量积压。这时候,我们可以采取如下的处理方式:临时应急扩容,可以通过增加消费脚本来提高消费率。如果下游没有限制,可以快速减少消息积压。如果消费者下游数据处理能力有限,可以考虑建立一个临时队列,通过临时脚本快速将消息中转至临时队列,优先保证线上业务的畅通,然后开启更多的消费者脚本来处理数据积压。(顺序消息需要额外处理,保证最终处理顺序。)优化消费者脚本处理速度,突破下游限制。如果可能,考虑批处理、下游扩展等方式。防止消息积压做好业务设计和降级,避免无效消息占用资源。根据消息积压的程度,动态增减消费者数量,减少消息积压。制定消息积压处理的应急预案。异常情况按预案设计快速处理。4.5延迟消息处理延迟消息功能在一些MQ中间件中实现。延时消息和定时消息其实是可以相互转换的。RocketMQ:RocketMQ定时消息不支持任意时间精度(出于性能原因)。仅支持特定级别的延迟消息。消息延迟级别通过messageDelayLevel在broker端配置。它在内部为每个延迟级别创建一个对应的消息消费队列,然后创建一个对应延迟级别的定时任务,从消息消费队列中拉取消息并恢复消息的原始主题和原始消息消费队列。RabbitMQ:RabbitMQ实现延迟消息通常有两种方案:一种是创建消息延迟死信队列,使用死信转发队列实现消费延迟。但是,这样一来,如果前一个消息没有到达TTL时间,即使后一个消息到达,也不会被转发到转发队列;另一种是使用延迟交换插件(rabbitmq_delayed_message_exchange),直到达到TTL才转发消息。相应的队列和消费。Kafka本身不支持延迟消息或定时消息。要实现延迟消息,需要使用其他解决方案。借助数据库和定时任务实现延时消息:常用数据库的索引结构支持数据的顺序索引。借助数据库可以轻松实现随时延迟消费消息。用一张表存储数据的消费时间,启动定时任务,满足条件后提取消息,然后转发到时序队列处理或直接处理(处理的需要标记,并且它后面不会出现),但是直接处理需要考虑吞吐量和并发重复性等问题。不如将单个脚本转发到普通队列处理方便。数据库支持的定时任务消息积压是可控的,但是吞吐量会受到限制。借助Reids有序列表实现延迟消息:Reids有序列表zset结构可以实现延迟消息。以消息的消费时间为分值,将消息加入zset。使用zrangebyscore命令消费消息#命令格式zrangebyscrekeyminmaxwithscoreslimit01消费最早的消息#minmax分别代表开始分数和结束分数范围,分别使用0和当前时间戳,可以查出消费情况reached时间消息#withscores表示查询数据应该有分数。limit之后是查询的起始偏移量和数量zrangebyscorekey0{currenttimestamp}withscoreslimit01.当然这个方案也有局限性。首先,redis必须配置为持久化,防止消息丢失(如果配置不合理,不能保证100%,但是每条命令的持久化都会造成性能下降,需要权衡);其次,如果消息延迟过多,会导致消息积压形成大key;再次,需要自己平衡重复消费和消费失败(当然可以,建议开启单次消费流程,将延迟的消息转移到普通队列消费)。基于时间轮的任务调度:在很多软件中,都有基于时间轮的定时任务实现。时间轮和多级时间轮可以用来实现延迟任务调度。如果我们想自己实现一个延迟任务队列,可以考虑使用这个算法来实现任务调度,但是需要设计支持任务的延迟上限和调度的时间粒度(多级)具体需求。时间轮算法这里就不做解释了,有兴趣的可以自行搜索。5.小结通过以上几节的介绍,相信大家已经能自然而然地理解消息队列和异步解耦的作用和核心思想,对如何使用MQ来架构自己的业务也有了一定的了解。MQ使用中的大部分问题,只是需要我们多思考,仔细考虑细节,保证业务的高可用。甚至,我们可以从这些解决方案中提炼出一些核心,让我们可以参考业务中类似的思路,优化我们的业务。例如,消息顺序保证的核心是顺序消息生产者发送到唯一分区,然后维护固定分区的单个消费者的顺序消费;避免消息丢失的核心是每一步的确认和降级机制;消费幂等的核心是唯一标识和步骤状态;消息积压处理的核心是快速响应应急预案;延迟消息的核心是消息排序,优化点是性能提升。科学方法包括归纳法和演绎法。在学习解题方案的过程中,相应的核心思想在使用中进行提炼和推演。然后将这些总结的知识点应用到业务中,就可以更加得心应手地处理相应的问题。事务,构建高可用的业务架构,这是我们最需要做的。