一、业务背景目前在移动端的使用场景中大量使用消息推送。推送消息可以帮助运营商更高效地实现运营目标(例如向用户推送营销活动或提醒APP新功能)。推送系统需要具备以下两个特性:消息秒级发送给用户,无延迟,支持每秒百万级推送,单机百万级长连接。支持通知、文本、自定义消息透传等多种呈现形式。正是由于以上原因,给系统的开发和维护带来了挑战。下图是推送系统的简要说明(API->推送模块->手机端)。2.问题后台推送系统在长连接集群中,在稳定性测试和压力测试中经过一段时间后,会出现一个进程随机挂掉的情况。概率很小(频率大概一个月一次),会影响部分客户。从终端发送消息的时间限制。推送系统中的长连接节点(Broker系统)是基于Netty开发的。该节点维护服务器与移动终端之间的长期连接。线上出现问题时,添加Netty内存泄漏监控参数,排查问题。没有发现问题。由于长连接节点是Netty开发的,为了方便读者理解,下面简单介绍一下Netty。3、Netty简介Netty是一个高性能、异步事件驱动的NIO框架,基于JavaNIO提供的API实现。它提供对TCP、UDP和文件传输的支持。Netty作为最流行的NIO框架,在互联网领域、大数据分布式计算领域、游戏行业、通信行业等得到广泛应用,HBase、Hadoop、Bees、Dubbo等开源组件也是基于Netty的NIO构建的框架。4.问题分析4.1猜测一开始猜测是长连接数导致的,后来查看日志和分析代码发现不是这个原因导致的。长连接数:39万,如下图:每个通道的字节大小为1456,按40万长连接计算,这样内存不会太大。4.2查看GC日志查看GC日志,发现进程在挂掉前频繁fullGC(5分钟一次),但内存并没有减少,怀疑是堆外内存泄漏。4.3堆内存分析ChannelOutboundBuffer对象占用内存近5G。泄漏的原因基本可以确定:ChannelOutboundBuffer中的条目数过多。查看ChannelOutboundBuffer的源码分析,是ChannelOutboundBuffer中的数据。没有写出来,造成积压;ChannelOutboundBuffer内部是一个链表结构。4.4上图的分析数据没有写出来,为什么会这样?代码中其实有一个case判断连接是否可用(Channel.isActive),超时的连接会被关闭。从历史经验来看,这种情况在连接半开(客户端异常关闭)时出现的比较多---如果双方不通信就没有问题。根据以上猜想,对测试环境进行复现测试。1)模拟客户端集群,与长连接服务器建立连接,设置客户端节点的防火墙,模拟服务器与客户端网络异常(即模拟Channel.isActive调用成功,但实际数据无法发送出去)。2)减少堆外内存,继续向之前的客户端发送测试消息。消息大小(大约1K)。3)按照128M内存计算,实际上会出现多次调用9W。五、问题解决5.1启动autoRead机制当channel不可写时,关闭autoRead;publicvoidchannelReadComplete(ChannelHandlerContextctx)throwsException{if(!ctx.channel().isWritable()){Channelchannel=ctx.channel();ChannelInfochannelInfo=ChannelManager.CHANNEL_CHANNELINFO.get(channel);StringclientId="";if(channelInfo!=null){clientId=channelInfo.getClientId();}LOGGER.info("通道不可写,关闭自动读取,clientId:{}",clientId);channel.config().setAutoRead(false);}}当数据可写时启动autoRead;@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContextctx)throwsException{Channelchannel=ctx.channel();ChannelInfochannelInfo=ChannelManager.CHANNEL_CHANNELINFO.get(channel);StringclientId="";if(channelInfo!=null){clientId=channelInfo.getClientId();}if(channel.isWritable()){LOGGER.info("channeliswritableagain,turnonautoread,clientId:{}",clientId);channel.config().setAutoRead(true);}}解释:autoRead的作用是更精准的码率控制,开启后Netty会帮我们注册读取事件当注册了read事件后,如果网络可读,Netty会从channel中读取数据,如果autoread关闭,Netty不会注册read事件,这样即使peer发送数据,这样就不会再从channel中读取数据。当recv_buffer满了,就不会再接收数据了。5.2设置高低水位serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,newWriteBufferWaterMark(1024*1024,8*1024*1024));注意:使用5.3添加channel.isWritable()判断通道是否可用,配合后面的isWritable,除了检查channel.isActive()外,还需要添加channel.isWrite()判断连接是否Active,是否可以写入由isWrite决定。privatevoidwriteBackMessage(ChannelHandlerContextctx,MqttMessagemessage){Channelchannel=ctx.channel();//添加channel.isWritable()判断if(channel.isActive()&&channel.isWritable()){ChannelFuturecf=channel.writeAndFlush(消息);if(cf.isDone()&&cf.cause()!=null){LOGGER.error("channelWriteerror!",cf.cause());ctx.close();}}}注意:可以通过isWritable来控制ChannelOutboundBuffer,防止其无限扩展。该机制是利用设定的通道高低水位来判断。5.4问题验证修改后测试,发送到27W次后不报错;6、方案分析一般的Netty数据处理流程如下:将读取到的数据交给业务线程处理,处理后发送出去(整个过程是Asynchronous的),为了提高网络的吞吐量,Netty在业务层和socket之间添加了一个ChannelOutboundBuffer。调用channel.write时,所有写入的数据实际上并没有写入socket,而是先写入了ChannelOutboundBuffer。当调用channel.flush时,它实际上是写入套接字。因为中间有一个buffer,有速率匹配,而且这个buffer还是无界的(链表),也就是说如果不控制channel.write的速度,大量的数据会堆积在这个buffer中,如果遇到socket无法写入数据时(此时isActive判断无效)或者写入速度慢。最有可能的结果是资源耗尽,如果ChannelOutboundBuffer存储了一个DirectByteBuffer,这将使问题更难排查。流程可以抽象如下:从上面的分析可以看出,第1步写的太快了(处理的太快了)或者下游无法发送数据,都会出现问题。这实际上是一个速率匹配问题。7、Netty源码说明超过高水位当ChannelOutboundBuffer的容量超过高水位设置阈值时,isWritable()返回false,设置通道不可写(setUnwritable),并触发fireChannelWritabilityChanged()。privatevoidincrementPendingOutboundBytes(longsize,booleaninvokeLater){if(size==0){return;}longnewWriteBufferSize=TOTAL_PENDING_SIZE_UPDATER.addAndGet(this,size);如果(newWriteBufferSizetable>channel.config().getWriteBuffer(High)(invokeLater);}}privatevoidsetUnwritable(booleaninvokeLater){for(;;){finalintoldValue=unwritable;finalintnewValue=oldValue|1;if(UNWRITABLE_UPDATER.compareAndSet(this,oldValue,newValue)){if(oldValue==0&&newValue!=0){fireChannelWritabilityChanged(invokeLater);}break;}}}低于低水位时ChannelOutboundBuffer的容量较低低于低水位设定阈值,isWritable()返回true,设置通道可写,并触发fireChannelWritabilityChanged()。privatevoiddecrementPendingOutboundBytes(longsize,booleaninvokeLater,booleannotifyWritability){if(size==0){return;}longnewWriteBufferSize=TOTAL_PENDING_SIZE_UPDATER.addAndGet(this,-size);如果(notifyWritability.ChangeSize&&newWrite(getWriteBufferLowWaterMark()){setWritable(invokeLater);}}privatevoidsetWritable(booleaninvokeLater){for(;;){finalintoldValue=unwritable;finalintnewValue=oldValue&~1;如果(UNWRITABLE_UPDATER.compareAndSet(this,oldValue,newValue)){if(oldValue!=0&&newValue==0){fireChannelWritabilityChanged(invokeLater);}break;}}}8.小结ChannelOutboundBuffer容量超过高水位时设置阈值,isWritable()返回false,表示消息正在累积,需要降低写入速度。当ChannelOutboundBuffer容量低于低水位设置阈值时,isWritable()返回true,表示消息太少消息和写入速度需要提高,经过以上三步的修改,在线观察半年的部署没有出现问题。作者:vivo互联网服务器团队-张林
