当前位置: 首页 > 后端技术 > Java

Netty系列:加油中国

时间:2023-04-01 18:01:12 Java

简介在之前的系列文章中,我们了解了netty的基本结构和工作原理。你一定抑制不住心中的喜悦。想要开始手写代码来体验一下这个神奇的netty框架,最近东京奥运会,我们写一个netty客户端和服务器为中国加油怎么样?场景规划那么我们今天要构建什么样的系统呢?首先要搭建一个server服务器来处理所有netty客户端的连接,处理客户端发给服务器的消息。还需要搭建一个client,负责与server服务器建立连接,向server服务器发送消息。在今天的例子中,客户端建立连接后,首先会向服务器发送一个“China”消息,然后服务器返回一个“Comeon!”。收到消息后给客户端发消息,然后客户端收到消息再给服务端发送一条“中国”的消息....从此如此循环往复,直到奥运结束!我们知道客户端和服务端都是通过handler来处理消息的。在handler中,我们可以重写channelRead方法,这样在读取到channel中的消息后,就可以对消息进行处理,然后在客户端的Bootstrap中启动client端和server端的handler配置,是不是很简单的?我们一起做吧。启动服务器假设服务器端的处理程序称为CheerUpServerHandler。我们使用ServerBootstrap构建两个EventLoopGroups来启动服务器。看过本系列第一篇文章的朋友可能知道,服务端需要启动两个EventLoopGroup,一个bossGroup,一个workerGroup。两组是父子关系。bossGroup负责处理连接相关的问题,workerGroup负责处理通道中的具体消息。启动服务的代码相同,如下://服务器配置//bossloopEventLoopGroupbossGroup=newNioEventLoopGroup(1);//工作循环EventLoopGroupworkerGroup=newNioEventLoopGroup();finalCheerUpServerHandlerserverHandler=newCheerUpServerHandler();尝试{ServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)//tcp/ip协议监听函数中的backlog参数,等待连接的大小pool.option(ChannelOption.SO_BACKLOG,100)//Loghandler.handler(newLoggingHandler(LogLevel.INFO)).childHandler(newChannelInitializer(){@Override//初始化通道,添加处理程序publicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinep=ch.pipeline();//日志处理器p.addLast(newLoggingHandler(LogLevel.INFO));p.addLast(serverHandler);}});//启动服务器ChannelFuturef=b.bind(PORT).sync();//等待通道关闭f.channel().closeFuture()。同步();不同的服务,启动服务器的代码基本相同,这里需要注意这几点在ServerBootstrap中,我们添加了一个选项:ChannelOption.SO_BACKLOG,ChannelOption.SO_BACKLOG对应tcp/ip协议listen中的backlog参数(intsocketfd,intbacklog)函数用于初始化服务器端可连接队列,backlog参数指定队列的大小。因为对于一个连接来说,客户端连接请求的处理是顺序处理的,所以同时只能处理一个客户端连接。当有多个客户端过来时,服务器会将无法处理的客户端连接请求放入队列中。等待处理,另外,我们还添加了两个LoggingHandler,一个给handler,一个给childHandler。LoggingHandler主要是监听通道中的各种事件,然后输出相应的消息,非常好用。例如服务器启动时会输出如下日志:[nioEventLoopGroup-2-1]INFOi.n.handler.logging.LoggingHandler-[id:0xd9b41ea4]REGISTERED[nioEventLoopGroup-2-1]INFOi.n.handler.logging.LoggingHandler-[id:0xd9b41ea4]BIND:0.0.0.0/0.0.0.0:8007[nioEventLoopGroup-2-1]INFOin.n.handler.logging.LoggingHandler-[id:0xd9b41ea4,L:/0:0:0:0:0:0:0:0:8007]ACTIVE日志由第一个LoggingHandler输出,分别代表服务器端的REGISTERED、BIND和ACTIVE事件。从输出中我们可以看到服务器本身绑定到0.0.0.0:8007。当客户端开始与服务器建立连接时,会输出如下日志:[nioEventLoopGroup-2-1]INFOi.n.handler.logging.LoggingHandler-[id:0x37a4ba9f,L:/0:0:0:0:0:0:0:0:8007]阅读:[id:0x6dcbae9c,L:/127.0.0.1:8007-R:/127.0.0.1:54566][nioEventLoopGroup-2-1]INFOi.n.handler.logging.LoggingHandler-[id:0x37a4ba9f,L:/0:0:0:0:0:0:0:0:8007]READCOMPLETE上面的日志表示READ和READCOMPLETE事件,其中L:/127.0.0.1:8007-R:/127.0.0.1:54566表示本地服务器的8007端口连接到客户端的54566端口。对于第二个LoggingHandler,会输出一些与消息处理相关的具体信息。比如REGISTERED、ACTIVE、READ、WRITE、FLUSH、READCOMPLETE等事件,这里就不一一列举了。启动客户端也是如此,假设客户端的handler名称为ChinaClientHandler,那么就可以像启动服务端一样启动客户端,如下://client'seventLoopEventLoopGroupgroup=newNioEventLoopGroup();试试{Bootstrapb=newBootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(newChannelInitializer(){@OverridepublicvoidinitChannel(SocketChannelch)throwsException{lineChannelPipelinep=ch}();//添加日志处理器p.addLast(newLoggingHandler(LogLevel.INFO));p.addLast(newChinaClientHandler());}});//启动客户端ChannelFuturef=b.connect(HOST,PORT).sync();客户端开始使用Bootstrap,我们也为其配置了一个LoggingHandler,并添加了一个自定义的ChinaClientHandler。消息处理我们知道handler有两种,一种是inboundHandler,一种是outboundHandler。这里我们要监听从socket中读取数据的事件,所以这里的client和server的handler可以继承自ChannelInboundHandlerAdapter。消息处理的过程是客户端与服务器建立连接后,首先会向服务器发送一条“中国”的消息。客户端和服务端建立连接后,会触发channelActive事件,所以可以在客户端的handler中发送消息:publicvoidchannelActive(ChannelHandlerContextctx){ctx.writeAndFlush("China");}服务器正在从通道中读取channelRead事件将在获取消息时触发,因此服务器端处理程序可以覆盖channelRead方法:publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){log.info("Receivedmessage:{}",味精);ctx.writeAndFlush("来吧!");然后客户读到“来吧!”从通道,然后将“中国”写入通道,所以客户端也需要重写方法channelRead:这可以循环进行吗?消息处理中的陷阱其实,当你执行上面的代码时,你会发现客户端确实向通道写入了“China”消息,但是并没有触发服务端的channelRead。为什么?据研究,如果写入的对象是String,程序内部会出现这样的错误,但是这个错误是隐藏的,在运行程序的输出中是看不到的,所以对新手还是很不友好的的朋友。这个错误是:DefaultChannelPromise@57f5c075(failure:java.lang.UnsupportedOperationException:unsupportedmessagetype:String(expected:ByteBuf,FileRegion))从错误信息可以看出,目前支持的消息类型有两种,分别是ByteBuf和FileRegion.好吧,我们把上面的消息类型改成ByteBuf试试:message=Unpooled.buffer(ChinaClient.SIZE);message.writeBytes("中国".getBytes(StandardCharsets.UTF_8));publicvoidchannelActive(ChannelHandlerContextctx){log.info("可读字节数:{},index:{}",message.readableBytes(),message.readerIndex());log.info("可写字节数:{},index:{}",message.writableBytes(),message.writerIndex());ctx.writeAndFlush(消息);}上面我们定义了一个ByteBuf全局消息对象发送给服务端,然后在服务端读取消息,再发送一个ByteBuf全局消息对象给客户端,以此类推。但是当你运行上面的程序时,你会发现服务端确实收到了“China”,客户端也确实收到了“Comeon!”,但是客户端后续发送的“China”报文并没有被客户端接收到。服务器。你怎么回复?关于什么?我们知道ByteBuf有readableBytes、readerIndex、writableBytes、writerIndex、capacity、refCnt等属性。我们在消息发送前后对比这些属性:消息发送前:可读字节数:6,readerIndex:0可写字数Section:14,writerIndex:6capacity:20,refCnt:1消息发送后:可读bytes:6,readerIndex:0writablebytes:-6,writerIndex:6capacity:0,refCnt:0所以问题找到了,由于ByteBuf已经处理过一次,refCnt变成了0,所以不能再继续写了,怎么办解决它?简单的方法就是每次发送都更新一个ByteBuf,这样是没有问题的。但是每次都新建一个对象好像有点浪费空间,怎么办?既然refCnt已经变成了0,那我们是不是应该调用ByteBuf中的retain()方法来增加refCnt呢?答案是这样的,但是需要注意的是在发送前需要调用retain()方法。如果消息处理完后调用retain()方法,会报异常。综上所述,运行上面的程序,你可以一直为中国加油,YYDS!本文示例可参考:learn-netty4本文已收录于http://www.flydean.com/06-netty-cheerup-china/最流行的解读,最深刻的干货,最简洁的教程,很多你不知道的小技巧等你来发现!欢迎关注我的公众号:《程序那些事儿》,懂技术,更懂你!