当前位置: 首页 > 科技观察

为了弥补延迟消息的不足,RocketMQ基于时间轮算法实现了定时消息!

时间:2023-03-17 14:07:29 科技观察

大家好,我是君哥,在RocketMQ4.x版本中,使用延迟消息来实现消息的定时消费。延迟消息在一定程度上可以定时发送,但有一定的局限性。新版RocketMQ引入了基于时间轮算法的定时消息。目前,通过精准到秒级的定时消息实现的pr已经提交给社区。今天给大家介绍一下。1延迟消息1.1介绍RocketMQ的延迟消息是指Producer发送消息后,Consumer不会立即消费,而是需要等待一定的时间再消费。在某些场景下,延迟消息是有用的,比如在电商场景下关闭30分钟内未支付的订单。使用延迟消息非常简单,只需要给消息的delayTimeLevel属性赋值即可。参考如下代码:Messagemessage=newMessage("TestTopic",("Helloscheduledmessage"+i).getBytes());//第三级,10smessage.setDelayTimeLevel(3);producer.send(message);延迟消息有18级,如下:privateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";发送给Broker后,Broker判断为延迟消息,会先将消息投递到延迟队列(Topic=SCHEDULE_TOPIC_XXXX,queueId=delayTimeLevel-1)。定时任务线程池中会有18个线程来调度延迟队列,每个线程调度一个延迟级别,调度任务会将延迟消息投递到原队列中,以便Consumer拉取。1.3InsufficientDelaymessages有一些缺点:1.延迟级别只有18个,不能满足所有场景;2.如果修改messageDelayLevel配置自定义延迟级别,不灵活,比如大型平台3.延迟时间不准确,后台的定时线程可能会因为大量处理消息。2定时消息为了弥补延迟消息的不足,RocketMQ5.0引入了定时消息。2.1时间轮算法为了解决在定时任务队列中遍历任务带来的性能开销,RocketMQ定时消息引入了秒级时间轮算法。如下图所示:图中是一个60s的时间轮,时间轮上会有一个指针指向当前时间,定时移动到下一个时间(秒级)。时间轮算法的优点是不需要遍历所有任务。每个时间节点上的任务都链接在一个链表中。当时间轮上的指针移动到当前时间时,将执行该时间节点上的所有任务。上面虽然只是一个60s的时间轮,但是支持所有的时间延迟。可以在每个时间节点添加一个round字段,用来记录时间轮转过的圈数。比如延迟130s的任务,round为2,放在第10个时间尺度的链表中。这样,当时间轮到一个节点,在该节点上执行任务时,首先判断轮次是否等于0,如果等于0,则从任务列表中移除该任务,交给异步线程进行执行,否则轮次减1,继续检查后面的Task。2.2使用基于时间轮算法的思想,RocketMQ实现了精准的定时消息。使用RocketMQ定时消息时,客户端自定义消息示例代码如下:(Resource.newBuilder().setName(TOPIC).build()).setSystemProperties(SystemProperties.newBuilder().setMessageId(msgId).setQueueId(0).setMessageType(MessageType.DELAY).setDeliveryTimestamp(Timestamps.fromMillis(deliveryTime))//定义消息传递时间.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis())).setBornHost(StringUtils.defaultString(RemotingUtil.getLocalAddress(),"127.0.0.1:1234")).build()).setBody(ByteString.copyFromUtf8("123")).build()),Resource.newBuilder().setName(TOPIC).build()).get(0);2.3实现原理2.3.1消息传递在上面的代码结构中,Producer创建发送消息时,传递一个系统属性deliveryTimestamp给消息。该属性指定了消息的传递时间,封装在消息的TIMER_DELIVER_MS属性中。代码如下:protectedvoidfillDelayMessageProperty(apache.rocketmq.v2.Messagemessage,org.apache.rocketmq.common.message.MessagemessageWithHeader){if(message.getSystemProperties().hasDeliveryTimestamp()){时间戳deliveryTimestamp=message.getSystemProperties().getDeliveryTimestamp();//delayTime这个延迟时间默认不能超过1天,可以配置longdeliveryTimestampMs=Timestamps.toMillis(deliveryTimestamp);validateDelayTime(deliveryTimestampMs);//...StringtimestampString=String.valueOf(deliveryTimestampMs);//MessageConst.PROPERTY_TIMER_DELIVER_MS="TIMER_DELIVER_MS"MessageAccessor.putProperty(messageWithHeader,MessageConst.PROPERTY_TIMER_DELIVER_MS,timestampString);Broker收到这条消息后,如果判断属性TIMER_DELIVER_MS有值,则将消息投递到Topic为rmq_sys_wheel_timer,queueId为0,Topic、queueId、投递时间(TIMER_OUT_MS)的队列中原始消息将同时保存。TimerMessageStore中有一个定时任务TimerEnqueueGetService,它从主题rmq_sys_wheel_timer中读取消息,然后封装TimerRequest请求放入队列enqueuePutQueue中。2.3.2绑定时间轮RocketMQ使用TimerLog保存消息的原始数据,绑定到时间轮上。首先看TimerLog保存的数据结构,如下图:参考如下代码://TimerMessageStoreclassByteBuffertmpBuffer=timerLogBuffer;tmpBuffer.clear();tmpBuffer.putInt(TimerLog.UNIT_SIZE);//sizetmpBuffer.putLong(slot.lastPos);//prevpostmpBuffer.putInt(magic);//magictmpBuffer.putLong(tmpWriteTimeMs);//currWriteTimetmpBuffer.putInt((int)(delayedTime-tmpWriteTimeMs));//delayTimetmpBuffer.putLong(offsetPy);//offsettmpBuffer.putInt(sizePy);//sizetmpBuffer.putInt(hashTopicForMetrics(realTopic));//真实主题的hashcodempBuffer.putLong(0);//reservedvalue,nowlongret=timerLog.append(tmpBuffer.array(),0,TimerLog.UNIT_SIZE);if(-1!=ret){//如果是删除消息,那么slot的totalnum-1//TODO:检查删除消息是否与“要删除的消息”在同一个槽中。timerWheel.putSlot(delayedTime,slot.firstPos==-1?ret:slot.firstPos,ret,isDelete?slot.num-1:slot.num+1,slot.magic);}TimerEnqueuePutServiceenqueuePutQueue上面(第2.3.1节))取出TimerRequest,打包成TimerLog。TimerLog的时间轮是如何关联的?RocketMQ使用TimerWheel来描述时间轮。TimerWheel中的每个时间节点都是一个Slot,Slot中保存着本次延迟时间的TimerLog信息。数据结构如下图所示:参考如下代码:timeMs)*Slot.SIZE);localBuffer.get().putLong(timeMs/precisionMs);localBuffer.get().putLong(firstPos);localBuffer.get().putLong(lastPos);localBuffer.get().putInt(num);localBuffer.get().putInt(magic);}这样时间轮就和TimerLog关联起来了,如下图:如果有新的消息到达时间轮的某个时间节点(Slot),就新建一个TimerLog,并把它的指针指向这个时间节点的最后一个TimerLog,然后Slot的lastPos属性指向新建的TimerLog,如下图所示:从源码来看,RocketMQ定义了一个7-以秒为单位的时间轮。2.3.3时间轮转动时间轮转动时,定时器任务TimerDequeueGetService从当前时间节点(Slot)对应的TimerLog中取出数据,封装成TimerRequest,放入dequeueGetQueue队列中。2.3.4CommitLog中读取消息的定时任务TimerDequeueGetMessageService从队列dequeueGetQueue中拉取TimerRequest请求,然后根据TimerRequest中的参数在CommitLog(MessageExt)中查找消息,找到后将消息封装到TimerRequest中,然后将TimerRequest写入dequeuePutQueue队列。2.3.5编写原队列的定时任务TimerDequeuePutMessageService从dequeuePutQueue队列中获取消息,将消息转换为原消息,放入原队列,让消费者拉取。3总结RocketMQ4.x版本只支持延迟消息,有一定的局限性。新版RocketMQ引入了定时消息,弥补了延迟消息的不足。定时消息的处理流程如下:可以看到,RocketMQ的定时消息的实现还是有一定的复杂度的。这里使用了5个定时任务和3个队列来实现。最后,对于定时时间的定义,client、broker和timewheel默认的最大延迟时间定义是不同的,使用时需要注意。