本文转载自微信公众号《源码学徒》,作者皇甫傲傲鸣。转载本文请联系出处学徒公众号。经过前面章节的学习,我们已经能够掌握JDKNIO的开发方法。下面总结一下NIO开发的过程:创建一个服务器通道ServerSocketChannel创建一个选择器Selector将服务器通道注册到选择器中,注意我们感兴趣的事件serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);绑定服务管道的地址serverSocketChannel.bind(newInetSocketAddress(8989));开始事件选择,选择我们感兴趣的事件进行相应的操作!具体代码信息请参考第一章:复用模型章节,这里不再赘述!我们在第一章也分析了复用的概念。多路复用模型可以最大程度的压榨出一个线程的执行能力,一个线程执行所有的数据,包括新连接的访问??,数据的读取,计算和回写,但是假设我们的数据计算和如果比较慢,那么这个任务的执行必然会影响下一个新链接的访问!传统的NIO单线程模型。操作(数据库连接、图片、文件下载)等操作造成线程阻塞,然后读取事件处理。这时候单线程程序是无法处理下一个新链接的!我们优化线程模型,选择事件处理封装任务,提交到线程池!NIO多线程模型上面的数据结构可以解决因为计算任务耗时过长而阻塞新连接访问的问题。能再优化一下吗?我们是否可以创建多个事件选择器,每个事件选择器负责不同的Socket连接,像下面这样:NIO多线程优化模型使得我们可以每个Select选择器负责多个客户端Socket连接,主线程只需要选择一个客户端新连接的选择器并在select选择器上注册!于是我们的架构图就变成了下图:当我们在select选择器内部处理计算任务的时候,我们也可以把这个任务封装成一个任务,提交给线程池,把新连接的访问??和读写事件完全分离处理互不影响!其实这也是Netty的核心思想之一。我们可以根据上图来简化一下。写一个:代码实现构建一个上图select选择器对应的事件执行器/***Nioeventhandler**@authorhuangfu*@date*/publicclassMyNioEventLoopimplementsRunnable{staticfinalByteBufferALLOCATE=ByteBuffer.allocate(128);privatefinalSelectorselector;privatefinalLinkedBlockingQueue<可运行>linkedBlockingQueue;publicMyNioEventLoop(Selectorselector){this.selector=selector;linkedBlockingQueue=newLinkedBlockingQueue<>();}publicSelectorgetSelector(){returnselector;}publicLinkedBlockingQueuegetLinkedBlockingQueue(){returnlinkedBlockingQueue;}//忽略hashCode和eques/***任务处理器*/@Overridepublicvoidrun(){while(!Thread.currentThread().isInterrupted()){try{//事件选择这里只处理读事件if(selector.select()>0){SetselectionKeys=selector.selectedKeys();Iteratoriterator=selectionKeys.iterator();//处理读事件while(iterator.hasNext()){SelectionKeynext=iterator.next();iterator.remove();if(next.isReadable()){SocketChannelchannel=(SocketChannel)next.channel();intread=channel.read(分配);if(read>0){System.out.printf("线程%s[%s]发送了一条消息:",Thread.currentThread().getName(),channel.getRemoteAddress());System.out.println(newString(ALLOCATE.array(),StandardCharsets.UTF_8));}elseif(read==-1){System.out.println("连接断开");channel.close();}ALLOCATE.clear();}}selectionKeys.clear();}else{//处理注册的异步任务while(!linkedBlockingQueue.isEmpty()){Runnabletake=linkedBlockingQueue.take();//异步事件执行take.run();}}}catch(IOException|InterruptedExceptione){e.printStackTrace();}}}}构建选择器组/***选择器组**@authorhuangfu*@date2021年3月12日09:44:37*/publicclassSelectorGroup{privatefinalListSELECTOR_GROUP=newArrayList<>(8);privatestaticfinalintAVAILABLE_PROCESSORS=Runtime.getRuntime().availableProcessors();privatefinalegericIDX=;/***初始化选择器*@paramcount处理器数量*@throwsIOException异常*/publicSelectorGroup(intcount)throwsIOException{for(inti=0;iRUN_SELECT=newHashSet<>();}新建连接访问selector/***connector**@authorhuangfu*@date20213月12日10:15:37*/publicclassAcceptorimplementsRunnable{privatefinalServerSocketChannelserverSocketChannel;privatefinalSelectorGroupselectorGroup;publicAcceptor(ServerSocketChannelserverSocketChannel,SelectorGroupselectorGroup){this.serverSocketChannel=serverSocketChannel;this.selectorGroup=selectorGroup;}@Overridepublicvoidrun(){try{SocketChannelsocketLventioopNoccept(serverSocketChannel.accept);next();//追加一个注册任务到队列中next.getLinkedBlockingQueue().offer(()->{try{//客户端注册为非阻塞socketChannel.configureBlocking(false);//注册到选择器关注一个读事件socketChannel.register(next.getSelector(),SelectionKey.OP_READ);}catch(Exceptione){e.printStackTrace();}});//唤醒相应的任务,让它处理方便同步任务next.getSelector().wakeup();System.out.println("Connectiondetected:"+socketChannel.getRemoteAddress());//当当前selector已经使用过,就不再使用了,直接注册就行if(ThreadContext.RUN_SELECT.add(next)){//启动任务newThread(next).start();}}catch(IOExceptione){e.printStackTrace();}}}创建launcher/***@authorhuangfu*@date*/publicclassTestMain{publicstaticvoidmain(String[]args)throwsIOException{//创建一个选择器组并通过选择器组的大小来决定使用多少个选择器来实现SelectorGroupselectorGroup=newSelectorGroup(2);//打开一个server-sidepipelineServerSocketChannelserverSocketChannel=ServerSocketChannel.open();//打开一个服务器特定的选择器Selectorselector=Selector.open();//设置非阻塞serverSocketChannel.configureBlocking(false);//创建一个连接器Acceptoracceptor=newAcceptor(serverSocketChannel,selectorGroup);//将服务器通道注册到服务器选择器此处将绑定一个新的连接访问器serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,acceptor);//绑定端口serverSocketChannel.bind(newInetSocketAddress(8989));//启动处理器newReactor(selector).run();}}总结单线程下的NIO存在性能瓶颈。当某个计算过程缓慢时,整个线程就会被阻塞,从而影响其他事件的处理!为了解决这个缺陷,我们提出使用异步线程来操作任务,将耗时较长的业务封装成一个异步任务,提交给线程池执行!为了将业务操作与新连接访问完全分离,我们又做了一次优化。我们封装了一个选择器组,通过轮询的方式获取选择器。每个选择器可以处理多个新连接。socketconnection->selectorselector=multiple->1,在每个selector里面可以使用线程池处理任务进一步提高吞吐量!