RocketMQ简介RocketMQ是一个开源分布式消息系统,基于高可用分布式集群技术,提供低延迟、高可靠、万亿级容量、灵活可扩展的消息发布和订阅服务。它的前身是MetaQ,是阿里巴巴基于Kafka的设计,使用Java自主开发的。2012年阿里开源,2016年阿里捐赠给Apache软件基金会(简称ASF),正式成为孵化项目。2017年,Apache软件基金会宣布RocketMQ作为Apache顶级项目(简称TLP)孵化,这是国内首个基于Apache的互联网中间件顶级项目。延迟消息生产者将消息发送到消息队列后,并不期望立即被消费,而是等待指定的时间才能被消费者消费。这种类型的消息通常称为延迟消息。RocketMQ支持延迟消息,但不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,就无法避免Broker层面的消息排序,而当涉及到持久化的考虑时,那么消息排序必然会产生巨大的性能开销。消息延迟级别为1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h,共18级。发送消息时,只需设置消息延迟级别即可。设置消息延迟级别有以下三种情况:当消息延迟级别设置为0时,该消息为非延迟消息。当消息延迟级别设置为大于等于1且小于等于18时,消息将延迟特定时间。例如消息延迟级别设置为1,则延迟1s;如果消息延迟级别设置为2,则延迟为5s,依此类推。当消息延迟级别设置大于18时,消息延迟级别为18。例如,如果消息延迟级别设置为20,则延迟为2h。延迟消息示例首先,编写一个消费延迟消息的消费者://实例化消费者DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("OneMoreGroup");//设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");//订阅一个或多个Topic和Tags,过滤需要消费的内容Messageconsumer.subscribe("OneMoreTopic","*");//注册回调实现类,处理从broker拉回的消息sdf.format(newDate()),Thread.currentThread().getName());for(MessageExtmsg:msgs){System.out.printf("\tMsgId:%s%n",msg.getMsgId());System.out.printf("\tBody:%s%n",newString(msg.getBody()));}//标记消息已经消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//启动消费者实例consumer.start();System.out.println("ConsumerStarted.");}}编写另一个延迟消息生产者来发送延迟消息:mm:ss.SSS");//实例化消息生产者ProducerDefaultMQProducerproducer=newDefaultMQProducer("OneMoreGroup");//设置NameServer的地址producer.setNamesrvAddr("localhost:9876");//启动Producerinstanceproducer.start();Messagemsg=newMessage("OneMoreTopic","DelayMessage","Thisisadelaymessage.".getBytes());//"1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h"//设置消息延迟级别为3,即延迟10smsg.setDelayTimeLevel(3);//向Broker发送消息SendResultsendResult=producer.send(msg);//通过sendResult返回消息是否发送成功System.out.printf("%sSendStatus:%s,MsgId:%s%n",sdf.format(newDate()),sendResult.getSendStatus(),发送结果.getMsgId());//如果不再发送消息,则关闭Producer实例。生产者.shutdown();}}运行producer后,会发送一条延时消息:10:37:14.992SendStatus:SEND_OK,MsgId:C0A8006D5AB018B4AAC216E0DB69000010秒后,consumer收到延时消息:10:37:25.026ConsumeMessageThread_1ReceiveNewMessages:消息ID:C0A8006D5AB018B4AAC216E0DB690000正文:这是一条延迟消息。延迟消息原理分析下面分析的RocketMQ源码版本号为4.7.1,不同版本源码略有不同。CommitLog在org.apache.rocketmq.store.CommitLog中对延迟消息做了一些处理://如果延迟级别大于0,则为延迟消息if(msg.getDelayTimeLevel()>0){//判断当前延迟级别,如果大于最大延迟级别,//将当前延迟级别设置为最大延迟级别。如果(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//获取延迟消息的主题,//其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXXtopic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延迟级别获取延迟消息的队列ID,//队列ID其实就是延迟级别减1queueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());//备份trueTopic和QueueIdMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//设置延迟消息的主题和队列Idmsg.setTopic(topic);msg.setQueueId(queueId);}可以看到每条延迟消息的topic临时改成了SCHEDULE_TOPIC_XXXX,新的queueId是根据延迟消息的延迟级别改变的。接下来,org.apache.rocketmq.store.schedule.ScheduleMessageService处理延迟消息。ScheduleMessageServiceScheduleMessageService由org.apache.rocketmq.store.DefaultMessageStore初始化,初始化包括构造对象和调用load方法。最后执行ScheduleMessageService的start方法:publicvoidstart(){//使用AtomicBoolean保证start方法只有效一次if(started.compareAndSet(false,true)){this.timer=newTimer("ScheduleMessageTimerThread“,真的);//遍历所有延迟级别for(Map.Entry
