当前位置: 首页 > 科技观察

面试题:一个消费者订阅了两个topic,如果一个topic消息过多,会不会影响另一个topic的消费?

时间:2023-03-19 11:38:08 科技观察

无意中在网上看到这样一个问题,一个消费者订阅了两个主题,一个主题消息过多,会不会影响另一个主题的消费?对于RocketMQ来说,看源码是多么的方便,于是就开始寻找对应的源码,然后思来想去。先给大家一个结论,再看看堵塞的原因。如果是producer瞬间产生大量消息的原因,比如spikes,消息的堆积基本不会影响;如果消费者失败,消费速度会变得极其缓慢。那会影响,但不会阻塞,只会影响速率。接下来带大家看一下源码。/***RebalanceService*consumer负载均衡线程服务*/publicclassRebalanceServiceextendsServiceThread{}我们先关注这个负载均衡线程服务,这个大家都看到了,它每20秒执行一次,这主要是负载均衡的逻辑在doRebalance方法中。我们进去看看这个方法。进入后可以看到consumerTable这个对象是循环的,里面存放了所有的消费者,然后循环调用doRebalance,继续往里找,继续往里冲。这里创建线程的时候可以看到核心处理就是这个rebalanceByTopic,传入的参数就是我们消费者收听的topic。这里的mqSet是topic的所有consumerqueue,也就是默认创建的4个queue。当然,这个数字是可以改变的。然后我们可以看到allocateMessageQueueStrategy,这是一个分配策略对象,调用里面的allocate来分配topic的消息队列。有几种方法可以实现这种分配策略。让我们来看看。看名字就可以猜出来了。感兴趣的可以点进去查看详细的处理机制。分配后,将队列分配给allocateResultSet对象。这里为什么要用set集合存放呢?个人猜测是为了防止队列数重新变化,这里可能会导致重复,这里加了一层set来防止这种极端情况。接下来队列分配完成后,主要处理是updateProcessQueueTableInRebalance,负责更新消息队列。其实也可以认为是分配了消费者需要负责的队列,也就是这是你的责任。这个消费者需要处理这些队列。我们进入updateProcessQueueTableInRebalance方法后,上面的我就折叠起来,不给大家看。这里的处理主要是针对部分机器突然宕机或者增加部分机器。该方法主要处理最后一次pullrequest,即dispatchPullRequest。传入的参数是一个pullRequests列表。创建线程后,循环处理pullRequest。哎,底层还没找到,继续点进去。创建线程哎。终于找到你了,是你,最后执行了一个put方法,放入的是一个LinkedBlockingQueue队列。这是一个拉取消息的请求队列,请求的对象是pullRequest。在实际处理的时候,也就是拉取消息的时候,多个线程会从LinkedBlockingQueue中取出消息,然后按照放置的先后顺序进行消费。让我解释一下这个过程。这里是rocketmq首先会对消息进行负载均衡Rebalance的过程。这个就是根据consumer来分配topic中的consumerqueue队列。分配策略如上所示。将pullRequest放入这个LinkedBlockingQueue中,topic、brokerName、queueID等都放在这里。这时候,后面消费的顺序已经安排好了。比如10个请求中,大概有5个是topicTest1,另外5个是topicTest2。所以,这个时候,如果topicTest1的消息堆积起来,还是会照常消费topicTest2。这时候就需要看一下堆积的原因了。如果堆积是因为秒杀等场景导致瞬间产生大量消息,消费者还是会消费的。TopicTest1是正常消费的,所以topicTest2不会受到影响。但是,如果topicTest1的消费速度很慢,所有线程处理的很慢,都被占用了,会稍微影响topicTest2的速度,但那只是暂时的,不会阻塞topicTest2,