当前位置: 首页 > 后端技术 > Java

Netty系列:Event、Handler和Pipeline

时间:2023-04-02 10:11:17 Java

简介上一节我们讲解了netty中的Channel,知道Channel是事件处理器与外部通信的桥梁。今天,本文将详细讲解netty剩下的几个非常重要的部分,Event,Handler和PipeLine。ChannelPipelinepipeLine是连接Channel和handler的桥梁。它实际上是filter的一个实现,用来控制handler的处理方式。创建通道时,也会创建其对应的ChannelPipeline。首先看ChannelPipeline的定义:publicinterfaceChannelPipelineextendsChannelInboundInvoker,ChannelOutboundInvoker,Iterable首先,ChannelPipeline继承自Iterable,说明它是可遍历的,遍历的结果是Handlers之一。ChannelPipeline作为一个合格的Iterable,提供了一系列的add和remote方法,通过这些方法可以在ChannelPipeline中添加或移除Handler。因为ChannelPipeline是一个过滤器,而过滤器需要指定对应过滤器的顺序,所以ChannelPipeline中有addFirst和addLast方法来添加不同的顺序。然后可以看到ChannelPipeline继承了两个接口ChannelInboundInvoker和ChannelOutboundInvoker。先看一张channelPipeline的工作流程图:可以看出ChannelPipeline主要有两个操作,一个是读入站,一个是写出出站。对于Socket.read()这样的读入操作,实际调用的是ChannelInboundInvoker中的方法。对于外部IO写请求,调用ChannelOutboundInvoker中的方法。注意inbound和outbound的处理顺序是相反的,比如下面这个例子:ChannelPipelinep=...;p.addLast("1",newInboundHandlerA());p.addLast("2",newInboundHandlerB());p.addLast("3",newOutboundHandlerA());p.addLast("4",newOutboundHandlerB());p.addLast("5",newInboundOutboundHandlerX());ChannelPipeline添加了5个处理程序,包括2个InboundHandlers、2个OutboundHandlers和一个同时处理In和Out的Handler。那么当通道遇到入站事件时,会按照1、2、3、4、5的顺序进行处理,但是只有InboundHandler能够处理入站事件,所以实际的执行顺序是1、2、5。同理,当channel遇到outbound事件时,会按照5,4,3,2,1的顺序执行,但是只有outboundHandler可以处理outbound事件,所以实际的执行顺序是5,4,3。简单的说put,ChannelPipeline指定了Handler的执行顺序。ChannelHandlernetty是一个事件驱动的框架,所有的事件都由Handler来处理。ChannelHandler可以处理IO,拦截IO,或者将事件传递给ChannelPipeline中的下一个Handler进行处理。ChannelHandler的结构很简单,只有三个方法,分别是:voidhandlerAdded(ChannelHandlerContextctx)throwsException;voidhandlerRemoved(ChannelHandlerContextctx)抛出异常;voidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsExceptionout;不同的是,ChannelHandler可以分为两类,即ChannelInboundHandler和ChannelOutboundHandler。因为这两个是接口,实现起来比较麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler。前两个很好理解,就是inbound和outbound,最后一个可以同时处理inbound和outbound。ChannelHandler由ChannelHandlerContext提供,与ChannelPipeline的交互也是通过ChannelHandlerContext进行的。ChannelHandlerContextChannelHandlerContext允许ChannelHandler与ChannelPipeline或其他Handler进行交互。它是使Handler和Channel能够交互的上下文。比如在ChannelHandlerContext中,调用channel()获取绑定的channel。绑定的Handler可以通过调用handler()获取。通过调用fire*方法触发通道事件。看一下ChannelHandlerContext的定义:publicinterfaceChannelHandlerContextextendsAttributeMap,ChannelInboundInvoker,ChannelOutboundInvoker可以看到是一个AttributeMap,用来存储属性,或者是一个ChannelInboundInvoker和ChannelOutboundInvoker,用来触发和传播相应的事件。对于Inbound来说传播事件的方法有:ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()对于Outbound来说传播事件的方法有:ChannelHandlerContext.bind(SocketAddress,ChannelPromise)ChannelHandlerContext.connect(SocketAddress,SocketAddress,ChannelPromise)ChannelHandlerContext.write(Object,ChannelPromiseChannelContext)ChannelHandler.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)在一个Handler中调用这些方法,然后将事件传递给下一个Handler,如下:publicclassMyInboundHandlerextendsChannelInboundHandlerAdapter{}}publicclassMyOutboundHandlerextendsChannelOutboundHandlerAdapter{@Overridepublicvoidclose(ChannelHandlerContextPromisectx,SystemPromise).out.println("Closing..");ctx.close(承诺);ChannelHandler中的状态变量ChannelHandler是一个Handler类。一般来说,这个类的实例可以被多个通道共享。前提是这个ChannelHandler没有共享状态变量但是有时候,我们需要在ChannelHandler中维护一个状态,所以就涉及到ChannelHandler中的状态变量,看下面的例子:@OverridepublicvoidchannelRead0(ChannelHandlerContextctx,Messagemessage){if(messageinstanceofLoginMessage){authenticate((LoginMessage)message);登录=真;}else(messageinstanceofGetDataMessage){if(loggedIn)ctx.writeAndFlush(fetchSecret((GetDataMessage)message));}else{失败();}}}...}在这个例子中,我们需要在收到LoginMessage后对消息进行鉴权,并保存鉴权状态,因为业务逻辑是这样的,所以必须要有一个状态变量。那么这样一个带状态变量的Handler只能绑定一个channel。如果绑定了多个channel,可能会出现状态不一致的问题。将通道绑定到Handler实例非常简单,只需在initChannel方法中使用new关键字创建一个新对象即可。公共类DataServerInitializer扩展ChannelInitializer{@OverridepublicvoidinitChannel(Channelchannel){channel.pipeline().addLast("handler",newDataServerHandler());那么除了新建一个handler实例之外,还有没有其他的方法呢?当然有,那就是ChannelHandlerContext中的AttributeKey属性。还是上面的例子,让我们看看如何使用AttributeKey:publicinterfaceMessage{//yourmethodshere}@SharablepublicclassDataServerHandlerextendsSimpleChannelInboundHandler{privatefinalAttributeKeyauth=AttributeKey.valueOf("auth");@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Messagemessage){Attributeattr=ctx.attr(auth);如果(登录消息的消息实例){验证((登录消息)o);属性设置(真);}else(messageinstanceofGetDataMessage){if(Boolean.TRUE.equals(attr.get())){ctx.writeAndFlush(fetchSecret((GetDataMessage)o));}else{失败();}}}...}在上面的例子中,首先定义了一个AttributeKey,然后使用ChannelHandlerContext的attr方法将Attribute设置为ChannelHandlerContext,这样Attribute就绑定到了Chan在nelHandlerContext中,即使在不同的Channel中使用同一个Handler,其属性也是不同的。以下是使用共享处理程序的示例:@OverridepublicvoidinitChannel(Channelchannel){channel.pipeline().addLast("handler",SHARED);请注意,在定义DataServerHandler时,我们添加了@Sharable注释。如果一个ChannelHandler使用了@Sharable注解,意味着你只能创建一次这个Handler,但是你可以将它绑定到一个或多个ChannelPipelines中。注意@Sharable注解是为java文档准备的,不会影响实际的代码执行。之前介绍过异步Handler,可以通过调用pipeline.addLast方法将handler添加到pipeline中。因为管道是一个过滤器结构,所以添加的处理程序是顺序处理的。但是,如果我想让某些处理程序在新线程中执行,该怎么办?如果我们希望在这些新线程中执行的Handlers是无序的呢?例如,我们现在有3个处理程序,即MyHandler1、MyHandler2和MyHandler3。顺序执行是这样写的:ChannelPipelinepipeline=ch.pipeline();pipeline.addLast("MyHandler1",newMyHandler1());pipeline.addLast("MyHandler2",newMyHandler2());pipeline.addLast("MyHandler3",newMyHandler3());如果想让MyHandler3在新的线程中执行,可以添加group选项让handler在新的组中运行:staticfinalEventExecutorGroupgroup=newDefaultEventExecutorGroup(16);ChannelPipelinepipeline=ch.pipeline();pipeline.addLast("MyHandler1",newMyHandler1());pipeline.addLast("MyHandler2",newMyHandler2());pipeline.addLast(group,"MyHandler3",newMyHandler3());但是,在上面的例子中,添加到DefaultEventExecutorGroup中的Handler也会被顺序执行。如果实在不想顺序执行,可以尝试使用UnorderedThreadPoolEventExecutor。总结本文对Event、Handler和PipeLine进行了解释,并说明了它们之间的关系和交互。后续将从netty的具体实践入手,进一步加深对netty的理解和应用。我希望你会喜欢。本文已收录于http://www.flydean.com/05-netty-channelevent/最流行的解读,最深刻的干货,最简洁的教程,很多你不知道的小技巧等着你去探索!欢迎关注我的公众号:《程序那些事儿》,懂技术,更懂你!