当前位置: 首页 > 科技观察

RocketMQ编解码技术详解

时间:2023-03-11 23:05:19 科技观察

本文转载自微信公众号《龙漫科技之路》,作者刘力。转载本文请联系漫漫技术路公众号。从上一篇文章我们了解了数据是如何通过网络在RocketMQ的不同组件之间传输的。针对RocketMQ的网络传输模块总结了以下几点。在remoting子模块下,入口可以参考RemotingNettyServer和RemotingNettyClient这两个类。RocketMQ依赖Netty与各个组件进行数据传输。RocketMQ的序列化和反序列化有两种方式:一种是通过FastJSON将数据转成JSON字符串,然后转成byte[]数组进行编解码。另一种方式是RocketMQ自己定义了一套编解码器,对每个字段分别进行编码和解码。无论采用何种编码方式,最终都被编码为byte[]。今天,我们分析一下RocketMQ的编解码细节。在本文中,您可以学习到:RocketMQ网络协议。RocketMQ在传输数据时分配内存。RocketMQ编解码器详细信息。编码流程首先,我们编码器org.apache.rocketmq.remoting.netty.NettyEncoder入手@ChannelHandler.SharablepublicclassNettyEncoderextendsMessageToByteEncoder{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);@Overridepublicvoidencode(ChannelHandlerContextctx,RemotingCommandremotingCommand,ByteBufout)throwsException{try{ByteBufferheader=remotingCommand.encodeHeader();out.writeBytes(header);byte[]body=remotingCommand.getBody();if(body!=null){out.writeBytes(body);}}catch(Exceptione){log.error("encodeexception,"+RemotingHelper.parseChannelRemoteAddr(ctx.channel()),e);if(remotingCommand!=null){log.error(remotingCommand.toString());}RemotingUtil.closeChannel(ctx.channel());}}}从上面的代码来看,是RocketMQ将RemotingCommand对象编码到ByteBuf中的唯一入口。我们可以看到,header部分先被编码到ByteBuf中,然后body部分被附加到ByteBuf中。body部分的编码很好理解,那么header部分是怎么编码的呢?publicByteBufferencodeHeader(){returnencodeHeader(this.body!=null?this.body.length:0);}publicByteBufferencodeHeader(finalintbodyLength){/1>headerlengthsizeintlength=4;//2>headerdatalengthbyte[]headerData;headerData=this.headerEncode();length+=headerData.length;//3>bodydatalengthlength+=bodyLength;ByteBufferresult=ByteBuffer.allocate(4+length-bodyLength);//lengthresult.putInt(length);//headerlengthresult.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));//headerdataresult.put(headerData);result.flip();returnresult;}我们一步步来看,逻辑是下面定义变量length,它的初始值为4,经过几步之后,它的值为4+header.length+body.length。调用headerEnocde()方法对标头部分进行编码。创建一个ByteBuffer,申请4+length-body.length内存,其实就是4+4+header.length内存。将数据写入ByteBuffer,完成header部分的编码。我们画一张图来显示当前内存中存储了哪些数据。我们可以清楚的看到ButyBuf中每一部分的数据存放在什么地方。这里需要强调的是,byte[4]部分存储的是什么,可以通过源码进一步分析。结果.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));publicstaticbyte[]markProtocolType(intsource,SerializeTypetype){byte[]result=newbyte[4];result[0]=type.getCode();result[1]=(byte)((source>>16)&0xFF);result[2]=(byte)((source>>8)&0xFF);result[3]=(byte)(source&0xFF);returnresult;}publicenumSerializeType{JSON((byte)0),ROCKETMQ((byte)1);}markProtocolType方法,第一个参数source表示header部分的长度,第二个参数type是编码类型的枚举,返回值为byte[]。那么markProtocolType方法有什么作用呢?我们将type字段传入JSON和ROCKETMQ这两个枚举值,看看返回的是什么。返回值一共有四个字节,只有第一个字节不同,第四个字节是header部分的长度。上面提到,RocketMQ对于header部分可以使用两种编解码方式。是的!!没错,第一个字节标识编解码器类型。解码过程接下来,让我们看一下解码。解码与编码略有不同。解码器继承Netty提供的LengthFieldBasedFrameDecoder解码器。我们看下org.apache.rocketmq.remoting.netty.NettyDecoder的源码。publicclassNettyDecoderextendsLengthFieldBasedFrameDecoder{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);privatestaticfinalintFRAME_MAX_LENGTH=Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength","16777216"));publicNettyDecoder(){super(FRAME_MAX_LENGTH,0,4,0,4);}@OverridepublicObjectdecode(ChannelHandlerContextctx,ByteBufin)throwsException{ByteBuffer=null;try{frame=(ByteBuf)super.decode(ctx,in);if(null==frame){returnnull;}ByteBufferbyteBuffer=frame.nioBuffer();returnRemotingCommand.decode(byteBuffer);}catch(Exceptione){log.error("decodeexception,"+RemotingHelper.parseChannelRemoteAddr(ctx.channel()),e);RemotingUtil.closeChannel(ctx.channel());}finally{if(null!=frame){frame.release();}}returnnull;}}步骤如下:调用LengthFieldBasedFrameDecoder的decode方法进行初始解码。调用RemotingCommand.decode()方法完成header和body部分的解码,转换为RemotingCommand对象。为什么要解码两次?熟悉LengthFieldBasedFrameDecoder解码器的朋友都知道,LengthFieldBasedFrameDecoder解码器是Netty提供的一种非常灵活的解码器。在RocketMQ的NettyDecoder类中是这样构造的。publicNettyDecoder(){super(FRAME_MAX_LENGTH,0,4,0,4);}LengthFieldBasedFrameDecoder这里就不详细解释了,按照上面的构造方法,就是跳过开头的前4个字节。LengthFieldBasedFrameDecoder构造完成后,第一次对RocketMQ协议进行解码。解码结果如下:我们可以看到前四个字节,也就是内存中存放长度字段的部分被截断了,只剩下byte[]+header+body部分。我们再来看看RemotingCommand的解码通讯publicstaticRemotingCommanddecode(finalByteBufferbyteBuffer){intlength=byteBuffer.limit();intoriHeaderLen=byteBuffer.getInt();intheaderLength=getHeaderLength(oriHeaderLen);byte[]headerData=newbyte[headerLength];byteBuffer.get(headerData);RemotingCommandcmd=headerDecode(headerData,getProtocolType(oriHeaderLen));intbodyLength=length-4-headerLength;byte[]bodyData=null;if(bodyLength>0){bodyData=newbyte[bodyLength];byteBuffer.get(bodyData);}cmd.body=bodyData;returncmd;}privatestaticRemotingCommandheaderDecode(byte[]headerData,SerializeTypetype){switch(type){caseJSON:RemotingCommandresultJson=RemotingSerializable.decode(headerData,RemotingCommand.class);resultJson.setSerializeTypeCurrentRPC(type);returnresultJson;caseROCKETMQ:RemotingCommandresultRMQ=RocketMQSerializable.rocketMQProtocolDecode(headerData);resultRMQ.setSerializeTypeCurrentRPC(type);returnresultRMQ;default:break;}returnnull;}只剩下byte[4]+header+body三个部分,解码发送很方便。求data的长度和header的长度可以分为以下几个步骤。根据编码类型,解码。比如用JSON编码的,就会用JSON解码。计算正文的长度并解码正文。最后解码生成的RemotingCommand对象被送到管道的下一个handler进行处理。