当前位置: 首页 > 后端技术 > Java

netty源码中线程模型探索

时间:2023-04-01 23:24:33 Java

注:本文基于NettyFinal.3.9.4。我们先来看看NioServerBossPool,也就是传说中的MainReactor。它有两个重要字段,即privatefinalBoss[]boss;私人最终执行人bossExecutor;Boss数组默认大小为1,即存放一个Boss实例。这是什么老板?FieldsprivatefinalExecutor执行器;protectedvolatileThreadthread;protectedvolatileSelector选择器;privatefinalQueuetaskQueue=newConcurrentLinkedQueue();这个Executor就是我们新创建的boss线程线程池。Selector是java原生的Selector,在Linux平台上是EpollSelectorIpml。结构大概是这样的。让我们看看mainReactor是如何初始化和运行的。让我们回到NioServerBossPool。它的构造函数将执行init方法protectedvoidinit(){if(initialized){thrownewIllegalStateException("initializedalready");}初始化=真;对于(inti=0;i0){readBytes+=ret;如果(!bb.hasRemaining()){休息;}}失败=假;}catch(ClosedChannelExceptione){//可能发生,不需要用户注意。}catch(Throwablet){fireExceptionCaught(channel,t);}if(readBytes>0){bb.flip();最终ChannelBuffer缓冲区=bufferFactory.getBuffer(readBytes);buffer.setBytes(0,bb);buffer.writerIndex(readBytes);//更新预测器。predictor.previousReceiveBufferSize(readBytes);//触发事件。fireMessageReceived(通道,缓冲区);}如果(ret<0||失败){k.取消();//如果没有这个,一些JDK实现会陷入无限循环。关闭(通道,成功的未来(通道));返回假;}returntrue;}关键在于//触发event.fireMessageReceived(channel,buffer);继续往下看publicstaticvoidfireMessageReceived(Channelchannel,Objectmessage,SocketAddressremoteAddress){channel.getPipeline().sendUpstream(newUpstreamMessageEvent(channel,message,remote;remote}Address))可以发现这是为了封装读取数据到UpstreamMessageEvent并将其传递到Pipeline。顾名思义,这个Pipeline就是一个由Handler组成的Handler链,像流水线一样流动。当然,在netty中,Handler被封装成ChannelHandlerContext的sendUpstream方法是区内第一个ChannelHandlerContext,传入UpstreamMessageEventvoidsendUpstream(DefaultChannelHandlerContextctx,ChannelEvente){try{((ChannelUpstreamHandler)ctx.getHandler()).handleUpstream(ctx,e);}catch(t){notifyHandlerException(e,t);}}再往下看handleUpstream方法,在SimpleChannelUpstre中amHandler中实现publicvoidhandleUpstream(ChannelHandlerContextctx,ChannelEvente)throwsException{if(einstanceofMessageEvent){messageReceived(ctx,(MessageEvent)e);}elseif(einstanceofWriteCompletionEvent){WriteCompletionEventevt=(WriteCompletionEvent)e;写完成(ctx,evt);}elseif(einstanceofChildChannelStateEvent){ChildChannelStateEventevt=(ChildChannelStateEvent)e;如果(evt.getChildChannel().isOpen()){childChannelOpen(ctx,evt);}else{childChannelClosed(ctx,evt);}}elseif(einstanceofChannelStateEvent){ChannelStateEventevt=(ChannelStateEvent)e;开关(evt.getState()){案例开放:如果(Boolean.TRUE.equals(evt.getValue())){channelOpen(ctx,evt);}else{channelClosed(ctx,evt);}休息;案件绑定:if(evt.getValue()!=null){channelBound(ctx,evt);}else{channelUnbound(ctx,evt);}休息;caseCONNECTED:if(evt.getValue()!=null){channelConnected(ctx,evt);}else{channelDisconnected(ctx,evt);}休息;caseINTEREST_OPS:channelInterestChanged(ctx,evt);休息;默认值:ctx.sendUpstream(e);}}elseif(einstanceofExceptionEvent){exceptionCaught(ctx,(ExceptionEvent)e);}else{ctx.sendUpstream(e);}}显然会调用我们的messageReceived方法,但是我们可以在我们的Handler中重写这个方法());}@Override公共voidchannelOpen(ChannelHandlerContextctx,ChannelStateEvente)throwsException{super.channelOpen(ctx,e);}@OverridepublicvoidchannelConnected(ChannelHandlerContextctx,ChannelStateEvente)throwsException{super.channelConnected(ctx,thene);}}服务端传入的数据最终会经过一系列的Handlers处理