本文转载自微信公众号《KK架构师》,作者KK架构师。转载请联系KKarchitects公众号。1、一开始阅读分布式开源项目时,最重要的是了解这个项目的通信框架。由于一个分布式开源框架通常部署在一个集群中,不同的节点需要相互通信来完成复杂的功能,而在阅读这些源代码的时候,如果不了解它的通信机制,就会迷失在其中。代码,就像走进了一片原始森林。比如HDFS使用自己封装的HadoopRpc作为通信框架;Netty用于Spark的底层通信;而我最近看的Kafka源码,底层使用的是原生的JavaNIO。那么这次,就来聊一聊JavaNIO的主要知识点。2.多图理解NIO的三个核心概念说到NIO,就会有三个核心概念:channel,buffer,selector。开门见山,可能听起来有点失落,需要从头说起。1、以往,当通道的并发要求不是很高时,CPU全权处理输入输出(中断),如下图:用户程序向通道发起读写请求服务器,CPU直接处理这些请求。这有一个缺点。当IO请求很多的时候,会占用大量的CPU,降低整个系统的处理能力。随着计算机的发展,出现了一种新的方式,使用DMA全权处理IO请求,如下图所示:DMA即DirectMemoryAccess,直接内存访问控制。为什么要添加这个设备?因为CPU的中断方式不能满足数据传输速度的要求,因为在中断方式下,每次中断都需要保存断点和场景,而当中断返回时,必须恢复断点和场景。综上所述,中断方式很难满足高速外设对传输速度的要求。于是就有了DMA这样的设备。在DMA方式的数据传输过程中,当I/O设备需要传输数据时,DMA控制器向CPU发送DMA传输请求,CPU响应后会放弃系统总线。DMA控制器接管总线进行数据传输,此时CPU除了一些初始化操作外,可以做自己的事情。但是用DMA还是不能满足业务快速发展的需要,因为当I/O请求过多的时候,就会出现总线冲突。所以后来才出现了频道(Channel)。它与DMA的区别在于通道有自己的指令系统和程序,是一个协处理器;而DMA只能实现固定的数据传输控制。JavaNIO中的Channel就是上图中通道的实现。2.buffer理解channel的概念,buffer也很好理解。通道表示与I/O设备(例如,文件、套接字)的打开连接,但通道本身不存储数据。真正的数据传输载体是缓冲区。当应用程序要写入数据时,需要先将数据写入缓冲区,然后通道负责将缓冲区中的数据发送到目的地(文件、磁盘、网络),然后从中取数据缓冲区。如果需要使用NIO系统,需要获取用于连接I/O设备的通道和用于保存数据的缓冲区,然后操作缓冲区来处理数据。3.选择器选择器也称为多路复用器,是一种非阻塞I/O。既然说了非阻塞,就要先说说阻塞。阻塞式如下图所示:当客户端向服务器发送读写请求时,服务器上的线程会一直检查内核地址空间是否有数据。当客户端没有数据发送时,服务器上的线程会一直等待下去,这期间什么也做不了。直到客户端发送数据,它会将数据从内核地址空间复制到用户地址空间,然后读取数据。这样一来,如果有大量的请求,后面的请求将不得不等待前面的请求执行,从而造成大量的排队,无法充分利用cpu资源,性能会急剧下降。让我们来看看选择器是如何工作的。现在client和server之间的通信使用的是channels+buffers,那么所有的channel都会注册到selector中。选择器监视这些通道的I/O状态,例如连接、读取和写入。当某个通道上的事件完全准备好时,选择器会将此任务分配给服务器上的一个或多个线程。当客户端没有事件准备好时,服务器线程不会阻塞,它可以做自己的事情,直到客户端事件准备好后才会处理。相对于阻塞类型,这种非阻塞类型可以进一步利用cpu资源。3.理解了概念之后,我们来学习API1。BufferAPI要想彻底了解缓冲区,就必须知道缓冲区的四个属性,mark、position、limit、capacity。你只需要运行一次代码就知道了。(1)分配一定大小的缓冲区//1。分配指定大小的缓冲区ByteBufferbuffer=ByteBuffer.allocate(10);System.out.println("--------alocate");System.out.println("position:"+buffer.position());System.out.println("limit:"+buffer.limit());System.out.println("capacity:"+buffer.capacity());运行结果:--------alocate------------position:0limit:10capacity:10这里我们分配了一个10字节的buffer,也就是在ByteBuffer中最后的byte[]hb;attribute已经开辟了10个字节的空间。所以capacity为10,limit为可读写数据的最大位置也是10,position为0为可操作数据的位置。(2)向缓冲区写入数据//2.向缓冲区写入数据Stringstr="abcde";System.out.println("------------put------------");buffer.put(str.getBytes(StandardCharsets.UTF_8));System.out.println("position:"+buffer.position());System.out.println("limit:"+buffer.limit());System.out.println("容量:"+buffer.capacity());运行结果:----------put-------------position:5limit:10capacity:10这里我们向buffer写入5个字节的数据,然后是capacity和limit还是10,但是position是5,因为之前已经写了5(3)切换到读取数据的模式//3。切换到读取数据模式buffer.flip();System.out.println("------------flip------------");System.out.println("位置:"+buffer.position());System.out.println("限制:"+buffer.limit());System.out.println("容量:"+buffer.capacity());那么我们现在要从缓冲区中读取一些数据,我们需要切换到翻转模式,翻转会改变一些属性的值。运行结果:-----------flip------------position:0limit:5capacity:10flip将position的值改为0,limit为5,即意思是我要从头读,最多只能读5个位置(4)readsomedata//4.读取数据System.out.println("-------------get------------");byte[]dest=newbyte[buffer.limit()];buffer.get(dest);System.out。println(newString(dest,0,dest.length));System.out.println("position:"+buffer.position());System.out.println("limit:"+buffer.limit());System.out.println("capacity:"+buffer.capacity());运行结果:------------get------------abcdeposition:5limit:5capacity:10读取数据后,位置变成5,说明我已经读取到5(5)重复读取//5.rewind()buffer.rewind();System.out.println("------------rewind------------");System.out.println("位置:"+buffer.position());System.out.println("限制:"+buffer.limit());System.out.println("容量:"+buffer.capacity());运行结果:------------rewind------------position:0limit:5capacity:10rewind表示重复读取其中的数据buffer(6)清除数据//6.clear()buffer.clear();System.out.println("------------clear-------------");System.out.println("位置:"+buffer.position());System.out.println("限制:"+buffer.limit());System.out.println("容量:"+buffer.capacity());运行结果:------------clear------------position:0limit:10capacity:10clear()后,position归0,limit归零10,又可以开始写数据了,可以写10个字节。但需要注意的是,缓冲区中的数据并没有被清空,数据还处于“被遗忘”的状态。这些指针已经恢复到原来的状态。(7)Mark这是第四个属性:mark。mark可以记录position的位置。可以通过reset()方法回到标记的位置。@Testpublicvoidtest2(){//分配10字节Stringstr="abcde";ByteBufferbuffer=ByteBuffer.allocate(10);buffer.put(str.getBytes(StandardCharsets.UTF_8));//切换到读取模式,读取2字节缓冲区.flip();byte[]dest=newbyte[buffer.limit()];buffer.get(dest,0,2);System.out.println(newString(dest,0,2));System.out。println(buffer.position());//标记记录当前位置buffer.mark();//读取两个字节buffer.get(dest,2,2);System.out.println(newString(dest,2,2));System.out.println(buffer.position());//复位,返回标记位置buffer.reset();System.out.println(buffer.position());}执行结果:```texab2cd422,使用channel,buffer,selector完成一个网络程序(一)server@TestpublicvoidtestServer()throwsIOException{ServerSocketChannelserverSocketChannel=ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(newInetSocketAddress(8989));Selectorselector=Selector.open();serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);while(selector.select()>0){Iterator
