简介上一节我们讲解了netty中的Channel,知道Channel是事件处理器与外部通信的桥梁。今天,本文将详细讲解netty剩下的几个非常重要的部分,Event,Handler和PipeLine。ChannelPipelinepipeLine是连接Channel和handler的桥梁。它实际上是filter的一个实现,用来控制handler的处理方式。创建通道时,也会创建其对应的ChannelPipeline。首先看ChannelPipeline的定义:publicinterfaceChannelPipelineextendsChannelInboundInvoker,ChannelOutboundInvoker,Iterable首先,ChannelPipeline继承自Iterable,说明它是可遍历的,遍历的结果是Handlers之一。ChannelPipeline作为一个合格的Iterable,提供了一系列的add和remote方法,通过这些方法可以在ChannelPipeline中添加或移除Handler。因为ChannelPipeline是一个过滤器,而过滤器需要指定对应过滤器的顺序,所以ChannelPipeline中有addFirst和addLast方法来添加不同的顺序。然后可以看到ChannelPipeline继承了两个接口ChannelInboundInvoker和ChannelOutboundInvoker。先看一张channelPipeline的工作流程图:可以看出ChannelPipeline主要有两个操作,一个是读入站,一个是写出出站。对于Socket.read()这样的读入操作,实际调用的是ChannelInboundInvoker中的方法。对于外部IO写请求,调用ChannelOutboundInvoker中的方法。注意inbound和outbound的处理顺序是相反的,比如下面这个例子:ChannelPipelinep=...;p.addLast("1",newInboundHandlerA());p.addLast("2",newInboundHandlerB());p.addLast("3",newOutboundHandlerA());p.addLast("4",newOutboundHandlerB());p.addLast("5",newInboundOutboundHandlerX());ChannelPipeline添加了5个处理程序,包括2个InboundHandlers、2个OutboundHandlers和一个同时处理In和Out的Handler。那么当通道遇到入站事件时,会按照1、2、3、4、5的顺序进行处理,但是只有InboundHandler能够处理入站事件,所以实际的执行顺序是1、2、5。同理,当channel遇到outbound事件时,会按照5,4,3,2,1的顺序执行,但是只有outboundHandler可以处理outbound事件,所以实际的执行顺序是5,4,3。简单的说put,ChannelPipeline指定了Handler的执行顺序。ChannelHandlernetty是一个事件驱动的框架,所有的事件都由Handler来处理。ChannelHandler可以处理IO,拦截IO,或者将事件传递给ChannelPipeline中的下一个Handler进行处理。ChannelHandler的结构很简单,只有三个方法,分别是:voidhandlerAdded(ChannelHandlerContextctx)throwsException;voidhandlerRemoved(ChannelHandlerContextctx)抛出异常;voidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsExceptionout;不同的是,ChannelHandler可以分为两类,即ChannelInboundHandler和ChannelOutboundHandler。因为这两个是接口,实现起来比较麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler。前两个很好理解,就是inbound和outbound,最后一个可以同时处理inbound和outbound。ChannelHandler由ChannelHandlerContext提供,与ChannelPipeline的交互也是通过ChannelHandlerContext进行的。ChannelHandlerContextChannelHandlerContext允许ChannelHandler与ChannelPipeline或其他Handler进行交互。它是使Handler和Channel能够交互的上下文。比如在ChannelHandlerContext中,调用channel()获取绑定的channel。绑定的Handler可以通过调用handler()获取。通过调用fire*方法触发通道事件。看一下ChannelHandlerContext的定义:publicinterfaceChannelHandlerContextextendsAttributeMap,ChannelInboundInvoker,ChannelOutboundInvoker可以看到是一个AttributeMap,用来存储属性,或者是一个ChannelInboundInvoker和ChannelOutboundInvoker,用来触发和传播相应的事件。对于Inbound来说传播事件的方法有:ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()对于Outbound来说传播事件的方法有:ChannelHandlerContext.bind(SocketAddress,ChannelPromise)ChannelHandlerContext.connect(SocketAddress,SocketAddress,ChannelPromise)ChannelHandlerContext.write(Object,ChannelPromiseChannelContext)ChannelHandler.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)在一个Handler中调用这些方法,然后将事件传递给下一个Handler,如下:publicclassMyInboundHandlerextendsChannelInboundHandlerAdapter{}}publicclassMyOutboundHandlerextendsChannelOutboundHandlerAdapter{@Overridepublicvoidclose(ChannelHandlerContextPromisectx,SystemPromise).out.println("Closing..");ctx.close(承诺);ChannelHandler中的状态变量ChannelHandler是一个Handler类。一般来说,这个类的实例可以被多个通道共享。前提是这个ChannelHandler没有共享状态变量但是有时候,我们需要在ChannelHandler中维护一个状态,所以就涉及到ChannelHandler中的状态变量,看下面的例子:@OverridepublicvoidchannelRead0(ChannelHandlerContextctx,Messagemessage){if(messageinstanceofLoginMessage){authenticate((LoginMessage)message);登录=真;}else(messageinstanceofGetDataMessage){if(loggedIn)ctx.writeAndFlush(fetchSecret((GetDataMessage)message));}else{失败();}}}...}在这个例子中,我们需要在收到LoginMessage后对消息进行鉴权,并保存鉴权状态,因为业务逻辑是这样的,所以必须要有一个状态变量。那么这样一个带状态变量的Handler只能绑定一个channel。如果绑定了多个channel,可能会出现状态不一致的问题。将通道绑定到Handler实例非常简单,只需在initChannel方法中使用new关键字创建一个新对象即可。公共类DataServerInitializer扩展ChannelInitializer
