当前位置: 首页 > 后端技术 > Java

Netty系列:基于流的数据传输

时间:2023-04-01 16:33:42 Java

简介我们知道数据的传输方式有两种,分别是字符流和字节流。字符流是指传输的对象是一个字符串,并且已经设置了格式,发送方和接收方可以按照特定的格式进行读取,字节流是指以最原始的二进制字节来传输数据。今天给大家介绍一下netty中基于流的数据传输。package和byte熟悉TCP/IP协议的同学应该都知道,在TCP/IP中,由于底层协议有一个最大支持的数据包,对于大数据传输,需要对数据进行拆分打包,而这些拆解并发送组装好的数据包,最后在接收端组装这些数据包。每个数据包中都有一个固定的结构,因此接收方可以清楚地知道应该将多少个数据包组合为最终结果。所以对于netty来说,ByteBuf是在channel中传输的,其实最底层就是byte数组。对于这个字节数组,接收方并不知道要合并多少字节才能合成原始消息,所以需要在接收方对接收到的字节进行合并,生成最终的数据。那么netty中的字节数据流应该如何组合呢?接下来我们看看两种组合方法。手动组合这种组合的基本思路是构造一个目标大小的ByteBuf,然后通过调用ByteBuf的writeBytes方法将接收到的字节写入到ByteBuf中。最后从ByteBuf中读取相应的数据。例如,我们要从服务器向客户端发送一个int数字。一般来说,一个int是32bits,一个byte是8bits,所以一个int需要4个字节。在服务器端,可以创建一个字节数组,其中包含4个元素。将4个元素的字节发送给客户端,那么客户端应该如何处理呢?首先我们需要创建一个clientHander,这个handler应该继承ChannelInboundHandlerAdapter,并在它的handler加入ChannelPipeline时初始化一个包含4个字节的byteBuf。handler添加时会触发handlerAdded事件,所以我们可以这样写:privateByteBufbuf;@OverridepublicvoidhandlerAdded(ChannelHandlerContextctx){//创建一个4字节的缓冲区buf=ctx.alloc().buffer(4);在上面的例子中,我们从ctx中分配了一个4字节的缓冲区,并将其赋值给handler中的私有变量buf。当处理程序被执行并从ChannelPipeline中删除时,将触发handlerRemoved事件。在这种情况下,我们可以清理分配的Bytebuf。一般来说,我们可以调用它的release方法,如下所示:publicvoidhandlerRemoved(ChannelHandlerContextctx){buf.release();//释放bufbuf=null;}然后最关键的一步就是从channel中读取byte,放入4字节的byteBuf中。在上一篇文章中,我们提到了可以在channelRead方法中处理消息读取的逻辑。publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){ByteBufm=(ByteBuf)msg??;buf.writeBytes(m);//写一个字节m.release();if(buf.readableBytes()>=4){//已经组成了4个字节,4个字节的组合称为一个intlongresult=buf.readUnsignedInt();ctx.close();}}每次触发channelRead方法,读取的字节会通过调用writeBytes方法写入到buf中。当buf的可读字节大于等于4时,说明这4个字节已经读完,可以操作了。这里我们把4个字节组合成一个unsignedInt,用readUnsignedInt方法从buf中读取出来,这个组合称为一个int数。上面的例子虽然可以解决4个字节的字节问题,但是如果数据结构比较负责的话,上面的方法就不能如愿以偿了,需要考虑太多的数据组合问题。接下来我们看另一种方式。Byte的转换类netty提供了一个ByteToMessageDecoder转换类,可以方便的将Byte转换成其他类型。我们只需要重做里面的decode方法就可以实现ByteBuf的转换:readBytes(in.readableBytes()));上面的示例将字节从输入转换为输出。当然,你也可以在上面的方法中进行格式转换,如下所示:<4){返回;}out.add(in.readBytes(4));}}上面的例子会先判断in里面是否有4个字节,如果有就读出来放到out里。那么有同学会问了,输入的不是一个字节一个字节的吗?为什么这里一次可以读取4个字节呢?这是因为ByteToMessageDecoder内置了一个缓存设备,所以这里的in其实是一个缓存集合。ReplayingDecodernetty还提供了一个更简单的转换ReplayingDecoder。如果你使用ReplayingDecoder重放上面的逻辑,它看起来像这样:(4));}}只有一行代码。ReplayingDecoder其实是ByteToMessageDecoder的子类,是在ByteToMessageDecoder上丰富了一些功能的结果。两者的区别是ByteToMessageDecoder还需要调用readableBytes判断是否有足够的字节读取,直接使用ReplayingDecoder读取。它假定所有字节都已被成功接受。例如,以下代码使用ByteToMessageDecoder:publicclassIntegerHeaderFrameDecoderextendsByteToMessageDecoder{@Overrideprotectedvoiddecode(ChannelHandlerContextctx,ByteBufbuf,Listout)throwsException{if(buf.readableBytes()<4){}return。markReaderIndex();intlength=buf.readInt();if(buf.readableBytes()