当前位置: 首页 > 科技观察

排查生产环境MQ集群一个很奇怪的消费延迟

时间:2023-03-17 18:53:30 科技观察

1。问题现象某天,项目组同事给我反馈,他们用公司的数据同步产品同步MySQL数据到MQ集群,然后用consumer把数据同步到ES后,反馈数据同步延迟严重了,但是对应的consumergroup没有积压,只是最近几分钟的数据没有同步。问题来了。消费端没有消费积压,通过数据同步平台查看应该经过的任务的同步状态,也说明没有积压。为什么?遇到这个问题,我们应该静下心来分析一下它的大概数据流图。整理后如下图所示:经初步诊断,Binlog同步无延迟,数据同步产品MQ消费无积压。那为什么Es集群中的数据跟MySQL有长达几分钟的延迟呢?2.排查根据上图中几个关键组件的数据同步延时检测,基本排除了数据同步组件的问题和MQ消费端本身的消耗问题。问题的症结应该是数据同步组件成功向MQ集群写入数据。而MQ集群返回写入成功,但是消费者没有及时感知消息,也就是说消息虽然写入了MQ集群,但是没有到达消费队列。因为如果数据同步组件没有成功写入,MySQLBinlog日志就会延迟。但是如果是MQ消费端的问题,MQ平台也会显示消费组积压。那为什么消息服务器写入成功了,为什么消费者组却察觉不到呢?首先,为了验证上面的结论是否正确,我还去看了题目的详细信息:查看题目统计时,发现当前系统时间是19:01,但是该主题的最新写作时间仅为18:50。两者之间几乎相差10分钟。备注:以上界面为我司内部消息运营管理平台。其实底层就是调用RocketMQ提供的topicStatus命令。那么这是怎么发生的呢?这里假设大家对RocketMQ的底层实现原理不是很熟悉。对于这种情况,我觉得首先应该弄清楚topicStatus命令返回的minOffset、maxOffset和lastUpdate的具体获取逻辑。只有了解了这些,我们才能追根究底,最终找到解决办法。2.1问题探索与原理分析在该场景下,我们可以通过分析topicStatus命令来探究其背后的实现原理。当我们在命令行中输入sh./mqadmintopicStatus命令时,最终调用了defaultMQAdminExtImpl的examineTopicStats方法,最后在AdminBrokerProcessor的getTopicStatsInfo方法中定义了server端的处理逻辑。核心代码如下:这里的实现要点:通过MessageStore的getMinOffsetInQueue获取最小偏移量。通过MessageStore的getMaxOffsetInQueue获取最大偏移量。最新更新时间是最大偏移量减一的存储时间(代表最新消息)。因此,要找出队列的最大偏移量和最小偏移量,关键是理解getMaxOffsetInQueue或getMinOffsetInQueue的计算逻辑。我也注意到,虽然分析源码可以直达真相,但读起来太粗糙,所以我会在后面的文章中尽量避免通篇解读源码。相反,我只会指出源代码的入口,其侧面细节将通过时序图获取流程图,方便有兴趣的读者朋友探索。我重点提炼知识点,降低大家的学习成本。如果你想系统学习RocketMQ,将消息中间件作为你职业生涯的重头戏,我强烈推荐购买我的两本关于RocketMQ的书:《RocketMQ技术内幕》和《RocketMQ实战》。MessageStore的getMaxOffsetInQueue时序图如下:从上面的时序图我们可以知道,调用DefaultMessageStore的getMaxOffsetInQueue方法首先根据主题和队列ID获取ConsumeQueue对象(在RocketMQ中,一个主题的一个队列对应aConsumeQueue,代表一个消费者队列),即这里获取的偏移量指的是消费者队列中的偏移量,而不是Commitlog文件的偏??移量。如果是寻找最大偏移量,找到队列中的最后一个文件,得到最大有效偏移量,等于文件的起始偏移量(fileFromOffset)加上文件当前最大可读值的偏移量(readPosition),所以这个时序图中很关键的一点就是如何获取消费队列的最大可读偏移量。代码参见MappedFile的getReadPosition:publicintgetReadPosition(){returnthis.writeBuffer==null?this.wrotePosition.get():this.committedPosition.get();}由于ConsumeQueue没有transientStorePoolEnable机制,数据直接写入FlieChannel,所以这里的writeBuffer为空,writtenPosition的值为taken了,那么ConsumeQueue文件的writtenPosition值在哪里更新了呢?这个可以通过查看MappedFile中修改writePosition方法的appendMessage方法的调用来实现,如下图所示:ConsumeQueue对应的主要入口有两个:ReputMessageService#doReputCommitlog异步转发线程,通过Consumequeue,Index等文件异步构建Commitlog#recoverAbnormallyRocketMQ在启动时会根据Commitlog文件自动恢复Consumequeue文件进入一个文件(Commitlog文件),然后异步转发给ConsumeQueue(消费队列文件)、IndexFile(索引文件)。它的转发服务是通过ReputMessageService实现的。在深入介绍Commitlog文件的转发机制之前,先问大家一个问题:消息是写入内存再转发到ConsumeQueue,还是写入磁盘再转发?为了方便大家对这个问题的探究,代码的核心入口如下图所示:这里的实现要点如下:判断是否转发的关键条件是isCommitlogAvailable()方法返回true和根据转发点reputFromOffset从Commitlog文件中获取消息的物理偏移量、消息大小、标签等信息转发到消息消费队列和索引文件中。isCommitlogAvailable的核心如下:因此,转发的关键在于Commitlog的maxOffset的获取逻辑,其实现时序图如下:这里的核心重点是getReadPosition方法的实现,在RocketMQ中写入Commitlog文件,为了提高写入性能,其中引入了内存级别的读写分离机制。具体实现原理如下图所示:具体在实现层面,如果transientStorePoolEnable=true,则将数据写入堆外内存(writeBuffer),然后提交给FileChannel。位置(由committedPosition表示)。您可以查看分别更改writtenPosition和committedPposition的调用链。writePosition的调用链如下:可以知道writtenPosition会在消息写入内存(pagecache或者堆外内存)时更新,但是一旦开启堆外内存机制,这个值就不会更新了被采取,所以我们可以理解为当一条消息被写入Pagecache时,它可以被转发到消息消费队列。接下来我们看一下committedPosition的调用链,如下图:,然后会有一个异步线程(CommitRealTimeService)定时(默认200ms周期)向FileChannel提交消息,即更新committedPosition的值,并将消息转发到消费队列,让消费者可以消耗。2.2问题原因的提取经过以上分析,问题应该清楚了。为了提高RocketMQ的资源利用率,提高RocketMQ的写入性能,我司开启了transientStorePoolEnable机制。消息发送端写入堆外内存时,会返回写入成功,这样就不会发生MySQLBinlog数据同步。延迟,这里的问题不外乎两个:CommitRealTimeService线程没有及时将堆外内存中的数据提交给FileChannelReputMessageService线程,没有及时转发数据到消费队列,因为不知道对写入底层存储的原理暂时还不够深入,对相关的系统采集指标不够敏感。当时主要分析线程栈,发现ReputMessageService线程一直在工作。推测可能是转发不及时。我还需要更深入地研究这个。如果你对此有真正的了解,欢迎留言,我也会在后续的工作中提升这方面的技能,更深入地理解底层原理。也就是说,我们现在知道了问题的明显原因。虽然底层原理还不清楚,但足以指导我们更好地处理问题:将集群中的消息写入一个大topic,迁移到其他负载较低的集群,从而减轻该集群的写入压力集群,当迁移了几个topic后,不出所料,消息近乎实时的到达了消费队列,集群恢复了。