当前位置: 首页 > 科技观察

从偶发的线上宕机事件看Netty流量控制

时间:2023-03-17 11:26:16 科技观察

业务背景目前在移动使用场景中大量使用消息推送,推送消息可以帮助运营商更高效地实现运营目标(比如将营销活动推送给用户)或提醒APP新功能)。推送系统需要具备以下两个特性:消息秒级发送给用户,无延迟,支持每秒百万级推送,单机百万级长连接。支持通知、文本、自定义消息透传等多种呈现形式。正是由于以上原因,给系统的开发和维护带来了挑战。下图是推送系统的简要说明(API->推送模块->手机端)。问题背景在推送系统中,长连接集群在稳定性测试和压力测试阶段运行一段时间后随机挂掉一个进程。概率很小(频率大概一个月一次),会影响一些客户端消息。交货时间。推送系统中的长连接节点(Broker系统)是基于Netty开发的。该节点维护服务器与移动终端之间的长期连接。线上出现问题时,添加Netty内存泄漏监控参数,排查问题。没有发现问题。由于长连接节点是Netty开发的,为了方便读者理解,下面简单介绍一下Netty。Netty简介Netty是一个基于JavaNIO提供的API的高性能、异步事件驱动的NIO框架。它提供对TCP、UDP和文件传输的支持。Netty作为最流行的NIO框架,在互联网领域、大数据分布式计算领域、游戏行业、通信行业等得到广泛应用,HBase、Hadoop、Bees、Dubbo等开源组件也是基于Netty的NIO构建的框架。问题分析猜想一开始猜测是长连接数导致的,后来查看日志和分析代码发现不是这个原因。长连接数:39万,如下图:连接数和每个通道的大小都是1456字节。按40万长连接计算,不会造成内存过大。查看GC日志查看GC日志。发现进程在挂掉之前频繁fullGC(5分钟一次),但是内存并没有减少。怀疑是堆外内存泄漏。分析堆内存。ChannelOutboundBuffer对象占用内存近5G。泄漏的原因基本可以确定:ChannelOutboundBuffer中的entry太多了。查看ChannelOutboundBuffer的源码分析,是ChannelOutboundBuffer中的数据。没有写出来,造成积压;ChannelOutboundBuffer内部是一个链表结构。上图的分析数据没有写出来,为什么会这样呢?代码中其实有一个case判断连接是否可用(Channel.isActive),超时的连接会被关闭。从历史经验来看,这种情况在连接半开(客户端异常关闭)时出现的比较多---如果双方不通信就没有问题。根据以上猜想,对测试环境进行复现测试。1)模拟客户端集群,与长连接服务器建立连接,设置客户端节点的防火墙,模拟服务器与客户端网络异常(即模拟Channel.isActive调用成功,但实际数据无法发送出去)。2)减少堆外内存,继续向之前的客户端发送测试消息。消息大小(大约1K)。3)按照128M内存计算,实际上会出现多次调用9W。问题解决启用自动读取机制当通道不可写时,关闭自动读取;publicvoidchannelReadComplete(ChannelHandlerContextctx)throwsException{if(!ctx.channel().isWritable()){Channelchannel=ctx.channel();ChannelInfochannelInfo=ChannelManager.CHANNEL_CHANNELINFO.get(channel);StringclientId="";if(channelInfo!=null){clientId=channelInfo.getClientId();}LOGGER.info("channelisunwritable,turnoffautoread,clientId:{}",clientId);channel.config().setAutoRead(false);}}数据开启自动读取是可写的;@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContextctx)throwsException{Channelchannel=ctx.channel();ChannelInfochannelInfo=ChannelManager.CHANNEL_CHANNELINFO.get(channel);StringclientId==””;f(null){clientId=channelInfo.getClientId();}if(channel.isWritable()){LOGGER.info("channeliswritableagain,turnonautoread,clientId:{}",clientId);channel.config().setAutoRead(true);}}说明:autoRead的功能是更精确的码率控制。如果开启,Netty会为我们注册读取事件。当读事件被注册时,如果网络是可读的,Netty会从通道中读取数据。那么如果关闭了autoread,Netty就不会注册这个read事件了。这样即使peer发送了数据,也不会触发read事件,也就不会从channel中读取数据。当recv_buffer已满时,将不再接收数据。设置高低水位serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,newWriteBufferWaterMark(1024*1024,8*1024*1024));注意:高低水位配合后续的isWritable使用增加channel.isWritable()判断除验证通道外的通道是否可用。isActive()还需要加上channel.isWrite()的判断。isActive只是保证连接是否活跃,能否写入由isWrite决定。privatevoidwriteBackMessage(ChannelHandlerContextctx,MqttMessagemessage){Channelchannel=ctx.channel();//增加channel.isWritable()的判断if(channel.isActive()&&channel.isWritable()){ChannelFuturecf=channel.writeAndFlush(message);if(cf.isDone()&&cf.cause()!=null){LOGGER.error("channelWriteerror!",cf.cause());ctx.close();}}}注意:isWritable可以用来控制ChannelOutboundBuffer,而不是对其进行无限扩展。该机制是利用设定的通道高低水位来判断。问题验证修改后,进行测试,发送27W次没有报错;一般的Netty数据处理流程如下:读取数据由业务线程处理,处理完毕再发送出去(整个过程是异步的),为了提高网络的吞吐量,Netty增加了一个ChannelOutboundBuffer在业务层和套接字之间。调用channel.write时,所有写入的数据实际上并没有写入socket,而是先写入了ChannelOutboundBuffer。当调用channel.flush时,它实际上是写入套接字。因为中间有一个buffer,有速率匹配,而且这个buffer还是无界的(链表),也就是说如果不控制channel.write的速度,大量的数据会堆积在这个buffer中,如果遇到socket无法写入数据时(此时isActive判断无效)或者写入速度慢。最有可能的结果是资源耗尽,如果ChannelOutboundBuffer存储了一个DirectByteBuffer,这将使问题更难排查。流程可以抽象如下:从上面的分析可以看出,第1步写的太快了(处理的太快了)或者下游无法发送数据,都会出现问题。这实际上是一个速率匹配问题。netty源码显示超高水位。当ChannelOutboundBuffer容量超过高水位设置阈值时,isWritable()返回false,设置通道不可写(setUnwritable),并触发fireChannelWritabilityChanged()。privatevoidincrementPendingOutboundBytes(longsize,booleaninvokeLater){if(size==0){return;}longnewWriteBufferSize=TOTAL_PENDING_SIZE_UPDATER.addAndGet(this,size);if(newWriteBufferSize>channel.config().getWriteBufferHighWaterMarkw())(vot}LaterMarkw()){void}Later}privatevoidsetUnwritable(booleaninvokeLater){for(;;){finalintoldValue=unwritable;finalintnewValue=oldValue|1;if(UNWRITABLE_UPDATER.compareAndSet(this,oldValue,newValue)){if(oldValue==0&&newValue!=0){fireChannelWritabilityChanged(invokeLater);}break;}}}低于低水位当ChannelOutboundBuffer容量低于低水位设置阈值时,isWritable()返回true,设置通道可写,并触发fireChannelWritabilityChanged().privatevoiddecrementPendingOutboundBytes(longsize,booleaninvokeLater,booleannotifyWritability){if(size==0){return;}longnewWriteBufferSize=TOTAL_PENDING_SIZE_UPDATER.addAndGet(this,-size);如果(notifyWritability&&newWriteBufferSize