在上一篇文章中学习了RocketMQ消息存储的相关原理。本文将讲一下消息消费的过程和相关概念。消息消费消息消费和消费群的概念和Kafka基本类似。例如,一个消费者组可以包含多个消费者,一个消费者组可以订阅多个主题。消费组之间有集群模式和广播模式两种。集群模式下,topic下的同一条消息只允许消费组中的一个消费者消费,消费进度存储在broker端。在广播模式下,每个消费者都可以消费消息,消费进度存储在消费者端。在集群模式下,一个消费队列一次只能被一个消费者消费,一个消费者可以消费多个消息队列。详见我之前的文章。而且,rocketmq消息服务器和消费者之间的消息传输有两种方式:推送模式和拉取模式。Pull模式,即消费者主动向消息服务器发送请求;推送模式,即消息服务器向消费者推送消息。推模式是在拉模式的基础上实现的。Consumer启动主要是初始化三个组件,然后启动后台定时任务。三个组件:[RebalanceImpl]平衡消息队列服务,负责分配当前Consumer可以消费的消息队列(MessageQueue)。当添加或删除新的Consumer时,将重新分配消息队列。[PullAPIWrapper]拉取消息组件[offsetStore]消费进度组件的几个定时任务PullMessageService从阻塞队列中获取消费者的拉取请求pullRequestQueueRebalanceService负载均衡定时任务,分配一个可消费的MessageQueue给ConsumerfetchNameServerAddr定时获取NameSever地址updateTopicRouteInfoFromNameServer定时更新Topic路由信息cleanOfflineBroker定时清理离线BrokersendHeartbeatToAllBrokerWithLock发送心跳persistAllConsumerOffset持久化消费进度ConsumerOffset消息拉取对于任何消息中间件,消费者客户端一般有两种方式从消息中间件获取消息并消费:Pull消费者主动去Broker拉取消息有规律的间隔。优点消费速度和数量可控缺点如果间隔短,可能为空,频繁的RPC请求增加网络开销如果间隔长,消息可能会延迟消费进度Offset需要消费者维护Push,即Broker主动实时推送消息给消费者。优点:消息实时,保持长链接,不会频繁建立链接。缺点:如果消息数量过多,消费者吞吐量较小,可能会导致消费者缓冲区溢出。文章开头我们也说过RocketMQ的push模式是基于pull模式实现的。【PullMessageService消息拉取】RocketMQ通过PullMessageService拉取消息。从代码段PullMessageService#run可以看出:publicvoidrun(){//stopped是voidate修饰的变量,用于线程间通信。while(!this.isStopped()){//..//阻塞队列,如果pullRequestQueue没有元素,则阻塞PullRequestpullRequest=this.pullRequestQueue.take();//消息拉取this.pullMessage(pullRequest);//...}}关于PullRequest//消费组privateStringconsumerGroup;//消息队列privateMessageQueuemessageQueue;//消息处理队列,从Broker拉取的消息先存入ProcessQueue,然后提交到消费者消费池消费privateProcessQueueprocessQueue;//要拉取的MessageQueue的偏移量privatelongnextOffset;//是否加锁privatebooleanlockedFirst=false;PullMessageServicePullRequest添加有两种方式:延迟添加和立即添加【关于ProcessQueue】ProcessQueue是MessageQueue在消费者端的复制和快照.PullMessageService默认每次从消息服务器拉取32条消息,按照消息的队列偏移量顺序存储在ProcessQueue中,然后PullMessageService将消息提交给消费者线程池。消息消费成功后,从ProcessQueue中移除。//读写锁privatefinalReadWriteLocklockTreeMap=newReentrantReadWriteLock();//消息存储容器,k:消息偏移量,v:消息实体privatefinalTreeMapmsgTreeMap=newTreeMap();//ProcessQueueTotalprivatefinalAtomicLongmsgCount=newAtomicLong();//ProcessQueue中消息的总大小privatefinalAtomicLongmsgSize=newAtomicLong();//当前ProcessQueue包含的最大队列偏移量privatevolatilelongqueueOffsetMax=0L;//当前ProcessQueue是否被丢弃privatevolatilebooleandropped/=false;开始消息拉取时间戳privatevolatilonglastPullTimestamp=System.currentTimeMillis();//最后一次消息消费时间戳privatevolatilonglastConsumeTimestamp=System.currentTimeMillis();【消息拉取的流控】processQueue中的消息数大于1000条,且processQueue中的消息大小大于100MB,则延迟50毫秒后拉取消息。如果processQueue中偏移量最大的消息和偏移量最小的消息之间的跨度超过2000,则延迟50毫秒后拉取消息。根据主题拉取订阅的消息,如果为空则延迟3秒,再拉取。【消息服务器broker组装消息】代码位置:PullMessageProcessor#processRequest根据订阅消息,构建消息过滤器调用MessageStore.getMessage查找消息根据主题名和队列号获取消息消费队列消息偏移量异常情况校对nextpulldeviation偏移量根据PullRequest填充responseHeader的nextBeginOffset、minOffset、maxOffset。根据主从同步延时,如果从节点数据中包含下次拉取的offset,则设置下一次拉取任务的brokerId。如果commitlog标志可用且当前节点是主节点,则更新消息消费进度【消息拉取长轮询机制】RocketMQ推送方式是循环向消息服务器发送消息拉取请求。消费者从broker拉取消息时,如果消息还没有到达消费队列,并且没有开启长轮询机制,服务器会等待shortPollingTimeMills(默认1秒),然后再判断消息是否到达消息队列.如果消息Ifitisnotreached,提示消息拉客户端PULL_NOT_FOUND。如果开启了长轮询模式,rocketMQ会每隔5s轮询一次消息是否可达。同时,一旦有新消息到来,它会立即通知挂起的线程去验证新消息是否是它感兴趣的,如果是,它会从commitlog文件中读取提取的消息返回给消息拉取客户端,否则,直到挂起超时,超时时间由消息拉取器在拉取消息时封装在请求参数中,PUSH方式默认为15s。PULL模式由DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis设置。RocketMQ通过在Broker端配置longPollingEnable为true来开启长轮询模式。RocketMQ的长轮询机制是由2个线程完成的。PullRequestHoldService、ReputMessageService。【Push消费模式流程分析】后台独立线程RebalanceServic根据Topic中的消息队列数量和当前消费组中的消费者数量进行负载均衡,将对应的MessageQueue分配给当前消费者,并进行封装作为PullRequest实例并将其放入pullRequestQueue中的队列中。Consumer端启动后台独立线程PullMessageService不断从队列pullRequestQueue中获取PullRequest,并通过网络通信模块将Pull消息的RPC请求异步发送给Broker端。这是一个典型的生产者消费者模型,实现了准实时的自动消息拉取。PullMessageService异步拉取消息后,使用PullCallback进行回调处理。如果拉取成功,则更新消费进度,将putPullRequest发送到阻塞队列pullRequestQueue,然后立即进行拉取。监听器ConsumeMessageConcurrentlyService会一直监听回调方法PullCallback,并将拉取到的消息交给ConsumeRequest处理。ConsumerRequest会调用消费者业务端实现的consumeMessage()接口来处理具体的业务。消费者业务方处理完成后,会返回一个ACK给ConsumerRequest。如果消费者ACK返回失败,则在集群模式下将消息发回Broker重试(广播模式重试成本太高),最后更新消费进度offsetTable。在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog中获取消息。如果第一次尝试Pull消息失败(比如Broker端没有消息消费),先通过长轮询机制hold和suspend请求,然后通过Broker端的后台线程PullRequestHoldService重试,后台线程ReputMessageService进行第二次处理。【推送消息流程图】RocketMQ消息消费的长轮询机制普通轮询和长轮询的区别:普通轮询比较简单,就是每隔一段时间发起一个请求,服务端收到请求后不管有没有立即返回数据是否已更新。优点实现简单,易于理解。缺点是服务器是被动的,服务器必须不断地处理客户端连接,服务器无法控制客户端拉取的频率和客户端的数量。长轮询是对普通轮询的一种优化,仍然是客户端发起请求,服务端收到后并不立即响应,而是保持客户端连接,等待数据变化(或者过了指定时间还没有变化)在回复客户之前。说白了,就是给普通的轮询增加一个控件。您的客户可以随时向我提出要求,但是否回复我有最终决定权。这样可以保证服务端不会被客户端节奏控制,造成不可控的压力。在RocketMq中,consumer发起pullrequest,broker在处理消息pullrequest时,如果没有找到消息,则不会返回任何信息给consumer,但是会hold住请求,先挂起请求,这样下一次pullrequest不会马上发起,请求信息pullRequest会添加到pullRequestTable中,等待触发事件通知消费者。producer发送最新消息时,首先持久化到commitLog文件,同时异步持久化consumerQueue和index。然后激活consumer发出的请求hold,立即通过channel向consumer客户端写入消息。如果没有消息到达,并且客户端拉取的偏移量是最新的,则请求将被保留。其中,hold请求超时时间<请求设置的超时时间。同时Broker端也会定时检查请求是否超时,超时后立即返回请求,状态码为NO_NEW_MESSAGE。然后在Broker端,通过后台独立线程PullRequestHoldService遍历所有pending的pullRequestTable请求,如果有消息就会返回响应给消费者。同时,另一个ReputMessageService线程不断的构建ConsumeQueue/IndexFile数据,不断检测是否有新的消息。如果有新消息,则通过Topic+queueId的key从pullRequestTable中获取holdrequest对应的pullRequest,然后根据lengthLinkchannels进行通信响应。通过这种长轮询机制,解决了Consumer端需要不断发送无效的轮询Pull请求,导致整个RocketMQ集群中Broker端负载过高的问题。流程如下:消息队列加载和重分发机制当业务系统部署多台机器时,每台机器启动一个Consumer,这些Consumer都在同一个ConsumerGroup中,即一个消费组。一个Consumer消费一个Topic,一个Topic中有多个MessageQueues。比如有2个Consumer,3个MessageQueue,这3个MessageQueue怎么分配?这就涉及到Consumers的负载均衡。首先,Consumer在启动时,会向所有的Brokers注册自己,并保持心跳,让每个Broker都知道消费者组中有哪些Consumer。那么Consumer在消费的时候,会随机连接一个Broker,获取消费组中的所有Consumer。主要流程如下:RocketMQ消息队列重分配由RebalanceService线程实现。RebalanceService从MQClientInstance开始。RebalanceService默认每隔20秒执行一次MQClientInstance#doRebalance【主题消息队列加载过程】,获取主题的队列,向broker发送请求,获取主题下消费组中所有消费者客户端的ID。仅当两者都不为空时才需要重新平衡。在rebalancing的时候需要对队列和consumerclientID进行排序,保证同一个consumergroup下的view一致。根据分配策略AllocateMessageQueueStrategy为消费者分配队列。客户端执行过程中,伴随着PullMessageService和RebalanceService的线程交互,消息消费过程【consumptionprocess】默认拉取32条消息,如果消息数大于32条,则分页处理。每次消费都会判断processQueue是否被删除,防止消费者消费不属于自己的队列,重试消息的主题名。rocketMQ消息重试机制决定,如果发现消息的延迟时间级别大于0,则首先将重试主题存储在消息的属性中,然后将主题名称设置为SCHEDULE_TOPIC,使其时间到后可以重新参与消息消费。消费前执行hock执行。我们写的消费代码,消费完之后,执行hock消费,然后验证processQueue是否被删除。如果删除,则不会处理结果。处理消费者返回的结果。如果消费成功,则ack=consumeRequest.getMsgs().size()-1,直接更新消费进度。如果消费失败,则ack=-1,重发消息。如果重发消息再次失败,则延迟5秒后继续消费。无论消费成功还是失败,都会更新消费进度[MessageConfirmation]客户端发送重试消息时,封装了ConsumerSendMsgBackRequestHeader。//消息物理偏移量privateLongoffset;//消费组privateStringgroup;//延迟级别privateIntegerdelayLevel;//消息IDprivateStringoriginMsgId;//消息主题privateStringoriginTopic;//最大重消费次数,默认16次privateIntegermaxReconsumeTimes定义在SubscriptionGroupConfig.retryMaxTimes中;service端的接收逻辑首先获取消费者组的订阅配置信息。如果不存在,则直接返回创建主题:%RETRY%+组,随机选择一个队列使用原有消息创建新消息。如果重试消息的最大重试次数超过16次(默认),则将消息放入%DLQ%队列(死信队列)。等待手动处理以通过Commitlog.putMessage保存消息。小结从消息消费者和消费组的基本概念,到消息消费的过程。我们了解了RocetMQ消息消费的原理。消费者客户端启动后,会在后台运行多个定时任务,处理相关逻辑。我也知道RocetMQ的消息获取有push和pull两种模式,push模式也是建立在pull模式基础上的。知道普通轮询和长轮询的区别,理解长轮询的实现逻辑。了解消息消费和确认流程。