当前位置: 首页 > 后端技术 > Java

Netty系列:Netty对marshalling的支持

时间:2023-04-01 14:30:18 Java

简介我们在上一篇文章中提到,jbossmarshalling是一种非常好的序列化java对象的方式。它兼容JDK自带的序列化,同时也提供了性能和使用上的优化。那么这么优秀的序列化工具能不能用在netty中作为消息传递的方式呢?答案当然是肯定的,netty一切皆有可能。netty中的marshallingprovider回顾一下jbossmarshalling的常见用法。我们需要从MarshallerFactory创建一个Marshaller。因为mashaller有不同的实现,我们需要指定具体的实现来创建MarshallerFactory,如下:MarshallerFactorymarshallerFactory=Marshalling.getProvidedMarshallerFactory("river");这个MarshallerFactory其实就是一个MarshallerProvider。netty中定义了这样一个接口:publicinterfaceMarshallerProvider{MarshallergetMarshaller(ChannelHandlerContextctx)throwsException;}MarshallerProvider其实做的和MarshallerFactory一样的工作。既然MarshallerProvider是一个接口,那么它的实现是什么?在netty中它有两个实现类,分别是DefaultMarshallerProvider和ThreadLocalMarshallerProvider。两者有什么区别?先来看一下DefaultMarshallerProvider:publicclassDefaultMarshallerProviderimplementsMarshallerProvider{privatefinalMarshallerFactoryfactory;私有最终MarshallingConfiguration配置;publicDefaultMarshallerProvider(MarshallerFactoryfactory,MarshallingConfigurationconfig){this.factory=factory;这个.config=配置;}publicMarshallergetMarshaller(ChannelHandlerContextctx)throwsException{returnfactory.createMarshaller(config);}}顾名思义,DefaultMarshallerProvider是marshallerProvider的默认实现。从具体的实现代码可以看出,DefaultMarshallerProvider其实需要传入MarshallerFactory和MarshallingConfiguration作为参数,然后使用传入的MarshallerFactory来创建具体的marshallerProvider,这与我们手动创建marshaller的方式是一致的。但是在上面的实现中,每次getMarshaller都需要从工厂新建一个,性能上可能会出现问题。所以netty又实现了一个新的ThreadLocalMarshallerProvider:publicclassThreadLocalMarshallerProviderimplementsMarshallerProvider{privatefinalFastThreadLocalmarshallers=newFastThreadLocal();私有最终MarshallerFactory工厂;私有最终MarshallingConfiguration配置;publicThreadLocalMarshallerProvider(MarshallerFactoryfactory,MarshallingConfigurationconfig){this.factory=factory;这个.config=配置;}@OverridepublicMarshallergetMarshaller(ChannelHandlerContextctx)throwsException{Marshallermarshaller=marshallers.get();if(marshaller==null){marshaller=factory.createMarshaller(config);marshallers.set(编组器);}返回编组器;}}ThreadLocalMarshallerProvider和DefaultMarshallerProvider的区别在于ThreadLocalMarshallerProvider中保存了一个FastThreadLocal对象,FastThreadLocal是JDK中ThreadLocal的优化版本,比Thr更好eadLocal更快。在getMarshaller方法中,首先从FastThreadLocal获取Marshaller对象。如果Marshaller对象不存在,则从工厂创建一个Marshaller对象,最后将Marshaller对象放到ThreadLocal中。有MarshallerProvider就有和他对应的UnMarshallerProvider:publicinterfaceUnmarshallerProvider{UnmarshallergetUnmarshaller(ChannelHandlerContextctx)throwsException;}netty中的UnmarshallerProvider有三个实现类,分别是DefaultUnmarshallerProvider,ThreadLocalUnmarshallerProvider和ContextBoundUnmarshallerProvider。前面的两个DefaultUnmarshallerProvider,ThreadLocalUnmarshallerProvider和marshaller最重要的是实现是一样的,这里就不重复解释了。下面主要看一下ContextBoundUnmarshallerProvider的实现。我们从名字就可以看出这个unmarshaller是和ChannelHandlerContext相关的。ChannelHandlerContext表示通道的上下文。它有一个方法叫做attr,可以保存通道相关的属性:Attributeattr(AttributeKeykey);ContextBoundUnmarshallerProvider就是把Unmarshaller存放在context中,每次用到的时候从context中获取,没有获取到的时候从factroy中获取。我们来看下ContextBoundUnmarshallerProvider的实现:publicclassContextBoundUnmarshallerProviderextendsDefaultUnmarshallerProvider{privatestaticfinalAttributeKeyUNMARSHALLER=AttributeKey.valueOf(ContextBoundUnmarshallerProvider.class,"UNMARSHALLER");公共ContextBoundUnmarshallerProvider(MarshallerFactory工厂,MarshallingConfiguration配置){超级(工厂,配置);}@OverridepublicUnmarshallergetUnmarshaller(ChannelHandlerContextctx)throwsException{属性attr=ctx.channel().attr(UNMARSHALLER);解组器unmarshaller=attr.get();if(unmarshaller==null){unmarshaller=super.getUnmarshaller(ctx);属性集(解组器);}返回解组器;}}ContextBoundUnmarshallerProvider继承自DefaultUnmarshallerProvider,在getUnmarshaller方法中先从ctx中取出unmarshaller,如果没有则调用DefaultUnmarshallerProvider中的getUnmarshaller方法取出unmarshallerMarshallingencoder上面章节我们拿到了marshaller。接下来,让我们看看如何使用编组器进行编码和解码操作。我们先来看编码器MarshallingEncoder。MarshallingEncoder继承自MessageToByteEncoder,接收的泛型类型为Object:publicclassMarshallingEncoderextendsMessageToByteEncoder是将Object对象编码成ByteBuf。回想一下我们之前提到的普通对象的编码需要一个对象长度字段来划分对象的数据。同样的MarshallingEncoder也提供了一个4字节的LENGTH_PLACEHOLDER来存储对象的长度。具体看它的encode方法:intlengthPos=out.writerIndex();out.writeBytes(LENGTH_PLACEHOLDER);ChannelBufferByteOutput输出=newChannelBufferByteOutput(out);marshaller.start(输出);marshaller.writeObject(味精);marshaller.finish();marshaller.close();out.setInt(lengthPos,out.writerIndex()-lengthPos-4);}encode的逻辑很简单,先从provider获取marshaller对象,然后先写入4个字节的LENGTH_PLACEHOLDER到out,再用marshaller将编码后的对象写入out,最后根据写入的对象的长度填写得到最后的输出。因为编码后的数据存储的是长度数据,所以在解码的时候需要一个叫做LengthFieldBasedFrameDecoder的帧解码器。LengthFieldBasedFrameDecoder通常有两种使用方式。一种是将LengthFieldBasedFrameDecoder添加到管道处理程序。解码器只需要处理帧解码器处理过的对象。另一种方式是解码器本身是一个LengthFieldBasedFrameDecoder。这里netty选择了第二种方式。我们看一下MarshallingDecoder的定义:publicclassMarshallingDecoderextendsLengthFieldBasedFrameDecoder首先需要在构造函数中指定LengthFieldBasedFrameDecoder的字段长度,这里调用super方法来实现:publicMarshallingDecoder(UnmarshallerProviderprovider,intmaxObjectSize){super(maxObjectSize,0,4,0,4);this.provider=提供者;}并覆盖extractFrame方法:protectedByteBufextractFrame(ChannelHandlerContextctx,ByteBufbuffer,intindex,intlength){}最后看decode方法:如果(框架==空){返回空;}Unmarshallerunmarshaller=provider.getUnmarshaller(ctx);ByteInput输入=newChannelBufferByteInput(frame);尝试{unmarshaller.start(input);对象obj=unmarshaller.readObject();unmarshaller.finish();返回对象;}最后{unmarshaller.close();解码的逻辑也很简单,先调用super方法解码frameByteBuf,然后调用unmarshaller读取对象,最后返回变化后的对象。Marshallingencoding的另一种实现上面我们提到了对象的编码使用了LengthFieldBasedFrameDecoder,它根据对象实际数据之前的一个长度字段来确定字段的长度,从而读取到真正的数据。那么是否可以在不指定物体长度的情况下准确读取物体呢?其实也是可以的,我们可以一直尝试读取数据,直到找到合适的对象数据。看过我之前文章的朋友可能想到了,ReplayingDecoder不就是干这个的吗?在ReplayingDecoder中,它会不断重试,直到找到满足条件的消息。所以netty也有基于ReplayingDecoder的编组编解码的实现,分别叫做CompatibleMarshallingEncoder和CompatibleMarshallingDecoder。CompatibleMarshallingEncoder很简单,因为不需要对象的实际长度,所以直接使用编组编码即可。公共类CompatibleMarshallingEncoderextendsMessageToByteEncoder{privatefinalMarshallerProviderprovider;publicCompatibleMarshallingEncoder(MarshallerProviderprovider){this.provider=provider;}@Overrideprotectedvoidencode(ChannelHandlerContextctx,Objectmsg,ByteBufout)throwsException{Marshallermarshaller=provider.getMarshaller(ctx);marshaller.start(新的ChannelBufferByteOutput(out));marshaller.writeObject(味精);marshaller.finish();marshaller.close();}}CompatibleMarshallingDecoder继承了ReplayingDecoder:publicclassCompatibleMarshallingDecoderextendsReplayingDecoder它的解码方法的核心就是调用unmarshaller的方法:Unmarshallerunmarshaller=provider.getUnmarshaller(ctx);ByteInputinput=newChannelBufferByteInput(buffer);if(maxObjectSize!=Integer.MAX_VALUE){input=newLimitingByteInput(input,maxObjectSize);}尝试{unmarshaller.start(input);对象obj=unmarshaller.readObject();unmarshaller.finish();添加(对象);}catch(LimitingByteInput.TooBigObjectException被忽略){discardingTooLongFrame=true;抛出新的TooLongFrameException();}最后{unmarshaller.close();注意这里解码的时候有两种异常。第一个异常是unmarshaller.readObject时的异常,会被ReplayingDecoder捕获重试。另一种是字段Toolong异常,这种异常无法处理,只能放弃:}else{super.exceptionCaught(ctx,原因);}}总结以上就是netty中使用marshalling编解码的实现。原理和对象编码解码很相似,大家可以对比分析。本文已收录于http://www.flydean.com/17-1-netty-marshalling/最流行的解读,最深刻的干货,最简洁的教程,很多你不知道的小技巧等着你等你发现!欢迎关注我的公众号:《程序那些事儿》,懂技术,更懂你!