当前位置: 首页 > 科技观察

阿里二面:RocketMQ消息积压,增加消费者有用吗?

时间:2023-03-21 00:34:31 科技观察

大家好,我是君哥,今天分享一道有趣的面试题。面试官:RocketMQ消息积压,加消费者有用吗?我:这个要看具体场景,不同场景情况不一样。面试官:能详细说说吗?我:如果消费者的数量小于MessageQueue的数量,增加消费者可以加快消息的消费,减少消息的积压。比如一个Topic有4个MessageQueue,2个消费者消费。如果添加消费者,详情可以加快拉取消息的频率。如下图所示:如果消费者的数量大于等于MessageQueue的数量,增加消费者是没有用的。比如一个Topic有4个MessageQueue,有4个consumer进行消费。如下图所示:采访者:您提到的第一种情况,增加消费者的数量一定会加快消息消费的速度吗?我:这个……,一般情况下,是可以的。记者:有什么特殊情况吗?当然是我。消费者消息拉取的速度也取决于本地消息的消费速度。如果本地消息消费慢,会延迟一段时间再拉取。采访者:什么情况下消费者会等一段时间再拉?我:消费者拉取的消息存在于ProcessQueue中,消费者有流控。如果出现以下三种情况,则不会主动拉取:ProcessQueue中保存的消息数超过阈值(默认1000条,可配置);ProcessQueue中保存的消息大小超过阈值(默认100M,可配置);对于非顺序消费场景,ProcessQueue中保存的最后一条消息和第一条消息的偏移量差值超过一个阈值(默认2000,可配置)。这部分源码请参考类:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。面试官:还有其他情况吗?我:对于顺序消费场景,如果ProcessQueue加锁失败,拉取也会延迟。延迟时间为3s。面试官:消费者获取消息的延迟。一般的原因是什么?我:其实延迟抓取的本质就是消费者消费慢了,导致下次抓取的时候ProcessQueue中的消息积压超过了阈值。以下面的架构图为例:Consumer消费缓慢,但可能是以下原因造成的:consumer处理的业务逻辑复杂,耗时长;消费者查询速度慢,或者数据库负载高导致响应慢;缓存等中间软件响应慢,如Redis响应慢;调用外部服务接口响应慢。面试官:对外接口响应慢有什么应对措施吗?我:这将根据具体情况进行讨论。如果调用外部系统只是一个通知,或者调用外部接口的结果没有处理,可以使用异步方法,在异步逻辑中使用retry方法来保证接口调用成功。如果必须处理外部接口返回的结果,可以考虑接口返回的结果是否可以缓存默认值(考虑业务可行),快速使用默认值代替返回接口返回值调用失败后的降级方法。如果该接口返回的结果必须处理,不能缓存,可以将抓取到的消息存储在本地,直接返回CONSUME_SUCCESS给Broker。等待外部系统恢复正常后再从本地取出处理。面试官:如果消费者数量小于MessageQueue数量,外部系统正常响应,有什么需要考虑的,以便快速消费积压的消息,增加消费者?我:虽然外部系统响应正常,但是添加多个消费者后,外部系统的接口调用会突然变多。如果达到吞吐量限制,外部系统将响应缓慢甚至挂起。同时还要考虑本地数据库和缓存的压力。如果数据库响应变慢,处理消息的速度也会变慢,并不能缓解消息的积压。面试官:新增一个消费者后,如何分配一个MessageQueue?我:Consumer在拉取消息之前,需要对MessageQueue进行load操作。RocketMQ使用定时器完成加载操作,默认每20s重新加载一次。面试官:能详细说说有哪些负载策略吗?我:RocketMQ提供了6种加载策略,我们依次看一下。平均负载策略:对消费者进行排序;计算每个消费者可以平均分配的MessageQueues数量;如果消费者数量大于MessageQueues数量,多出的消费者将不会被分配;如果不能均分,则用MessageQueues总数求消费者数的余数mod;为之前消费者的mod个数给每个消费者加一个,从而得到每个消费者分配的MessageQueues个数。比如4个MessageQueue和3个消费者的情况:源码的逻辑很简单,如下://AllocateMessageQueueAveragely这个类//4个MessageQueue和3个消费者,如果是第一个,index=0intindex=cidAll。indexOf(currentCID);//mod=1intmod=mqAll.size()%cidAll.size();//averageSize=2intaverageSize=mqAll.size()<=cidAll.size()?1:(mod>0&&index0&&索引<模型)?index*averageSize:index*averageSize+mod;//range=2,所以第一个消费者被分配2intrange=Math.min(averageSize,mqAll.size()-startIndex);for(inti=0;i(Arrays.asList("room1")),"room2".setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);consumer.start();这个策略broker的命名必须遵循这样的格式:机房名@brokerName,因为消费者在分配队列的时候,首先根据机房名过滤掉所有的MessageQueue,然后均匀分配策略。//AllocateMessageQueueByMachineRoom这个类ListpremqAll=newArrayList();for(MessageQueuemq:mqAll){String[]temp=mq.getBrokerName().split("@");if(temp.length==2&&consumeridcs.contains(temp[0])){premqAll.add(mq);}}//以上根据机房名称过滤掉所有MessageQueue并放入premqAll中,其次是按照就近机房平均分配策略:遵循与机房分配原则相比,就近分配的好处是可以分配到一个没有消费者的机房。如下图,3号机房的MessageQueue也分配给了消费者:如果一个机房没有消费者,那么这个机房的MessageQueue会分配给集群中的所有消费者。源码所在类:AllocateMachineRoomNearby。ConsistentHash算法策略:通过Hash计算将所有消费者分发到Hash环上,对所有MessageQueue进行Hash计算,找到最近的顺时针消费者节点进行绑定。如下图:源码如下://TheclassAllocateMessageQueueConsistentHashCollectioncidNodes=newArrayList();for(Stringcid:cidAll){cidNodes.add(newClientNode(cid));}//使用consumers构建Hash环,并在Hash环节点上分发消费者finalConsistentHashRouterrouter;//用于构建hashringif(customHashFunction!=null){router=newConsistentHashRouter(cidNodes,virtualNodeCnt,customHashFunction);}else{router=newConsistentHashRouter(cidNodes,virtualNodeCnt);}//哈希操作MessageQueue寻找环上最近的消费者Listresults=newArrayList();for(MessageQueuemq:mqAll){if(clientNode!=null&¤tCID.equals(clientNode.getKey())){results.add(mq);}}面试官:恭喜,通过了。

猜你喜欢