当前位置: 首页 > 科技观察

五张图带你吃透RocketMQ轨迹信息

时间:2023-03-16 13:53:49 科技观察

大家好,我是君哥,为了方便追踪消息发送和消费的轨迹,RocketMQ引入了轨迹消息,今天我们一起学习。1、开启消息追踪RocketMQ默认是不开启消息追踪的,需要我们手动开启。1.1Broker在Broker端开启trace消息,需要添加如下配置:traceTopicEnable=true1.2Producer对于producer端,开启trace消息,需要在定义producer时添加参数。定义使用者以使用DefaultMQProducer类。该类支持开启track消息的构造函数如下:publicDefaultMQProducer(finalStringproducerGroup,RPCHookrpcHook,booleanenableMsgTrace,finalStringcustomizedTraceTopic)publicDefaultMQProducer(finalStringproducerGroup,booleanQProfincEnableMsgStringDefaulter)pproducerGroup,booleanenableMsgTrace,finalStringcustomizedTraceTopic)publicDefaultMQProducer(finalStringnamespace,finalStringproducerGroup,RPCHookrpcHook,booleanenableMsgTrace,finalStringcustomizedTraceTopic)从上面的构造函数可以看出,在自定义消费者的时候,不仅可以定义开启traceMessage,还可以指定Topic来跟踪消息被发送。如果不指定跟踪消息的Topic,默认发送的Topic为RMQ_SYS_TRACE_TOPIC。1.3Consumers对于消费者来说,为了能够跟踪消息,需要在定义消费者的时候加上参数。定义消费者以使用DefaultMQPushConsumer类。该类的构造函数支持开启跟踪消息,如下:AllocateMessageQueueStrategyallocateMessageQueueStrategy,booleanenableMsgTrace,finalStringcustomizedTraceTopic)publicDefaultMQPushConsumer(finalStringnamespace,finalStringconsumerGroup,RPCHookrpcHook,AllocateMessageQueueStrategyallocateMessageQueueStrategy,booleanenableMsgTrace,finalStringcustomizedTraceTopic)2.生产者处理首先看一个支持轨迹消息的生产者Example:DefaultMQProducerproducer=newDefaultMQProducer(producerGroupTemp,true,"");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();下面是生产者端的UML类图:在创建DefaultMQProducer时,会初始化defaultMQProducerImpl、traceDispatcher和钩子函数SendMessageHook。2.1Producer初始化Producer初始化代码如下:this.producerGroup=producerGroup;defaultMQProducerImpl,rpcHook);if(enableMsgTrace){try{AsyncTraceDispatcherdispatcher=newAsyncTraceDispatcher(producerGroup,TraceDispatcher.Type.PRODUCE,customizedTraceTopic,rpcHook);dispatcher.setHostProducer(this.defaultdisMQProducerImpl);tracksub-spatcher注册函数=tracethis.defaultMQProducerImpl.registerSendMessageHook(newSendMessageTraceHookImpl(traceDispatcher));//省略事务消息钩子注册}catch(Throwablee){}}}在初始化代码中,在消息的Topic(customizedTraceTopic)中传入是否开启trace消息(enableMsgTrace)和自定义trace,同时初始化traceDispatcher,注册钩子函数SendMessageTraceHook。defaultMQProducerImpl和traceDispatcher也会在producer启动时启动,代码如下:publicvoidstart()throwsMQClientException{this.setProducerGroup(withNamespace(this.producerGroup));this.defaultMQProducerImpl.start();if(null!=traceDispatcher){try{traceDispatcher.start(this.getNamesrvAddr(),this.getAccessChannel());}catch(MQClientExceptione){log.warn("tracedispatcherstartfailed",e);}}}2.2traceDispatcherstartsproducerinitialization初始化traceDispatcher的时候。traceDispatcher是trace消息的处理器,AsyncTraceDispatcher的构造函数定义了一个producertraceProducer(DefaultMQProducer类型)专门发送trace消息。注意:traceProducer发送的消息最大maxMessageSize为128k。虽然定义了maxMessageSize的初始值是4M,但是在创建traceProducer的时候给它赋值128k。如上所述,traceDispatcher也会在生产者启动时启动。看一下它的启动方法:publicvoidstart(StringnameSrvAddr,AccessChannelaccessChannel)throwsMQClientException{if(isStarted.compareAndSet(false,true)){traceProducer.setNamesrvAddr(nameSrvAddr);traceProducer.setInstanceName(TRACE_INSTANCE_NAME+"_"+nameSrvAddr);traceProducer.start();}this.accessChannel=accessChannel;this.worker=newThread(newAsyncRunnable(),"MQ-AsyncTraceDispatcher-Thread-"+dispatcherId);this.worker.setDaemon(true);这个.worker.start();this.registerShutDownHook();}可以看到traceDispatcher的启动是先启动traceProducer,然后再启动一个异步线程AsyncRunnable,我们看一下run方法:publicvoidrun(){while(!stopped){//batchSize=100Listcontexts=newArrayList(batchSize);//traceContextQueue队列长度等于1024synchronized(traceContextQueue){for(inti=0;i0){AsyncAppenderRequestrequest=newAsyncAppenderRequest(contexts);traceExecutor.提交(请求);}elseif(AsyncTraceDispatcher.this.stopped){this.stopped=true;}}}}从上面代码可以看出,每次从traceContextQueue中拉取100个TraceContext,然后通过AsyncAppenderRequest异步发送出去注意:发送trace消息时,需要组装消息进行批量发送,每次发送的大小消息不应超过128k;如果有多个保存trace消息的Broker,需要按照轮询的方式依次发送给不同的Broker。具体代码参见AsyncTraceDispatcher类中的sendTraceDataByMQ方法。2.3Hook函数看到这里,相信你一定有一个疑问,traceContextQueue中的消息是从哪里来的呢?答案是初始化生产者时定义的SendMessageTraceHook。看一下发送消息的代码://DefaultMQProducerImpl类privateSendResultsendKernelImpl(finalMessagemsg,finalMessageQueuemq,finalCommunicationModecommunicationMode,finalSendCallbacksendCallback,finalTopicPublishInfotopicPublishInfo,finallongtimeout)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{//省略部分代码SendMessageContextcontext=null;if(brokerAddr!=null){try{//省略一些代码if(this.hasSendMessageHook()){context=newSendMessageContext();//1。发送消息前执行钩子函数this.executeSendMessageHookBefore(context);}SendMessageRequestHeaderrequestHeader=newSendMessageRequestHeader();//省略requestHeader封装代码SendResultsendResult=null;//------------2。在这里发送消息------------if(this.hasSendMessageHook()){context.setSendResult(sendResult);//3。发送消息后执行钩子函数this.executeSendMessageHookAfter(context);}返回发送结果;}//catch最后省略}thrownewMQClientExceptionon("Thebroker["+mq.getBrokerName()+"]notexist",null);}因为sendKernelImpl代码较多,这里只贴出骨架代码。我在上面加了注释,可以看到在发送消息的时候前后都会执行钩子函数。在发送消息之前,通过调用钩子函数封装一条trace消息。消息发送后,通过hook函数完善trace消息,主要是添加消息发送结果、发送消息耗时等属性,然后将trace消息添加到traceContextQueue中。track消息包含的内容如下:track消息有很多内容,包括发送消息的详细信息,如:Topic、Message、MessageQueue、Group、生产者地址(clientHost)、消息发送结果、etc.3.Broker处理trace消息发送给Broker后,会保存在Broker上。默认保存的Topic是RMQ_SYS_TRACE_TOPIC。Broker启动时,会自动初始化默认Topic的路由配置,代码如下:().getMsgTraceTopicName();TopicConfigtopicConfig=newTopicConfig(主题);TopicValidator.addSystemTopic(主题);topicConfig.setReadQueueNums(1);topicConfig.setWriteQueueNums(1);this.topicConfigTable.put(topicConfig.getTopicName(),topicConfig);}前面提到,Producers也可以定义自己的跟踪消息Topic,但是需要提前在Broker上创建一个自定义的Topic。如果要将跟踪消息和业务消息隔离开来,可以使用专门的Broker来保存跟踪消息,所以需要在这个Broker上单独开启跟踪消息。4.消费端处理消费端对track消息的处理与生产端非常相似。首先我们看一下消费者处理的UML类图:下面以push模式为例,处理并发消息。ConsumeMessageConcurrentlyService在消费消息前通过DefaultMQPushConsumerImpl调用钩子函数executeHookBefore,在消费消息后通过DefaultMQPushConsumerImpl调用钩子函数executeHookAfter。代码如下://ConsumeMessageConcurrentlyService类publicvoidrun(){//省战略部分分发ConsumeMessageContextconsumeMessageContext=null;如果(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()){consumeMessageContext=newConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(newHashMap());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(消息);consumeMessageContextalse)Success(f;//1.执行钩子函数ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}//省略部分逻辑try{//2.消费消息status=listener.consumeMessage(Collections.unmodifiableList(msgs),context);}catch(Throwablee){}//省略一些逻辑if(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()){consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS==status);//3。执行钩子函数ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeContext);MessageContext//省略部分逻辑}如果consumer端开启trace消息,会初始化traceDispatcher并注册钩子函数if(enableMsgTrace){try{AsyncTraceDispatcherdispatcher=newAsyncTraceDispatcher(consumerGroup,TraceDispatcher.Type.CONSUME,customizedTraceTopic,rpcHook);dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());traceDispatcher=调度员;this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(newConsumeMessageTraceHookImpl(traceDispatcher));}catch(Throwablee){log.error("systemmqtracehookinitcan'tfail,maymsgtracedata");}}可以看到traceDispatcher和producer都使用了AsyncTraceDispatcher,处理逻辑完全一样。同样,钩子函数的使用和生产者类似。消息消费前调用钩子函数(executeHookBefore)封装跟踪消息,消息消费后再次调用钩子函数(executeHookAfter)完成跟踪消息。消费端track消息的内容如下图所示:5小结本文主要讲解RocketMQ的track消息的实现机制。跟踪消息分为生产端跟踪消息和消费端跟踪消息。生产端和消费端RocketMQ都提供了构造函数来指定是否开启trace消息。通过hook函数,将trace消息添加到队列中,即变量traceContextQueue,traceDispatcher以100为单位不断从队列中拉取消息组装发送给Broker。如下图所示:了解了traceDispatcher和hook函数之后,就很容易理解RocketMQ跟踪消息的处理逻辑了。在Broker端,通过添加配置参数traceTopicEnable指定是否存储trace消息。