当前位置: 首页 > 科技观察

我问了本书的创始人《RocketMQ技术内幕》一个问题

时间:2023-03-20 14:47:55 科技观察

是这样的,我在学习rocketmq的时候遇到了一个奇怪的问题,就是同一个消费组的消费者订阅了同一个主题topic,看到一个消息当我看到不同的标签时丢失问题。这个问题也是《RocketMQ技术内幕》这本书的作者,丁伟哥,然后他给我解释了我对这个问题的困惑。让我先给你解释一下。描述具体内容。同一个消费者组的两个消费者订阅了同一个主题,但标签不同。Consumer1订阅Topictag1,Consumer2订阅Topictag2,然后分别启动。此时向topic的tag1发送10条数据,向topic的tag2发送10条数据。根据目测,Consumer1和Consumer2应该分别收到对应的10条消息。结果是只有Consumer2收到了消息,而且只收到了4-6条消息,不是固定的。MQ底层数据结构的精妙之处RocketMQ专门根据主题为每个主题创建索引,让消费者根据主题进行消费,其具体实现是一个消息队列。在RocketMQ中,ConsumeQueue的引入并不是为了提升消息写入的性能,而是为了服务于消费。消息消费队列中的每个条目都是固定长度的,设计起来非常棘手。每个条目使用固定长度(8字节提交日志物理偏移量、4字节消息长度和8字节标签哈希码)。这里没有存储标签的原始字符串,而是存储了哈希码。目的是保证每个表项的长度是固定的,通过访问一个类似的数组下标可以快速定位表项,大大提高了ConsumeQueue文件的读取性能。这样,根据消费进度访问消息的方法就是使用逻辑偏移量logicOffset*20找到入口的起始偏移量(consumequeue文件中的偏移量),然后读取偏移量后的20个字节得到无需遍历consumequeue文件的条目。关于RocketMQ中的三个文件,为了帮助RocketMQ完成如此高效的壮举,我也写了一篇文章介绍这三个文件。大家可以看看这三个文件,彻底了解rocketmq的存储原理。消息过滤实现该机制的消费者端队列存储标签的哈希码。众所周知,不同的字符串得到的hashcode值可能相同,所以在服务端无法准确过滤消息,所以在RocketMQ中,消息过滤会进行两次。当客户端从服务端拉取消息时,服务端会先根据hashcode对消息进行过滤再返回消息,然后客户端收到服务端的消息后,根据消息的标签字符串进行精准过滤信息。上面的原理很容易理解,那么为什么会丢失消息呢?这其实和消息队列加载机制有关。在RocketMQ中使用集群模式消费时,同一个消费组中的多个消费者共同完成对主题中队列的消费,即一个消费者只会被分配到部分队列,同时一个队列会只分配给一个消费者,所以结合上面的过滤机制,会出现明显的问题,请看示例图:问题的核心是同一个标签会分布在不同的队列中,但是消费者C1分配到了队列是q0,q1,在q0,q1中有taga和tagb消息,但是tagb消息会被消费者C1过滤掉,但这部分消息不会被C2消费,导致消息丢失。因此,在RocketMQ中,一个消费组中的所有消费者都必须保持相同的订阅关系。我们回过头来看看这个问题。首先,它是由Broker决定的,而不是由Consumer决定的。Consumer向Broker发送心跳。Broker收到后,存储到consumerTable(也就是一个Map)中。键为GroupName,值为ConsumerGroupInfo。ConsumerGroupInfo包含topic等信息,但是问题出在上一步。关键是组名。如果使用相同的GroupName,Broker心跳接收到的最后一个Consumer将覆盖前一个。它等效于以下代码:map.put(groupName,ConsumerGroupInfo);这样,必须覆盖相同的密钥。那么Consumer1不会收到任何消息,而Consumer2为什么只收到一半(不固定)的消息呢?那是因为:你是在集群模式下消费,它会把负载均衡的分配给各个节点消费,所以有一半的消息(数量不定)发送给了Consumer1。结果Consumer1订阅了tag1,所以不会有输出。如果换成BROADCASTING,后者肯定会收到所有的消息,不会收到一半,因为broadcasting是广播了所有的Consumer。/***消费者信息*/publicclassConsumerGroupInfo{//群组名称privatefinalStringgroupName;//主题信息,如主题、标签等privatefinalConcurrentMapsubscriptionTable=newConcurrentHashMap();//客户端信息,如clientId等privatefinalConcurrentMapchannelInfoTable=newConcurrentHashMap(16);//拉/推privatevolatileConsumeTypeconsumeType;//消费模式:BROADCASTING/CLUSTERINGprivatevolatileMessageModelmessageModel;//消费在哪里?privatevolatileConsumeFromWhereconsumeFromWhere;}/***通过心跳向Broker端注册Consumer信息。*/publicbooleanregisterConsumer(finalStringgroup,finalClientChannelInfoclientChannelInfo,ConsumeTypeconsumeType,MessageModelmessageModel,ConsumeFromWhereconsumeFromWhere,finalSetsubList,booleanisNotifyConsumerIdsChangedEnable){//consumerTable:维护所有的ConsumerGroupInfoconsumerGroupInfo=this.consumerTable.get(团体);//如果没有Consumer,则放到map里if(null==consumerGroupInfo){ConsumerGroupInfotmp=newConsumerGroupInfo(group,consumeType,messageModel,consumeFromWhere);//放到地图里ConsumerGroupInfoprev=this.consumerTable.putIfAbsent(group,tmp);consumerGroupInfo=prev!=null?上一个:tmp;}//更新消费者信息,客户端信息booleanr1=consumerGroupInfo.updateChannel(clientChannelInfo,consumeType,messageModel,consumeFromWhere);//更新订阅主题信息booleanr2=consumerGroupInfo.update订阅(子列表);如果(r1||r2){如果(isNotifyConsumerIdsChangedEnable){this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE,group,consumerGroupInfo.getAllChannel());}}this.consumerIdsChangeListener.handle(ConsumerGroupISTER,groupREG,subList);返回r1||r2;}从这一步可以看出,消费者信息是以groupName为key,ConsumerGroupInfo为value的map(consumerTable)中存储的。很明显后者肯定会覆盖前者,因为key相同,后者的tag是tag2,肯定会覆盖前者的tag1,保存在ConsumerGroupInfo的subscriptionTable中。privatefinalConcurrentMapsubscriptionTable=newConcurrentHashMap();SubscriptionData包含主题和其他信息publicclassSubscriptionDataimplementsComparable{//topicprivateStringtopic;私有字符串子字符串;//标签privateSettagsSet=newHashSet();privateSetcodeSet=newHashSet();}其实这个问题已经解决了,相当于后面消费者的注册信息会覆盖前面消费者的注册信息,也导致出现上述现象。先启动订阅tag1的consumer,再启动订阅tag2的consumer。此时最新的心跳信息来自tag2的consumer,导致这个consumer的订阅信息覆盖了之前的订阅。信息,这是因为在RocketMQ中认为同一个消费组的消费者的订阅信息需要保持一致,不一致是不允许的。如果有这种东西,你可以创建一个新的主题,或者创建一个新的消费者组。在使用过程中,一定要保持消费组的订阅信息一致。这也就导致了sender发送的tag1消息根本不会被这个consumer接收到,两个consumer自然不会消费这个消息。而为什么只收到tag2的部分消息呢?这是因为rocketMQ默认采用的是集群消费模式,即生产者的消息会通过负载均衡的方式平均发送到多个消费者队列。默认是4个,也就是我们启动的两个consumer会分别监听两个consumerqueue队列,也就是说tag2的消息有一半左右会投递到consumer1的机器上消费,而consumer1监听tag1,这不是satisfied消息的条件,所以无法监控消息的topic和tag信息是如何覆盖的/***其实很简单,就是以topic为key,SubscriptionData为value。SubscriptionData包含标签信息,所以直接覆盖*/publicbooleanupdateSubscription(finalSetsubList){for(SubscriptionDatasub:subList){SubscriptionDataold=this.subscriptionTable.get(sub.getTopic());if(old==null){SubscriptionDataprev=this.subscriptionTable.putIfAbsent(sub.getTopic(),sub);}elseif(sub.getSubVersion()>old.getSubVersion()){this.subscriptionTable.put(sub.getTopic(),sub);}}}本文参考文章:https://codingw.blog.csdn.net/article/details/116299837。https://dalin.blog.csdn.net/article/details/107241375。