大家好,我是三友,我又来了~~最近还在泡RocketMQ的源码。推送消费方式的实现实在是太巧妙了,趁着脑子里还有点印象,赶紧写一篇文章来分解一下,免得过两天就忘记了。MQ消费模式消费模式是指消费者如何从MQ获取消息,分为两种方式,push(推模式)和pull(拉模式)。1.push(push方法)push,顾名思义,就是推送的意思。即当MQ收到生产者产生的消息后,会主动将消息推送给消费者进行消费。这种模式称为推送,即MQ将消息推送给消费者。推送模式这种模式的优点是响应速度快,消息的实时性比较高。消息MQ收到消息后,可以立即将消息推送给消费者,消费者可以立即收到消息进行消费。但是这种推送方式有一个缺点,一旦消息量比较大,对消费者的性能要求就比较高,因为消费者无法控制MQ消息的推送速度。一旦消息量很大,消费者就会有消费消息的压力。它比较大。2.Pull(pull方法)push是MQ主动向消费者推送消息,那么pull呢?与push正好相反,就是消费者主动从MQ中拉取消息。在pull模式下,pull的优缺点自然是push的对立面。由于消费者主动从MQ拉取消息,消费者可以根据自己的消费情况决定何时拉取消息。主动权掌握在自己手中,这样对消费者的压力会比较小;但是缺点也很明显,实时性会比push方式低,因为你要决定pull的时间间隔。其实想想,消费的方式和拿快递是一样的。快递就是一个消息,我自己就是消费者。RocketMQ对消费方式的实现在上一节提到消费消息有两种方式,push和pull,或者说是一个概念。上大周洋老师有一句话我很认同,那就是“天上飞的概念,地上要实现”。那么push或者pull具体如何落地,就要看具体的MQ产品了。RocketMQ作为阿里开源的一款高性能、功能丰富的MQ,理所当然的同时实现了push和pull两种消费方式。用户可以选择在项目中使用推送还是拉取。push方式的实现就是pull方式的实现,但是一般情况下,项目都是使用push的方式来消费,因为pull的方式除了pull的时效性差之外,还得让开发者主动维护进度消息消费并添加额外的操作。.那么我们重点关注一下RocketMQ是如何实现push的逻辑的。RocketMQ智能实现push的原因上面已经说了。推送模式的优点是实时性好,缺点是对消费者的压力会比较大。那么,push模式的实现是否只能放弃对压力的控制?就在这时,RocketMQ大叫了一声。是的,RocketMQ对于推送模式做到了实时和压力的平衡。这主要是因为RocketMQ的push模式其实是一种“伪push”模式,真正的底层实现还是基于pull。说到这里,有些朋友可能会一头雾水。为什么push变成了“假push”,还是用pull来实现?是推还是拉?前面说过,push和pull只是一种理论,具体实现还是要看MQ。因此,为了兼顾两者,RocketMQ选择通过消费者主动拉取消息来达到推送的效果。这就是我称之为“伪推送”的原因。给你留言。由于底层是pull,RokcetMQ在实现consumer逻辑时可以轻松达到控压的效果。毕竟,这是“拉”法的天然buff;但是如何实现pushthroughpull毛呢的实时优势呢?毕竟RokcetMQ想鱼和熊掌兼得。这时候,就不得不提一个叫做“longpolling”的机制。轮询和长轮询轮询和长轮询都属于拉取的实现,客户端主动向服务端发送请求拉取数据。应用到MQ上,就是消费者主动去MQ拉消息。轮询轮询是指无论服务端数据有没有更新,客户端每隔一定时间请求拉取数据,可能有更新的数据返回,也可能没有返回。再拿快递举例,轮询就好,小明买的iphone13promax到了,显示正在发货,但是小明等不及了,就去快递站取了,但是快递还没有放到快递站,可是小明何忍不住相思之痛,心急如焚,于是小明每隔5分钟就跑到快递站询问快递是否到了,并在它到达时将其取回。这就是轮询的意思,就是不管有没有数据,客户端都会每隔一定时间向服务器请求一次。我们拿快递举例分析一下这个问题:每隔5分钟就跑到快递站,小明不累吗?还有一个问题。假设你刚跑到快递站,快递没有到,你又回去了,但是刚到家,快递就到了,你又等了5分钟,然后又跑到快递站,终于拿到了快递员。几分钟过去了,你还是没有第一时间拿到快递,造成延误。对应程序,会出现以下问题。对于消息,它们将始终生成。这就需要消费者定时拉取消息。即使没有消息,他们也需要请求,这会造成很多无用的消息。请求白白浪费,大量消耗服务器内存和宽带资源。可能造成数据延迟的长轮询在说长轮询的概念之前,我们先救小明,毕竟小明不想当狗。既然小明每5分钟跑一次,我们能不能换个思路,让小明在快递还没到的时候不回来,就在快递站等着,等快递到了就让小明带着快递走家。现在小明乐坏了,不仅可以有时间浏览某首音乐,逛某处购物,还能第一时间获得13promax。所以这种可以在快递站等待的机制就叫做长轮询。长轮询也是客户端对服务器的请求。如果服务端有数据,则立即返回,客户端再次请求;当服务器没有数据时,服务器不会响应客户端,而是将请求发送给Hold,当服务器有数据时,它会响应客户端并返回数据。所以长轮询可以解决下面的问题解决轮询导致的频繁请求服务器但是没有问题。一旦有新数据到来,消费者就可以立即获取到新数据,所以在效果上,有点像push的感觉。但是长轮询也会带来服务端代码实现逻辑复杂的问题。当然,相比于优势,这就不是很重要了。push消费方式的源码探索理论讲完了,该上代码了。下面看看RocketMQ是如何通过长轮询机制在压力和实时之间取得平衡的。这里我画了一个推送模式下consumer消费的流程图。消费者拉取消息的逻辑①消费者有一个后台线程,会处理拉取的消息(PullRequest)②首先判断是否有太多消息没有被消费。如果有,则每隔一定时间再次从①开始执行拉取消息的逻辑③消费者没有太多消息,没有消费,则直接向MQ发送拉取消息的请求,有消息则返回,没有消息就hold住请求,等待新消息到达返回④消费者拿到消息后,会去自定义消息处理逻辑的实现(MessageListener实现)去消费消息,并在同时再次拉取消息,继续执行①中的逻辑1.消费者拉取消息控制压力源码当消费者准备拉取消息时,会先判断消费者当前的消费压力,再决定是否拉取拉新闻。RocketMQ提供了两种判断消费压力的逻辑,一种是根据未消费消息的数量,另一种是根据未消费消息占用的内存大小。控制压力源代码判断未决消息的数量。如果数量过多,会等待重新执行拉取消息的逻辑,判断待处理消息的大小。如果pendingmessages占用的内存太大,就会等待重新执行。执行拉取消息逻辑的一般语句是当消费者消费压力过大时,不会拉取消息,而是等待一定时间再执行拉取消息逻辑。如果压力还很大,就继续等待,以此类推,直到消费者的消费压力小于阈值,才会真正向MQ发送请求拉取消息。2、MQ会要求持有源码。当服务器没有找到消息时,它会暂停请求。当存储请求持有源码无法拉取消息时,会调用PullRequestHoldService的suspendPullRequest方法存储请求。PullRequestHoldService是一个用于存储拉取请求的类。PullRequestHoldServicesuspendPullRequest方法会将请求分类,放入ManyPullRequest中,然后使用一个ConcurrentHashMap来存储。3、MQ接收消息并响应消费者源码。NotifyMessageArrivingListener当生产者发送的消息到达MQ时,MQ会回调NotifyMessageArrivingListener的到达方法。之后会调用PullRequestHoldService的notifyMessageArriving方法,MQ重新处理拉取消息的逻辑。这时候可以查到最新消息,并通过网络将最新消息返回给消费者。notifyMessageArriving和returnmessage的逻辑是finally。所以,从上面的分析可以看出,RocketMQ基于长轮询机制实现了推送消费的方式,同时兼顾了实时性和压力,其实是非常nice的。最后我想说,不管是pull还是push,还是polling和longpolling,其实都是一个理论或者一个想法,不仅仅是MQ的东西,比如在Nacos里面,也有Push和longpolling的机制。但是这些理论在不同产品中的具体实现可能不尽相同,但是都是大同小异的,所以当你理解了这些思想之后,再去看看其他框架的源码,其实是很容易的。
