前面几节我们实现了最基本的客户端调用服务端。在本节中,我们将了解通信中的对象序列化。为什么要序列化netty底层是基于ByteBuf进行通信的。前面我们使用encoder/decoder来处理计算的输入/输出参数,方便我们直接使用pojo。但有一个问题。如果我们想将我们的项目抽象成一个框架,我们需要为所有对象编写编码器/解码器。显然,直接通过每个对象写一个pair是不现实的,用户如何使用也是未知的。序列化方式基于字节实现,性能好但可读性不高。基于字符串的实现,比如json序列化,可读性好,性能相对较差。ps:大家可以根据个人喜好选择。相关连载请参考下文,此处不再展开。json序列化框架介绍[1]实现思路可以把我们所有的Pojo都转成byte,再把Byte转成ByteBuf。反之亦然。代码实现maven导入序列化包:com.github.houbbjson0.1.1server-sidecoreserver-side代码可以大大简化:serverBootstrap.group(workerGroup,bossGroup).channel(NioServerSocketChannel.class)//printlog.handler(newLoggingHandler(LogLevel.INFO)).childHandler(newChannelInitializer(){@OverrideprotectedvoidinitChannel(Channelch)throwsException{ch.pipeline().addLast(newRpcServerHandler());}})//这个参数影响accept没有取出来的连接.option(ChannelOption.SO_BACKLOG,128)//这个参数只是对于客户端一段时间内如果客户端没有响应,服务器会发送一个ack包来判断客户端是否还活着。.childOption(ChannelOption.SO_KEEPALIVE,true);这里只需要一个实现类。RpcServerHandler服务器的序列化/反序列化调整为直接使用JsonBs。packagecom.github.houbb.rpc.server.handler;importcom.github.houbb.json.bs.JsonBs;importcom.github.houbb.log.integration.core.Log;importcom.github.houbb.log.integration.core。LogFactory;导入com.github.houbb.rpc.common.model.CalculateRequest;导入com.github.houbb.rpc.common.model.CalculateResponse;导入com.github.houbb.rpc.common.service.Calculator;导入com.github.houbb。rpc.server.service.CalculatorService;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;/***@authorbinbin.hou*@since0.0.1*/publicclassRpcServerHandlerextendsSimpleChannelInboundHandler{privatestaticfinalLoglog=LogFactory.getLog(RpcServerHandler.class);@OverridepublicvoidchannelActive(ChannelHandlerContextctx)throwsException{finalStringid=ctx.channel().id().asLongText();log.info([服务器]通道{}connected"+id);}@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{finalStringid=ctx.channel().id().asLongText();ByteBufbyteBuf=(ByteBuf)msg;byte[]bytes=newbyte[byteBuf.readableBytes()];byteBuf.readBytes(字节);CalculateRequestrequest=JsonBs.deserializeBytes(bytes,CalculateRequest.class);log.info([Server]receivechannel{}request:{}from",id,request);Calculatorcalculator=newCalculatorService();CalculateResponseresponse=calculator.sum(请求);//返回到客户端byte[]responseBytes=JsonBs.serializeBytes(response);ByteBufresponseBuffer=Unpooled.copiedBuffer(responseBytes);ctx.writeAndFlush(responseBuffer);log.info([Server]channel{}response{}",id,response);}}客户端核心客户端可以简单化如下:channelFuture=bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true).handler(newChannelInitializer(){@OverrideprotectedvoidinitChannel(Channelch)throwsException{channelHandler=newRpcClientHandler();ch.pipeline().addLast(newLoggingHandler(LogLevel.INFO)).addLast(channelHandler);}}).connect(RpcConstant.ADDRESS,port).syncUninterruptibly();RpcClientHandler客户端的序列化/反序列化调整为直接使用JsonBs实现packagecom.github.houbb.rpc.client.handler;importcom.github.houbb.json.bs.JsonBs;importcom.github.houbb.log.integration.core.Log;importcom.github.houbb.log.integration.core。LogFactory;importcom.github.houbb.rpc.client.core.RpcClient;importcom.github.houbb.rpc.common.model.CalculateResponse;importio.netty.buffer.ByteBuf;importio.netty.channel.ChannelHandlerContext;importio.netty。channel.SimpleChannelInboundHandler;/***
客户终端处理类
**
创建时间:2019/10/1611:30下午
*
项目:rpc
**@authorhoubinbin*@since0.0.2*/publicclassRpcClientHandlerextendsSimpleChannelInboundHandler{privatestaticfinalLoglog=LogFactory.getLog(RpcClient.class);/***响应信息*@since0.0.4*/privateCalculateResponseresponse;@OverrideprotectedvoidchannelRead0(Channel,andlerHandlerContextctx)ByteBuf)msg;byte[]bytes=newbyte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);this.response=JsonBs.deserializeBytes(bytes,CalculateResponse.class);log.info([Client]responseis:{}",response);}@OverridepublicvoidchannelReadComplete(ChannelHandlerContextctx)throwsException{//每次用完必须关闭,否则不会得到response,不知道为什么(需要通过目测了解netty)//个人理解:如果不关闭,会一直阻塞ctx.flush();ctx.close();}publicCalculateResponsegetResponse(){returnresponse;}}