当前位置: 首页 > 网络应用技术

RocketMQ源代码分析9:消费者消费过程

时间:2023-03-08 00:07:51 网络应用技术

  基于RocketMQ-4.9.0的RocketMQ-4.9.0分析

  在去经纪人拉消息之前必须执行的一个非常重要的操作:触发重量平衡

  早些时候,我们分析了消费者的启动过程。其中两个是特别的关注,一个是一个沉重的平衡服务课,另一个是拉消息的服务班。

  让我们看一下拉消息的服务类别。他是一个异步线,启动后将阻止它。

  该对象是拉消息的入口

  }

  然后继续看到该方法的核心逻辑:

  在这里,它将穿越该系列。此处的集合是分配结果(默认值),然后根据每个结果构建一个对象。

  如果我有两个消费者A,B;和集群消耗,然后根据平均分布策略的排队数量为4,然后分配A的队列为2(queueid = 0,1),并且B也分配了B。);在集群模式下,队列只能在同一消费者中消费。策略,然后穿越消费者A的4个队列,为每个队列创建一个队列,然后将其插入收集。

  然后,我们继续研究分发pullrequest方法的逻辑:

  这是Pullrequest的集合,然后我们继续看不见:

  这是重点。它将对象放入队列中。这是熟悉的吗?是的,这是从队列开始获得对象的队列。如果起初不可用,它将永远是。现在,通过后,队列中有数据,现在可以获得它,然后绘制消息。

  然后,我们取决于如何通过拉消息来吸引消费者

  现在通过它后,我们可以在这里获取对象,现在很好。然后,我们继续跟进,最后到达中间。我们看到了什么?

  该代码没有发布,更多,我将直接分析更重要的部分

  流量控制主要是为了保护消费者。当消费者消费能力还不够时,拉动速度太快了,这将导致大量消息积压,这可能会溢出内存溢出

  这非常重要,但是我在这里没有谈论它。从经纪人中摘下消息后,它将被移交给它。那时,我们返回看到它,然后首先跳过这里。

  参数更重要。RocketMQ是要确保消息不会重复消息(宏)以将上述参数信息封装到对象中,然后拉消息

  拿起消息异步,然后将绘制的内容移交给Pullcallback进行处理。后来,我们回来看看

  接收到服务器的消息请求的处理器是:首先,一系列参数,权威判断,我们直接跳过它,介绍了拉消息的核心代码

  }

  接下来,我们总结了拉动逻辑的逻辑:

  因此,这次我总共阅读了32条消息(和1条消息没有读取取消,等待下一篇阅读)

  它是将响应客户端的对象转换为客户端的本地对象,然后将对象交给回调功能处理(即步骤2.2)

  然后延迟3S时钟,再次将对象放入队列中,再次等待消息的逻辑

  2.4.2.1消息转换和过滤将二进制内容转换为MessageEXT对象;并根据标签过滤

  2.4.2.2更新对象的属性值

  在前面的2.3.2步骤中,我们读取32条消息,计算后32个消息,然后将消息和偏移值返回给消费者。因此,这里的值为32。

  2.4.2.3将阅读消息保存到本地缓存队列

  2.4.2.4将消息提交到线程池以进行消费(重要)我不会首先在这里扩展,我将其详细介绍

  2.4.2.5在阻塞队列中再次抢购

  这里有一个参数,指示将间隔放入队列多长时间(实际上该间隔要去经纪人来拉消息)。当消费者的消费速度速度快于生产者时,可以考虑此值,因此可以考虑使用此值,因此为了避免拉空消息的概率。新对象放在上面的队列中(此新对象仅是因为值已更改),然后第二大步骤是执行。当经纪人收到拉的请求时,然后根据(value = 32)读取逻辑索引。当我第一次举例说明时,我总共写了33条消息(然后有33个索引数据),所以他会阅读[32,33]间隔的索引数据,这是最后一条消息索引,最后是消息索引,这是最后一条消息索引。将其退还给消费者。重复,阅读经纪人的消息消耗。

  是步骤2.4.2.4的逻辑

  由于我们是普通的消息(不是顺序新闻),因此该类是由类消耗的。线程池将在内部创建。该线程池非常重要,新闻最终将提交到此线程池。

  但是在服从线程池之前,另一件事要做---”社会信息

  也就是说,该时间大小和(默认= 1)值的消息总数。如果大小>,则将消息分配,然后将消息批量提交到线程池。

  对象已提交到线程池,他是一个,所以我们只看了该方法。

  获取消息监视器并开始支出

  这种听众和消费方法熟悉吗?没错,他是我们的消费代码中指定的指定回调监视器

  在这一点上,消费者确实开始消费者新闻。

  直接查看其核心逻辑:

  3.3.1.1广播模式消耗广播模式的消耗成功,然后执行步骤3.3.2

  广播模式消耗失败,直接丢弃消息,什么也没做

  3.3.1.2集群模式消耗群集模型消耗成功,然后执行步骤3.3.2

  如果群集模式消耗失败,它会遍历消息,并将消息再次发送回经纪人;如果消息发送回经纪人失败并且无法丢弃,请将消息放入内部线程池中,然后再次等待消费。

  让我简单地说你回来了什么?

  这里有一个问题吗?

  如果提交到线程池的消息总数为10,我以前的消费的前9个成功,但是最后一个消费失败了,前9个中的前9个是否必须尝试?

  答案是:是的。这也表明重试可能导致重复的消费。这一点是要注意。

  不管消费是否成功,排队缓存的消息都会删除,然后更新到Office Table()

  当消费者客户端启动时,将激活许多定时任务,并且持续办公室的时机任务

  遍历所有队列,然后将其偏移转移到文件中。

  总结这些步骤:

  在这一点上,消费者的消费已经结束。

  消费过程比新闻的生产要复杂得多。

  仅限于作者的水平,文本中有不可避免的事情。欢迎校正,不要喷洒,谢谢

  原始:https://juejin.cn/post/7099478711975542797