当前位置: 首页 > 后端技术 > Java

Netty系列:kequeue传输协议详解

时间:2023-04-02 01:51:59 Java

简介在前面的章节中,我们介绍了在netty中可以使用kequeue或者epoll来实现更高效的原生传输方式。那么kequeue和epoll以及NIO传输协议有什么区别呢?本章将以kequeue为例进行深入探讨。在我们上面介绍的native例子中,有几个关于kqueue的类,分别是KQueueEventLoopGroup、KQueueServerSocketChannel和KQueueSocketChannel。通过简单的替换和添加相应的依赖包,我们可以很方便的将普通的NIOnetty服务替换成NativeKqueue服务。是时候揭开Kqueue的秘密了。KQueueEventLoopGroupeventLoop和eventLoopGroup用于接受事件和事件处理。我们先看一下KQueueEventLoopGroup的定义:publicfinalclassKQueueEventLoopGroupextendsMultithreadEventLoopGroup作为一个MultithreadEventLoopGroup,必须实现一个newChild方法来创建子EventLoop。在KQueueEventLoopGroup中,除了构造函数之外,需要实现的额外方法是newChild:protectedEventLoopnewChild(Executorexecutor,Object...args)throwsException{IntegermaxEvents=(Integer)args[0];SelectStrategyFactoryselectStrategyFactory=(SelectStrategyFactory)args[1];RejectedExecutionHandlerrejectedExecutionHandler=(RejectedExecutionHandler)args[2];EventLoopTaskQueueFactorytaskQueueFactory=null;EventLoopTaskQueueFactorytailTask??QueueFactory=null;intargsLength=args.length;if(argsLength>3){taskQueueFactory=(EventLoopTaskQueueFactory)args[3];}if(argsLength>4){tailTask??QueueFactory=(EventLoopTaskQueueFactory)args[4];}returnnewKQueueEventLoop(this,executor,maxEvents,selectStrategyFactory.newSelectStrategy(),rejectedExecutionHandler,QueueFactory中的所有QueueFactory;参数都是从KQueueEventLoopGroup的构造函数传入的。除了maxEvents、selectStrategyFactory和rejectedExecutionHandler之外,还可以接收两个参数taskQueueFactory和tailTask??QueueFactory,最后将这些参数传递给KQueueEventLoop的构造函数,最后返回一个KQueueEventLoop对象。另外,在使用KQueueEventLoopGroup之前,我们需要保证系统中有Kqueue可用。这个判断是通过调用KQueue.ensureAvailability();来实现的。KQueue.ensureAvailability首先判断是否定义了系统属性io.netty.transport.noNative。如果确定了,说明nativetransport被disabled了,后面就不用再判断了。如果未定义io.netty.transport.noNative,将调用Native.newKQueue()以尝试从native获取kqueueFileDescriptor。如果上述获取过程没有异常,说明native方法中存在kqueue。我们可以继续使用它。下面是判断kqueue是否可用的代码:static{Throwablecause=null;if(SystemPropertyUtil.getBoolean("io.netty.transport.noNative",false)){cause=newUnsupportedOperationException("Nativetransportwasexplicitlydisabledwith-Dio.netty.transport.noNative=true");}else{文件描述符kqueueFd=null;尝试{kqueueFd=Native.newKQueue();}catch(Throwablet){cause=t;}finally{if(kqueueFd!=null){try{kqueueFd.close();}catch(Exceptionignore){//忽略}}}}UNAVAILABILITY_CAUSE=cause;}KQueueEventLoopKQueueEventLoop是从KQueueEventLoopGroup创建的,用于执行特定的IO任务。我们先看一下KQueueEventLoop的定义:finalclassKQueueEventLoopextendsSingleThreadEventLoop不管是NIO,KQueue还是Epoll,因为他们使用了更高级的IO技术,他们使用的EventLoop是SingleThreadEventLoop,也就是说单线程就够了。和KQueueEventLoopGroup一样,KQueueEventLoop也需要判断当前系统环境是否支持kqueue:static{KQueue.ensureAvailability();}上一节提到,KQueueEventLoopGroup会调用KQueueEventLoop的构造函数返回一个eventLoop对象。Let'stakealookatKQueueEventLoopfirst的构造函数:KQueueEventLoop(EventLoopGroupparent,Executorexecutor,intmaxEvents,SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler,EventLoopTaskQueueFactorytaskQueueFactory,EventLoopTaskQueueFactorytailTask??QueueFactory){super(parent,executor,false,newTaskQueue(taskQueueFactory),newTaskQueue(tailTask??QueueFactory),rejectedExecutionHandler);这个.selectStrategy=ObjectUtil.checkNotNull(strategy,"strategy");this.kqueueFd=Native.newKQueue();如果(maxEvents==0){allowGrowing=true;最大事件=4096;}else{allowGrowing=false;}这。changeList=新的KQueueEvent阵列(最大事件);this.eventList=newKQueueEventArray(maxEvents);intresult=Native.keventAddUserEvent(kqueueFd.intValue(),KQUEUE_WAKE_UP_IDENT);如果(结果<0){清理();thrownewIllegalStateException("kevent添加用户事件失败,errno:"+(-result));}}传入的maxEvents表示这个KQueueEventLoop可以接受的最大事件数。如果maxEvents=0,表示KQueueEventLoop的事件容量可以动态扩展,最大值为4096。否则,KQueueEventLoop的事件容量不能扩展。maxEvents用作数组的大小来构建changeList和eventList。KQueueEventLoop还定义了一个叫channels的map,用来保存注册的通道:privatefinalIntObjectMapchannels=newIntObjectHashMap(4096);查看通道添加和远程方法:voidadd(AbstractKQueueChannelch){assertinEventLoop();AbstractKQueueChannelold=channels.put(ch.fd().intValue(),ch);断言旧==null||!old.isOpen();}voidremove(AbstractKQueueChannelch)throwsException{assertinEventLoop();intfd=ch.fd().intValue();AbstractKQueueChannelold=channels.remove(fd);if(old!=null&&old!=ch){channels.put(fd,old);断言!ch.isOpen();}elseif(ch.isOpen()){ch.unregisterFilters();可以看到增删改查都是AbstractKQueueChannel。我们将在后面的章节中详细讲解KQueueChannel。这里我们只需要知道channelmap中的key是kequeue中唯一的FileDescriptor的int值即可。再来看一下EventLoop中最重要的运行方法:protectedvoidrun(){for(;;){try{intstrategy=selectStrategy.calculateStrategy(selectNowSupplier,hasTasks());开关(策略){caseSelectStrategy.CONTINUE:继续;案例SelectStrategy.BUSY_WAIT:案例SelectStrategy.SELECT:strategy=kqueueWait(WAKEN_UP_UPDATER.getAndSet(this,0)==1);如果(唤醒==1){唤醒();}默认值:}finalintioRatio=this.ioRatio;如果(ioRatio==100){尝试{如果(策略>0){processReady(策略);}}最后{runAllTask??s();}}else{finallongioStartTime=System.nanoTime();try{if(strategy>0){processReady(strategy);}}finally{finallongioTime=System.nanoTime()-ioStartTime;runAllTask??s(ioTime*(100-ioRatio)/ioRatio);其逻辑是先使用selectStrategy.calculateStrategy获取当前select策略,然后根据策略的值判断是否需要执行processReady方法,最后执行runAllTask??s从任务队列中获取需要执行的任务执行selectStrategy.calculateStrategy判断当前select状态。默认情况下,有三种状态:SELECT、CONTINUE、BUSY_WAIT。这三种状态都是负数:intSELECT=-1;int继续=-2;intBUSY_WAIT=-3;分别表示当前IO处于selectblock状态,或skip当前IO状态,为IOlooppull状态。BUSY_WAIT是非阻塞IOPULL,kqueue不支持,所以会回退到SELECT。除了这三种状态,calculateStrategy还会返回一个正值,表示当前要执行的任务数。在run方法中,如果策略的结果是SELECT,会调用Native.keventWait方法返回当前就绪事件的个数,并将就绪事件放入KQueueEventArray的eventList中。如果就绪事件数大于零,则调用processReady方法对这些事件进行状态回调处理。你是怎么处理的?下面是处理的核心逻辑:AbstractKQueueChannelchannel=channels.get(fd);AbstractKQueueUnsafe不安全=(AbstractKQueueUnsafe)channel.unsafe();if(filter==Native.EVFILT_WRITE){unsafe.writeReady();}elseif(filter==Native.EVFILT_READ){unsafe.readReady(eventList.data(i));}elseif(filter==Native.EVFILT_SOCK&&(eventList.fflags(i)&Native.NOTE_RDHUP)!=0){unsafe.readEOF();}这里的fd是从eventList中读取的:finalintfd=eventList.fd(i);根据eventList的fd,我们可以从channels中获取对应的KQueueChannel,然后根据事件的过滤状态判断KQueueChannel的具体操作,是writeReady,readReady还是readEOF。最后是执行runAllTask??s方法。runAllTask??s的逻辑很简单,就是从taskQueue中读取任务并执行。KQueueServerSocketChannel和KQueueSocketChannelKQueueServerSocketChannel是用在server端的channel:publicfinalclassKQueueServerSocketChannelextendsAbstractKQueueServerChannelimplementsServerSocketChannel{KQueueServerSocketChannel继承自AbstractKQueueServerChannel,除了构造函数之外,最重要的一个方法就是newChildChannel:@OverrideprotectedChannelnewChildChannel(intfd,byte[]address,intoffset,intlen)throwsException{returnnewKQueueSocketChannel(this,newBsdSocket(fd),address(address,offset,len));}Thismethodisusedtocreateanewchildchannel.Fromtheabovecode,wecanseethatthegeneratedchildchannelisaninstanceofKQueueSocketChannel.Itsconstructoracceptsthreeparameters,namelyparentchannel,BsdSocketandInetSocketAddress.KQueueSocketChannel(Channelparent,BsdSocketfd,InetSocketAddressremoteAddress){super(parent,fd,remoteAddress);config=newKQueueSocketChannelConfig(this);}HerefdistheresultofsocketacceptacceptedAddress:intacceptFd=socket(accept).accept(Address);下面是KQueueSocketChannel的定义:publicfinalclassKQueueSocketChannelextendsAbstractKQueueStreamChannelimplementsSocketChannel{KQueueSocketChannel和KQueueServerSocketChannel的关系是父子的关系,在KQueueSocketChannel中有一个parent方法,用来返回ServerSocketChannel对象,这也是前面提到的newChildChannel方法中传入TheserverChannelintheKQueueSocketChannelconstructor:publicServerSocketChannelparent(){return(ServerSocketChannel)super.parent();}AnotherfeatureofKQueueSocketChannelisthatitsupportstcpfastopen.ItsessenceistocalltheconnectxmethodofBsdSocketandpassitwhileestablishingtheconnectionData:intbytesSent=socket.connectx((InetSocketAddress)localAddress,(InetSocketAddress)remoteAddress,iov,true);Insummary,theaboveisthedetailedintroductionofKqueueEventLoopandKqueueSocketChannel,whichisbasicallynotmuchdifferentfromNIO不同之处在于性能非常出色。更多内容请参考http://www.flydean.com/53-1-netty-kqueue-transport/最通俗的解读,最深刻的干货,最简洁的教程,很多你不知道的你会的小技巧等你来发现!欢迎关注我的公众号:《程序那些事儿》,懂技术,更懂你!