当前位置: 首页 > 后端技术 > Java

Netty系列:Channel和ChannelGroup

时间:2023-04-02 01:35:21 Java

简介Channel是netty中数据传输和数据处理的通道,也是netty程序中不可或缺的一部分。在netty中,channel是一个接口,对于不同的数据类型或者协议channel会有具体的不同实现。channel虽然很重要,但是在代码中真的很神秘。基本上,我们很少看到直接使用通道。真的是这样吗?与频道相关的ChannelGroup的作用是什么?一起来看看吧。其实Netty的代码对于神龙看到的开始和结束的通道都有一个固定的模板。首先根据是服务端还是客户端,然后创建对应的Bootstrap和ServerBootstrap。然后为这个Bootstrap配置相应的group方法。然后为Bootstrap配置channel和handler,最后启动Bootstrap。这样一个标准的netty程序就完成了。您需要做的就是为其选择合适的组、通道和处理程序。先来看最简单的NioServerSocketChannel:EventLoopGroupbossGroup=newNioEventLoopGroup(1);EventLoopGroupworkerGroup=newNioEventLoopGroup();尝试{ServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(newLoggingHandler(LogLevel.INFO)).childHandler(newChatServerInitializer());b.bind(PORT).sync().channel().closeFuture().sync();}最后{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}这里,我们将NioServerSocketChannel设置为ServerBootstrap的通道。就这样结束了吗?通道用在什么地方?别担心,让我们仔细看看try块中的最后一句:b.bind(PORT).sync().channel().closeFuture().sync();b.bind(PORT).sync()实际上返回一个ChannelFuture对象被创建,通过调用它的channel方法返回与之关联的Channel对象。然后我们调用channel.closeFuture()方法。closeFuture方法返回一个ChannelFuture对象,该对象将在通道关闭时收到通知。sync方法会实现同步阻塞,等待通道关闭,从而执行后续的eventGroup的关闭操作。在ServerBootstrap中构建模板时,通道实际上有两个作用。第一个功能是指定ServerBootstrap的通道,第二个功能是通过通道获取通道关闭事件,最终关闭整个netty程序。虽然我们基本看不到通道的直接方法调用,但是毫无疑问,通道是netty的灵魂。接下来我们看一下handler对具体消息处理的基本操作:ctx.flush();通常如果我们需要在handler中向channel写入数据,我们会调用ChannelHandlerContext的write方法。这个方法跟通道有什么关系?首先write方法是ChannelOutboundInvoker接口中的一个方法,ChannelHandlerContext和Channel都继承了ChannelOutboundInvoker接口,也就是说ChannelHandlerContext和Channel都有一个write方法:ChannelFuturewrite(Objectmsg);因为这里我们使用的是NioServerSocketChannel,所以我们具体看一下write在NioServerSocketChannel中的实现。查看代码后我们会发现NioServerSocketChannel继承自AbstractNioMessageChannel,AbstractNioMessageChannel继承自AbstractNioChannel,AbstractNioChannel继承自AbstractChannel,而这个写方法是在AbstractChannel中实现的:ofChannel}(write方法其实调用的是pipeline的write方法,下面是pipeLine中的write方法:publicfinalChannelFuturewrite(Objectmsg){returntail.write(msg);}这里tail是一个AbstractChannelHandlerContext对象。这样我们得出结论,channel中的write方法实际上调用的是ChannelHandlerContext中的write方法,所以上面的:ctx.write("ChannelActivestate!\r\n");其实是可以调用的直接从通道:Channelch=b.bind(0).sync().channel();//将消息写入通道ch.writeAndFlush("ChannelActivestate!\r\n").sync();channel和channelGroup通道是netty的灵魂,对于Bootstrap来说,要获取对应的通道,可以调用:b.bind(PORT).sync().channel()来获取,从上面的代码我们也可以看到一个Bootstrap只会对应一个通道。通道中有一个parent()方法,用于返回其父通道,因此通道具有层次结构。我们看一下channelGroup的定义:publicinterfaceChannelGroupextendsSet,Comparable可以看到ChannelGroup其实是一个Channel的集合。ChannelGroup用于将相似的Channel构建成一个集合,从而可以统一管理多个Channel。有朋友可能会问了,一个Bootstrap不是只对应一个通道吗?那么频道的集合从哪里来呢?事实上,在一些复杂的程序中,我们可能会启动多个Bootstrap来处理不同的业务,因此相应的会有多个通道。如果创建的渠道太多,而且这些渠道非常同质化,就需要对这些渠道进行统一管理。这时候就需要用到channelGroup了。channelGroup的基本使用首先看一下channelGroup的基本使用。首先,创建一个channelGroup:ChannelGrouprecipients=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);有了channelGroup之后,可以调用add方法为其添加不同的频道:recipients.add(channelA);收件人.add(频道B);也可以统一向这些通道发送消息:recipients.write(Unpooled.copiedBuffer("这是从channelGroup发送的统一消息。",CharsetUtil.UTF_8));基本上channelGroup提供了write、flush、flushAndWrite、writeAndFlush、disconnect、close、newCloseFuture等函数对集合中的channel进行统一管理。如果您有多个频道,请考虑使用channelGroup。除此之外,channelGroup还有一些其他的特点,我们来仔细看看。自动将关闭的频道移出ChannelGroup是频道的集合。当然,我们只想保存打开状态的通道。如果是处于关闭状态的通道,手动从ChannelGroup中移除太麻烦。所以在ChannelGroup中,如果一个channel被关闭,它会自动从ChannelGroup中移除。这个功能是如何实现的呢?先看channelGroup的add方法:publicbooleanadd(Channelchannel){ConcurrentMapmap=channelinstanceofServerChannel?服务器频道:非服务器频道;添加的布尔值=map.putIfAbsent(channel.id(),channel)==null;如果(添加){channel.closeFuture().addListener(remover);}if(stayClosed&&closed){channel.close();}返回添加;可以看到在add方法中,区分了channel是serverchannel还是non-serverchannel。然后根据channelid存储到ConcurrentMap中。如果添加成功,则向通道添加closeFuture回调。当通道关闭时,将调用此remover方法:}};remover方法将通道移出serverChannels或nonServerChannels。这样可以确保只有处于打开状态的通道才会保存在ChannelGroup中。同时关闭serverChannel和acceptedChannel虽然ServerBootstrap的bind方法只会返回一个channel,但是对于server来说,可以有多个workerEventLoopGroup,所以client和server建立连接后建立的acceptedChannel是一个子channel服务器通道。也就是说,一个服务器有一个服务器通道和多个接受通道。那么如果我们想同时关闭这些通道,可以使用ChannelGroup的close方法。因为如果server通道和non-server通道在同一个ChannelGroup,那么所有的IO命令都会先送到server通道,再送到non-server通道。所以我们可以把Server通道和非Server通道都添加到同一个ChannelGroup中,在程序最后调用ChannelGroup的close方法来达到这个目的:ChannelGroupallChannels=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);publicstaticvoidmain(String[]args)throwsException{ServerBootstrapb=newServerBootstrap(..);...b.childHandler(newMyHandler());//启动服务器b.getPipeline().addLast("handler",newMyHandler());频道serverChannel=b.bind(..).sync();allChannels.add(服务器频道);...等待关闭命令...//关闭serverChannel和所有接受的连接。allChannels.close()。等待不可中断();}publicclassMyHandlerextendsChannelInboundHandlerAdapter{@OverridepublicvoidchannelActive(ChannelHandlerContextctx){//将接受的通道添加到allChannelsallChannels.add(ctx.channel());super.channelActive(ctx);}}ChannelGroupFuture另外和channel一样,channelGroup的操作是异步的,它会返回一个ChannelGroupFuture对象我们看一下ChannelGroupFuture的定义:publicinterfaceChannelGroupFutureextendsFuture,Iterable可以看到ChannelGroupFuture是一个Future,同时它也是一个ChannelFuture遍历器,可以遍历ChannelGroup中所有channel返回的ChannelFuture。同时ChannelGroupFuture提供了isSuccess、isPartialSuccess、isPartialFailure等方法来判断命令是否部分成功。ChannelGroupFuture还提供了addListener方法来监听特定的事件。总结一下,channel是netty的核心。当我们有多个频道不方便管理时,可以使用channelGroup进行统一管理。本文已收录于http://www.flydean.com/04-1-netty-channel-group/最流行的解读,最深刻的干货,最简洁的教程,很多你不知道的小技巧等你来发现!欢迎关注我的公众号:《程序那些事儿》,懂技术,更懂你!