当前位置: 首页 > 科技观察

五张图帮你理解RocketMQ的延迟消息机制_0

时间:2023-03-22 14:51:52 科技观察

大家好,我是君哥,今天我们来聊聊RocketMQ的延迟消息是如何实现的。延迟消息是指消息发送到RocketMQ后,不会立即被消费者拉取,而是要等待一个固定的时间才能被消费者拉取。延迟消息可以用在很多场景中。比如电商场景,超时未支付的订单关闭。在某些场景下,需要在固定时间后发送提醒消息。1.生产者先看官方一个生产者发送延时消息的示例代码:publicstaticvoidmain(String[]args)throwsException{//实例化一个生产者发送定时消息DefaultMQProducerproducer=newDefaultMQProducer("ExampleProducerGroup");//启动生产者producer.start();inttotalMessagesToSend=100;for(inti=0;i0){if(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;intqueueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());//备份真实主题,queueIdMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC);MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(主题);msg.setQueueId(queueId);}从上面的代码可以看出,并没有直接写CommitLog,而是把Topic改成SCHEDULE_TOPIC_XXXX,把queueId改成延迟级别减1。因为有18个延迟级别,这里有18个队列。如下图所示:2.2调度消息延迟消息写入后,会有一个调度任务不断拉取这些延迟消息。此逻辑在类ScheduleMessageService中。该类的初始化代码如下:publicvoidstart(){if(started.compareAndSet(false,true)){this.load();this.deliverExecutorService=newScheduledThreadPoolExecutor(this.maxDelayLevel,newThreadFactoryImpl("ScheduleMessageTimerThread_"));//省略一些逻辑(Map.Entryentry:this.delayLevelTable.entrySet()){Integerlevel=entry.getKey();LongtimeDelay=entry.getValue();长偏移=这个。offsetTable.get(级别);如果(空==偏移量){偏移量=0L;}if(timeDelay!=null){//省略一些逻辑this.deliverExecutorService.schedule(newDeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TIME,TimeUnit.MILLISECONDS);}}//省略持久化逻辑}}上面的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),key保存延迟级别(从1开始),value保存延迟时间(单位为ms)。load()方法结束后,创建一个18个核心线程的定时线程池,然后遍历delayLevelTable创建18个任务(DeliverDelayedMessageTimerTask),用于每个延迟级别的任务调度。任务调度的代码发布如下:publicvoidexecuteOnTimeup(){ConsumeQueuecq=ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));如果(cq==null){this.scheduleNextTimerTask(this.offset,DELAY_FOR_A_WHILE);返回;}SelectMapppedBufferResultbufferCQ=cq.getIndexBuffer(this.offset);if(bufferCQ==null){//节省部分分发this.scheduleNextTimerTask(resetOffset,DELAY_FOR_A_WHILE);返回;}longnextOffset=this.offset;尝试{inti=0;ConsumeQueueExt.CqExtUnitcqExtUnit=newConsumeQueueExt.CqExtUnit();对于(;i0){this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);返回;}MessageExtmsgExt=ScheduleMessageService.this.defaultMessageStore.lookMes??sageByOffset(offsetPy,sizePy);如果(msgExt==null){继续;}MessageExtBrokerInnermsgInner=ScheduleMessageService.this.messageTimeup(msgExt);//事务消息判定断策booleandeliverSuc;//只保留同步deliverSuc=this.syncDeliver(msgInner,msgExt.getMsgId(),nextOffset,offsetPy,sizePy);如果(!deliverSuc){this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);返回;}}nextOffset=this.offset+(i/ConsumeQueue.CQ_STORE_UNIT_SIZE);}catch(Exceptione){log.error("ScheduleMessageService,messageTimeup执行错误,offset={}",nextOffset,e);}最后{bufferCQ.release();}this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);}这段代码可以参考下面的流程图来理解:上面有一个修正交货时间的功能。这个函数的意思是,如果已经过了投递时间,那么立即投递代码如下:longmaxTimestamp=now+ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);if(deliverTimestamp>maxTimestamp){结果=现在;}returnresult;}注:消息从CommitLog转发到ConsumeQueue时,会判断是否为延迟消息(Topic=SCHEDULE_TOPIC_XXXX且延迟级别大于0)。如果是延迟消息,会修改tagsCode值为消息发送的时间戳,tagsCode原值为标签HashCode。代码如下://CommitLogclasscheckMessageAndReturnSizemethodif(delayLevel>0){tagsCode=this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}如下图所示:-将ConsumeQueue中的消息投递到原队列中,将tagsCode再次改为tag的HashCode,代码如下://classMessageExtBrokerInner,该方法由messageTimeup方法调用。publicstaticlongtagsString2tagsCode(finalTopicFilterTypefilter,finalStringtags){if(null==tags||tags.length()==0){返回0;}returntags.hashCode();}如下图:2.3一个问题如果有一个业务场景需要延迟消息消费3小时,而RocketMQ延迟消息最大延迟级别只支持延迟2小时,如何处理?这里有两个思路供大家参考:修改Broker上messageDelayLevel的默认配置;在客户端缓存msgId,首先设置延迟级别为18(2h),客户端拉取消息时,首先判断是否有缓存,如果有缓存,则再次发送延迟消息,此时延迟级别为17(1h),如果没有缓存,就会被消耗掉。3总结经过上面的解释,延迟消息的处理流程如下:最后,延迟消息的延迟时间不准确。这个时间就是Broker调度线程重新投递消息到原来的MessageQueue的时间。如果RocketMQ客户端出现消息积压或者流量控制,客户端拉取消息后的处理时间可能会超过预设的延迟时间。

最新推荐
猜你喜欢