当前位置: 首页 > 后端技术 > Java

RocketMQ消费者(一)概念与消费流程

时间:2023-04-01 19:43:00 Java

1.后台RocketMQ消费可以说是RocketMQ业务逻辑中最复杂的部分。涉及许多消费模式和特征。本想把一篇文章写完,但是写完之后发现消费涉及的内容太多,所以决定分多篇来写。本文作为消费系列的第一篇,主要介绍了RocketMQ消费涉及的模式和特点,同时也简单说了下消费过程。RocketMQ的消费过程我大致分为4个步骤。重新平衡消费者以拉取消息。Broker收到pullrequest后,会从storage中查询消息,返回给consumer消费消息。每个步骤将在一篇文章中进行解释。首先了解一下RocketMQ消费涉及的概念2.概念简述2.1消费组和消费模式的概念和大多数消息队列一样,RocketMQ支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。在了解它们之前,需要先介绍一下消费者群体的概念。2.1.1消费者组消费者实例是负责消费消息的消费者进程。单个消费者的速度是有限的。在实际使用中,通常会使用多个消费者一起消费同一个topic,以加快消费速度。这多个消费同一个Topic的消费者组成了一个消费者组。消费者组是一个逻辑概念,包括多个相同类型的消费者实例。通常,这些消费者都消费相同类型的消息(都消费同一个Topic),消费逻辑一致。消费组的引入是为了在消费消息时更好的负载均衡和容错。2.1.2广播消费模式(BROADCASTING)广播消费模式是指所有的消息都会广播分发给所有的消费者实例,每个消费者实例都会收到全量的消息(即使消费者组中有多个消费者订阅相同的主题)。如下图,生产者发送了5条消息,每个消费者组中的消费者都收到了全部5条消息。广播方式使用较少,适用于需要通知每个消费者的场景,比如在应用中刷新缓存。注意:广播消费模式不支持顺序消息。广播消费模式不支持消费点重置。每条消息都需要经过多台机器处理,订阅逻辑相同。消费进度在客户端维护,重复消费概率略高于集群模式。如果消费进度文件丢失,则存在消息丢失的可能。在广播模式下,RocketMQ版本的消息队列保证每条消息至少被每个客户端消费一次,但不会重新提交消费失败的消息,所以业务方需要注意消费失败的情况。在广播模式下,客户端每次重启都会从最新的消息中消费。客户端停止时,客户端向服务器发送的消息会自动跳过,请慎重选择。在广播模式下,每条消息都会被大量客户端重复处理,所以建议尽量使用集群模式。广播模式下,服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积告警、订阅关系查询功能。2.1.3集群消费模式(CLUSTERING)在集群消费模式下,同一个Topic下的一条消息只会被同一个消费组中的一个消费者消费。换句话说,消息被负载均衡到同一消费者组的多个消费者实例。具体来说,同一个消费者组中的不同消费者,会根据负载机制,平均订阅Topic中的各个Queue。(默认AVG加载方式)RocketMQ默认使用集群消费方式,这也是大部分场景都会用到的消费方式。2.2Consumer拉取消息模式2.2.1Pull是指消费者主动拉取消息进行消费,主动向Broker拉取消息,主动权由消费者应用控制。2.2.2推送是指Broker主动向消费者推送消息,Broker收到消息后会主动向消费者推送消息。这种模式的实时消费比较高,也是主流场景常用的一种消费形式。消费者组中的消费者实例会按照预设的负载均衡算法平均订阅Topic中的Queue,每个Queue最多只能被一个消费者订阅。在RocketMQ中,Push消费实际上是通过Pull消费(pull)来实现的。推送消费只是在客户端API层面进行封装,让用户感觉Broker在向消费者推送消息。2.2.3POPRocketMQ5.0引入的新消费形式是Pull的另一种实现。也可以使用POP以Push方式拉取消息,甚至可以和Push方式一起使用(分别消费retryTopic和normalTopic)。POP和Pull可以通过一个开关实时切换。在POP模式下,Broker控制每个消费者消费的队列和拉取的消息,将重平衡逻辑从客户端转移到服务器。主要解决了原有Push方式消费的以下痛点:富客户端:客户端逻辑比较重,多语言支持不友好队列垄断:一个Topic中的一个Queue最多只能被一个Push消费者消费,数量消费者的数量不能无限扩大。而当消费者挂掉的时候,队列中的消息就会堆积起来。消费后更新offset:本地消费成功才提交offset。RocketMQ5.0的轻量级gRPC客户端是基于POP消费模式开发的。对于所有的消息,都存在一个Topic中的队列如何分配给消费者的问题。2.3.1队列加载机制RocketMQBroker中的队列加载机制将一个Topic的不同队列按照算法尽可能均匀的分配给消费者组中的所有消费者。RocketMQ预设了多种加载算法,用于不同场景下的消费。AVG:队列按照个数平均分配给多个消费者,第一个Broker的所有队列按照Broker的先后顺序分配给第一个消费者,再分配给第二个消费者。AVG_BY_CIRCLE:将Broker上的queue轮流分配给不同的consumer,比较适合Topic在不同Broker之间分布不均的情况。默认采用AVG加载方式。2.3.2再平衡(Rebalance)将队列消费分配给消费者的负载过程并不是一劳永逸的。比如当consumer数量发生变化,Broker下线等情况下,原来的负载就变得不平衡了。当需要负载均衡时,这个过程称为再均衡机制。每隔20s,RocketMQ会进行一次检查,检查队列数和消费者数是否发生变化。如果有变化,就会触发消费队列的重平衡,重新执行上面的负载算法。2.4消费者的高可靠性2.4.1重试-死信机制在实际使用中,消息消费可能会失败。RocketMQ有重试机制和死信机制来保证消息消费的可靠性。正常消费:如果消费成功,提交消费站点重试机制:如果正常消费失败,消息会被消费者发回Broker,放入重试的Topic:%RETRY%消费者组。重试消费高达16次,重试的时间间隔逐渐变长。(消费组会自动订阅重试主题)。这里的延迟重试使用的是RocketMQ的延迟消息,16次重试间隔是为延迟消息配置的每个延迟级别的时间(从第三级开始)。如果修改延迟级别时间的配置,重试间隔也会相应改变。但是即使延迟级别时间间隔配置小于16,还是会重试16次,然后按照最大时间间隔重试。死信机制:如果正常消费,16次重试失败,消息会保存在死信Topic%DLQ%消费组中,此时需要人工干预。2.4.2队列加载机制和重平衡当Broker挂掉或者某个消费者挂掉时,会触发重平衡,可以自动感知某个组件挂掉,重新调整消费者的订阅关系。2.5并发消费和顺序消费消费者客户端消费时,订阅消息有两种方式,即并发消费和顺序消费。广播模式不支持顺序消费,只有集群模式才能使用顺序消费。需要注意的是,这里所说的顺序消费是指队列维度的顺序,即消费队列时,消费消息的顺序与发送消息的顺序一致。如果一个Topic有多个queue,那么在Topic层面是不可能实现顺序消费的,因为无法控制先消费哪个queue的消息。当Topic只有一个队列时,可以实现Topic级别的顺序消费。具体序列生产消费代码见官方文档。顺序生产的方式就是串行生产,生产时指定一个队列。并发消费的方法是调用消费者指定的MessageListenerConcurrently作为消费的回调类,顺序消费使用MessageListenerOrderly类进行回调。处理这两种消费方式的消费服务也不同,分别是ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService。顺序消费的大体原理是依赖两套锁,一套在Broker端(Broker锁),锁住队列和消费者的关系,保证同一时间只有一个消费者在消费;消费端也有一套锁(消费队列锁)来保证消费的顺序。2.6保存并提交消费进度消费者消费完一批消息后,需要保存消费进度。如果是集群消费模式,还需要让其他消费者知道消费进度,所以需要提交消费进度。这样当消费者重启或者队列重平衡时,可以根据消费进度继续消费。消费进度在不同的模式下以不同的方式保存:广播模式:保存在消费者本地。因为每个消费者都需要消费全量的消息。在LocalfileOffsetStore中。集群模式:保存在Broker中,在消费者端缓存。因为一条topic消息只需要被消费组中的一个消费者消费,所以需要统一保存消息的消费进度。通过RemoteBrokerOffsetStore存储。在集群模式下,consumer有定时任务定期将内存中的消费进度提交给Broker,Broker也有定时任务将内存中的消费offset持久化到磁盘。另外,消费者在从Broker拉取消息时,也会提交消费offset。注意,消费者线程池提交的偏移量是线程池消费的那批消息中偏移量最小的消息的偏移量。消费一批消息后,消息消费进度保存在本地内存中。consumer中有一个定时线程,每5s向BrokerBroker提交一次内存中所有队列的消费offset。收到消费进度后,先缓存到内存中。每隔5s有一个定时任务Persist消息偏移量到磁盘。consumer在从broker拉取消息时,也会将队列的消息偏移量提交给Broker3。清晰的表示客户端Push方式的并发消费过程。从左上角第一个框开始,消费者在启动时唤醒再平衡服务RebalanceService。rebalance服务是客户端开始消费的起点。重平衡服务会周期性地(每20s)执行重平衡方法doRebalance),查询所有注册的Brokers,并根据注册的Brokers的数量给队列rebalanceByTopic()分配负载。队列创建一个消息拉取请求pullRequest,里面存放了一个处理队列processQueue,也就是图中的红黑树(TreeMap),用来存放拉取的消息。红黑树存储消息存储的顺序。消息拉取线程采用生产-消费模式,使用线程从拉取请求队列pullRequestQueue中弹出拉取请求,执行拉取任务,将拉取的消息放入处理队列。一次拉取消息完成后,拉取请求将被重用,并再次放入拉取请求队列pullRequestQueue。拉取完成后,会在NettyClientPublicExecutorThreadPool线程池中异步处理结果,并将拉取的消息放入处理队列,然后调用consumeMessageService.submitConsumeRequest,将处理队列和多个消费任务提交给消费线程池.每个消费任务消费1批次消息(1批次默认为1条消息)。每个消费者都有一个消费线程池consumeMessageThreadPool,默认有20个消费线程。消费线程池中的每个消费线程都会尝试从消费任务队列中获取消费请求,并执行消费业务逻辑listener.consumeMessage。消费完成后,如果消费成功,则更新offsetupdateOffset(先更新内存offsetTable,定时上报给Broker,Broker端也是先放到内存中,定时刷新磁盘)。参考官方文档—设计RocketMQ实战进阶—丁微RocketMQ消费动态—白云鹏消息中间件—RocketMQ消息消费(一)—狂人RocketMQ消息接收流程—赵坤RocketMQ消息消费—贝贝猫RocketMQ5.0POP消费模式探索RocketMQ消息消费源码分析Rocketmq消费消息原理——服务端技术栈RocketMQ——4.消费者消费——孔