当前位置: 首页 > 科技观察

没想到RocketMQ的标签还有这个“坑”!

时间:2023-03-22 01:25:14 科技观察

RocketMQ提供了基于标签的消息过滤机制,但是很多朋友在使用过程中或多或少会有疑惑。无意间找到了RocketMQ官方钉钉群。记得有不少朋友问过以下问题:今天给大家分享一下使用RocketMQTag需要注意的几个问题。小伙伴们看完后,如果觉得有帮助,期待您的点赞和支持。消费组订阅关系不一致。为什么收到的消息会丢失?如果标签有少量消息,它会显示高延迟吗?1、消费组订阅关系不一致,导致消息丢失。从消息消费的角度来看,消费组是一个基本的物理隔离单元,每个消费组都有自己的消费站点、消费线程池等。RocketMQ初学者容易犯这样的错误:消费组中不同的消费者订阅到同一个主题的不同标签,这会导致消息丢失(有些消息没有被消费)。在思考这个问题的时候,我们不妨先看一张图:简单说明一下它的核心关键点:比如一个topic一共有4个queue。消息发送方连续发送4条tagA消息后,又连续发送4条tagb消息。消息发送方默认采用轮询负载均衡机制,使得主题的每个队列中有两个标签tagA和tabB。信息。消费者组dw_tag_test中一个IP为192.168.3.10的消费者订阅了tagA,另一个IP为192.168.3.11的消费者订阅了tagB。消费组中的消费者在消费消息之前会先进行队列加载。默认为平均分布。分发结果:然后Consumers向Broker发起消息拉取请求。192.168.3.10消费者只会订阅tagA,所以q0和q1中有tagB的消息会被过滤,但是过滤后的tagB不会投递给另外一个订阅了tagB的消费者,导致这部分消息没有被投递,导致消息丢失。同样,192.168.3.11的消费者只会订阅tagB,所以q2和q3中tagA的消息会被过滤掉,但是过滤后的tagA不会投递给另外一个订阅了tagA的消费者,导致这部分消息被未送达,导致消息丢失。192.168.3.10分配给q0和q1。192.168.3.11分配给q2和q3。2、如果一个tag的消息数量很少,会不会表现出很高的延迟?文章开头,有些朋友会有这样的担心。场景大致如下图所示:消费者消费这条offset=100的tag1消息后,会连续出现1000W条非tag1消息,这个消费者组的积压会不会继续增加,直接到1000W?要理解这个问题,我们至少应该重点查看以下功能的源码:消息拉取过程位点提交机制本文将以问题为导向,通过自己的思考,找到关键源码进行验证,以及最后进行简单的示例代码验证。在遇到问题之前,我们可以试着先想一想。如果我们要实现这个功能,应该怎么考虑呢?我们需要判断在消费组消费完offset=100的消息后,接下来的1000万条消息会被过滤掉的情况下,如果我们想让站点提交,应该怎么设计呢?我觉得至少应该有以下几个关键点:拉取消息时,连续1000W条消息都找不到合适的消息,服务端怎么处理客户端拉取与不拉取时如何提交站点?,肯定是很耗时的,客户端不能等这么久,所以服务端必须采取措施,必须触发一个条件停止搜索,返回NO_MESSAGE给客户端,客户端要等多久消息搜索?核心要点1:客户端在向服务端发起消息拉取请求时,会设置一个超时时间。代码如下:与超时时间相关的两个变量含义不同:longbrokerSuspendMaxTimeMillis允许在Broker端当前没有匹配消息时Suspend时间,默认15s,目前不支持自定义。longtimeoutMillis取消息的超时时间,默认30s,暂时不支持自定义。即消息拉取的最大超时时间为30s。核心要点2:Broker端在处理消息拉取时设置完整的退出条件,具体通过DefaultMessageStore的getMessage方法,具体代码如下:核心要点:首先,客户端在发起时会传入一个当前的期望值要拉取的消息数对应于上面代码中的maxMsgNums。如果拉取了指定数量的消息(具体代码请读者参考isTheBatchFull方法),则正常退出。还有一个很关键的过滤条件,就是一次消息拉取过程中服务器扫描的最大索引字节数,也就是一次拉取ConsumeQueue扫描的字节数,取16000乘以预期的拉取项数减20,因为一个consumequeueentry占用了20个字节。server端还包含longroundrobin机制,即扫描到指定字节数,但没有找到消息,会在broker端挂起一段时间,如果有新消息到达并满足过滤条件,它会唤醒并返回消息给客户端。回到这个问题,如果服务端连续有1000万条非tag1消息,那么pullrequest不会一次性筛选,而是返回,这样客户端就不会超时。从这里可以打消第一个顾虑:找不到消息服务器不会等待也不返回,看是否会积压的关键是如何提交站点。2.2站点提交机制2.2.1客户端拉取合适的消息站点提交机制。Pull线程从服务器拉取结构后,会将消息提交给消费者组的线程池。主要定义在DefaultMQPushConsumerImpl的PullTask??类中。具体代码如下:众所周知,RocketMQ在消费成功后提交站点。代码在ConsumeMessageConcurrentlyService中,如下图:这里的核心点:消费者成功消费消息后,会采用最小站点提交机制,保证消费不丢失。最小站点提交机制其实就是把拉取的消息放到一个TreeMap中,然后消费者线程消费成功一条消息后,将消息从TreeMap中移除,然后计算站点:如果当前TreeMap中还有消息处理,返回TreeMap中的第一条消息(最小位置)。如果当前TreeMap中没有消息处理,则返回位置为this.queueOffsetMax,queueOffsetMax表示从当前消费队列位置拉取的最大消费,因为此时拉取的消息全部被消费完。最后调用updateoffset方法更新本地位置缓存(有定时持久化机制)2.2.2客户端不拉取合适的消息位置提交机制如果客户端没有拉取合适的信息,比如全部通过过滤tags,处理方法定义在DefaultMqPushConsumerImpl的PullTask??中,如下图:关键代码在correctTasOffset中,具体代码请看:核心要点:如果此时处理队列中的消息为0,则下次被拉取的偏移量作为位置,服务器查找消息时会把这个值往前推。代码在DefaultMessageStore:的getMessage中,所以从这里可以看出,即使所有的消息都被过滤掉了,位置还是会被往前推的,不会造成大量的积压。2.2.3拉取消息时,会有站点提交。其实RocketMQ的站点提交,在客户端提交站点时,会先保存在本地缓存中,然后将站点信息一次一次提交给broker。其实还有另一种隐式的站点提交机制:即在拉取消息时,如果本地缓存中有站点信息,会设置一个系统标志:FLAG_COMMIT_OFFSET,会触发服务器端的站点提交.具体代码如下:2.2.4总结与验证综上所述,使用TAG不会因为对应的标签数量比较少而导致大量积压。为了验证这个观点,我也做了一个简单的验证。具体方法是启动一个消息发送器向指定主题发送标签B消息,消费者只订阅标签A,但消费者没有消费积压。测试代码如下图所示:如下图查看消费者组的积压: