当前位置: 首页 > 后端技术 > Java

RocketMQ延迟消息代码实战及原理分析

时间:2023-04-02 02:06:25 Java

RocketMQ简介RocketMQ是一个开源分布式消息系统,基于高可用分布式集群技术,提供低延迟、高可靠、万亿级容量、灵活可扩展的消息发布和订阅服务。它的前身是MetaQ,是阿里巴巴基于Kafka的设计,使用Java自主开发的。2012年阿里开源,2016年阿里捐赠给Apache软件基金会(简称ASF),正式成为孵化项目。2017年,Apache软件基金会宣布RocketMQ作为Apache顶级项目(简称TLP)孵化,这是国内首个基于Apache的互联网中间件顶级项目。延迟消息生产者将消息发送到消息队列后,并不期望立即被消费,而是等待指定的时间才能被消费者消费。这种类型的消息通常称为延迟消息。RocketMQ支持延迟消息,但不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,就无法避免Broker层面的消息排序,而当涉及到持久化的考虑时,那么消息排序必然会产生巨大的性能开销。消息延迟级别为1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h,共18级。发送消息时,只需设置消息延迟级别即可。设置消息延迟级别有以下三种情况:当消息延迟级别设置为0时,该消息为非延迟消息。当消息延迟级别设置为大于等于1且小于等于18时,消息将延迟特定时间。例如消息延迟级别设置为1,则延迟1s;如果消息延迟级别设置为2,则延迟为5s,依此类推。当消息延迟级别设置大于18时,消息延迟级别为18。例如,如果消息延迟级别设置为20,则延迟为2h。延迟消息示例首先,编写一个消费延迟消息的消费者://实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("OneMoreGroup");//设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");//订阅一个或多个Topic和Tags,过滤需要消费的内容Messageconsumer.subscribe("OneMoreTopic","*");//注册回调实现类,处理从broker拉回的消息sdf.format(newDate()),Thread.currentThread().getName());for(MessageExtmsg:msgs){System.out.printf("\tMsgId:%s%n",msg.getMsgId());System.out.printf("\tBody:%s%n",newString(msg.getBody()));}//标记消息已经消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//启动消费者实例consumer.start();System.out.println("ConsumerStarted.");}}编写另一个延迟消息生产者来发送延迟消息:mm:ss.SSS");//实例化消息生产者ProducerDefaultMQProducerproducer=newDefaultMQProducer("OneMoreGroup");//设置NameServer的地址producer.setNamesrvAddr("localhost:9876");//启动Producerinstanceproducer.start();Messagemsg=newMessage("OneMoreTopic","DelayMessage","Thisisadelaymessage.".getBytes());//"1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h"//设置消息延迟级别为3,即延迟10smsg.setDelayTimeLevel(3);//向Broker发送消息SendResultsendResult=producer.send(msg);//通过sendResult返回消息是否发送成功System.out.printf("%sSendStatus:%s,MsgId:%s%n",sdf.format(newDate()),sendResult.getSendStatus(),发送结果.getMsgId());//如果不再发送消息,则关闭Producer实例。生产者.shutdown();}}运行producer后,会发送一条延时消息:10:37:14.992SendStatus:SEND_OK,MsgId:C0A8006D5AB018B4AAC216E0DB69000010秒后,consumer收到延时消息:10:37:25.026ConsumeMessageThread_1ReceiveNewMessages:消息ID:C0A8006D5AB018B4AAC216E0DB690000正文:这是一条延迟消息。延迟消息原理分析下面分析的RocketMQ源码版本号为4.7.1,不同版本源码略有不同。CommitLog在org.apache.rocketmq.store.CommitLog中对延迟消息做了一些处理://如果延迟级别大于0,则为延迟消息if(msg.getDelayTimeLevel()>0){//判断当前延迟级别,如果大于最大延迟级别,//将当前延迟级别设置为最大延迟级别。如果(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//获取延迟消息的主题,//其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXXtopic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延迟级别获取延迟消息的队列ID,//队列ID其实就是延迟级别减1queueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());//备份trueTopic和QueueIdMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//设置延迟消息的主题和队列Idmsg.setTopic(topic);msg.setQueueId(queueId);}可以看到每条延迟消息的topic临时改成了SCHEDULE_TOPIC_XXXX,新的queueId是根据延迟消息的延迟级别改变的。接下来,org.apache.rocketmq.store.schedule.ScheduleMessageService处理延迟消息。ScheduleMessageServiceScheduleMessageService由org.apache.rocketmq.store.DefaultMessageStore初始化,初始化包括构造对象和调用load方法。最后执行ScheduleMessageService的start方法:publicvoidstart(){//使用AtomicBoolean保证start方法只有效一次if(started.compareAndSet(false,true)){this.timer=newTimer("ScheduleMessageTimerThread“,真的);//遍历所有延迟级别for(Map.Entryentry:this.delayLevelTable.entrySet()){//key是延迟级别Integerlevel=entry.getKey();//value为delaylevel对应的毫秒数LongtimeDelay=entry.getValue();//根据延迟级别获取对应队列的偏移量Longoffset=this.offsetTable.get(level);//如果偏移量为空,则将其设置为0if(null==offset){offset=0L;}if(timeDelay!=null){//为每个延迟级别创建一个定时任务,//第一次启动任务的延迟为FIRST_DELAY_TIME,即1秒this.timer.schedule(newDeliverDelayedMessageTimerTask(level,偏移量),FIRST_DELAY_TIME);}}//延迟10秒后,每flushDelayOffsetInterval执行一个任务,//其中flushDelayOffsetInterval默认配置也是10秒this.timer.scheduleAtFixedRate(newTimerTask(){@Overridepublicvoidrun(){try{//持久化每个队列消耗的offsetif(started.get())ScheduleMessageService.this.persist();}catch(Throwablee){log.error("scheduleAtFixedRate刷新异常",e);}}},10000,this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}遍历所有延迟级别,根据延迟级别得到对应队列的偏移量,如果偏移量不存在则设置为0然后为每个延迟级别创建一个定时任务,第一个启动任务延迟1秒,第二次及后续启动任务的延迟为延迟级别对应的延迟时间。然后,创建了一个定时任务来持久化每个队列消耗的offset。持久化频率由flushDelayOffsetInterval属性配置,默认为10秒。执行完定时任务ScheduleMessageService的start方法后,每个延迟级别都会创建自己的定时任务。这里定时任务的具体实现在DeliverDelayedMessageTimerTask类中。它的核心代码是executeOnTimeup方法。我们来看主要部分://根据主题和队列Id获取消息队列ConsumeQueuecq=ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后重新执行任务。如果获取到,则继续执行以下操作://根据消费偏移量,从消息队列中获取所有有效消息。SelectMapppedBufferResultbufferCQ=cq.getIndexBuffer(this.offset);执行任务前100)毫秒。如果获取到,则继续执行以下操作://遍历所有消息for(;i

猜你喜欢