当前位置: 首页 > 科技观察

说说高性能服务器的Reactor模型

时间:2023-03-19 20:37:25 科技观察

在这个云时代,我们使用的软件99%可以说是C/S架构!你发邮件用的outlook,foxmail等给你看视频的优酷、土豆等office365给你写文档用的,googledoc、印象笔记等给你用ie、chrome等浏览网页的(B/S是一种特殊的C/S)...C/S架构软件带来的一个明显的好处就是:只要有互联网,在任何地方都可以做同样的事情。示例:您在家中使用Office365编写了一份文档。到了公司,打开编辑地址就可以看到在家写的文档,展示出来或者继续编辑。甚至可以在手机上阅读和编辑。无需复制U盘即可复制。C/S架构可以抽象为如下模型:C是Client(客户端),B上面是Browser(浏览器)S是Server(服务器):服务器管理一定的资源,为其提供服务的主要原因之一为什么客户端提供某种服务C/S架构流行是因为网络速度的提高和成本的降低,尤其是无线网络速度的提高。试想,在2G时代,大部分人阅读的都是文字网页、小说等。看图简直是一种奢侈!更别说看视频了!网速的提升让越来越多的人使用互联网,比如:优酷、微信都有上亿用户,更别说天猫双11即时访问了!这对服务器要求很高!可以快速处理大量的用户请求!服务器如何快速处理用户请求?高性能服务器至少要满足以下要求:效率高:既然是高性能,那么处理客户端请求的效率当然是非常高的。高可用:不能随便挂掉。编程简单:基于此服务器的业务开发需要足够简单。Scalable:可伸缩可伸缩:容量可以简单的通过部署进行伸缩,即服务需要是无状态的,满足上述要求的基础之一就是高性能IO!Socket不管你是发邮件,浏览网页,还是看视频~实际底层都是用到TCP/IP,TCP/IP的编程抽象就是Socket!一直对Socket的中文翻译感到困惑。个人认为是我接触过最莫名其妙的专业术语翻译。没有一个!Socket的中文翻译就是“套接字”!我勒个去?很长一段时间,我都不能把它和网络编程联系起来!后来找了一些资料,终于在知乎上找到了满意的答案(具体链接见文末参考链接)!Socket的本义是套接字,我想表达的是套接字和插槽的关系!“发送套接字”插入到“接收套接字”中,建立链接,然后就可以开始通信了!socket的翻译应该是指socketpipe(如下图所示)!从这个角度来说,还是有些意义的!socket的翻译已经很标准了,不用担心!下面来看看sockets建立连接和通信的过程吧!它实际上是对TCP/IP连接和通信过程的抽象:服务端Socket会绑定到指定的端口,Listen客户端的“插入”会在服务端接受连接后将客户端Socket连接到服务端客户端,你可以发送和接收消息。通讯完成后,即可关闭。对于IO,我们听说的比较多:BIO:blockingIONIO:non-blockingIOsynchronousIOasynchronousIO及其组合:synchronousBlockingIOsynchronousnon-blockingIOAsynchronousblockingIOAsynchronousnon-blockingIO那么什么是blockingIO,非阻塞IO、同步IO、异步IO呢?一个IO操作其实分为两步:发起IO请求和实际IO操作。阻塞IO和非阻塞IO的区别在于第一步:发起IO请求是否会阻塞,如果一直阻塞到完成,就是传统的阻塞IO;如果没有阻塞,那么就是非阻塞IO。同步IO和异步IO的区别在于第二步是否阻塞。如果实际IO读写阻塞请求进程,那么就是同步IO,所以阻塞IO、非阻塞IO、IO多路复用、信号驱动IO都是同步IO;如果不是阻塞,而是操作系统帮你完成IO操作,然后把结果返回给你,那么就是异步IO。一个很贴切的例子:比如你家网络坏了,你打电话给中国电信报修!你拨号——客户端连接服务器,手机连接——建立连接,你说:“我家网络坏了,帮我修一下”——发送消息后,你在那里等,然后就阻塞了IO。如果碰巧有事,就放下电话,再处理其他事情。过一会,你来问修好了没有——那是非阻塞IO。客服说:“我马上帮你处理,你稍等”——SynchronousIO没有前两个那么流行,先不讨论!下面从代码层面来看一下BIO和NIO的流程吧!BIO客户端代码//Bind,ConnectSocketclient=newSocket("127.0.0.1",7777);//读写PrintWriterpw=newPrintWriter(client.getOutputStream());BufferedReaderbr=newBufferedReader(newInputStreamReader(System.in));write(br.readLine());//Closepw.close();br.close();服务器代码Socketsocket;//Bind,ListenServerSocketss=newServerSocket(7777);while(true){//Acceptsocket=ss.accept();//一般新建一个线程读写BufferedReaderbr=newBufferedReader(newInputStreamReader(socket.getInputStream()));System.out.println("youinputis:"+br.readLine());}以上代码即可据说是学习JavaSocket的入门级代码。代码流程和前面的图可以一一对比。模型图如下:BIO优缺点优点模型简单编码缺点性能瓶颈低优缺点很明显这里主要缺点:主要瓶颈在线程上。每个连接创建一个线程。虽然线程消耗比进程小,但是一台机器实际能创建的有效线程是有限的。以Java为例,1.5之后,一个线程大概消耗1M内存!并且随着线程数的增加,CPU切换线程上下文的消耗也会增加。相应增加,超过一定阈值后,继续增加线程,性能不增反减!而且因为一个连接创建一个新线程,所以编码模型非常简单!就性能瓶颈而言,BIO是确定的,不适合高性能服务器开发!像Tomcat这样的Web服务器从7开始就从BIO改为NIO,以提高服务器性能!NIONIO客户端代码(连接)//获取socket通道SocketChannelchannel=SocketChannel.open();channel.configureBlocking(false);//获取通道管理器selector=Selector.open();channel.connect(newInetSocketAddress(serverIp,port));//注册SelectionKey.OP_CONNECT事件channel.register(selector,SelectionKey.OP_CONNECT);NIO客户端代码(监听)while(true){//选择注册的io操作的事件(第一次是SelectionKey.OP_CONNECT)selector.select();while(SelectionKeykey:selector.selectedKeys()){if(key.isConnectable()){SocketChannelchannel=(SocketChannel)key.channel();if(channel.isConnectionPending()){channel.finishConnect();//如果连接,完成连接}channel.register(selector,SelectionKey.OP_READ);}elseif(key.isReadable()){//有可读数据事件。SocketChannelchannel=(SocketChannel)key.channel();ByteBufferbuffer=ByteBuffer.allocate(10);channel.read(buffer);byte[]data=buffer.array();Stringmessage=newString(data);System.out.println("receviemessagefromserver:,size:"+buffer.position()+"msg:"+message);}}}NIO服务端代码(连接)//获取一个ServerSocket通道ServerSocketChannelserverChannel=ServerSocketChannel.open();serverChannel.configureBlocking(false);serverChannel.socket().bind(newInetSocketAddress(port));//获取通道管理器selector=Selector.open();//将通道管理器与通道绑定,并为通道注册SelectionKey。OP_ACCEPT事件,serverChannel.register(selector,SelectionKey.OP_ACCEPT);NIO服务器代码(监听)while(true){//当注册的事件到达时,该方法返回,否则阻塞。selector.select();for(SelectionKeykey:selector.selectedKeys()){if(key.isAcceptable()){ServerSocketChannelserver=(ServerSocketChannel)key.channel();SocketChannelchannel=server.accept();channel.write(ByteBuffer.wrap(newString("sendmessagetoclient").getBytes()));//连接客户端成功后,为客户端通道注册SelectionKey.OP_READ事件。channel.register(selector,SelectionKey.OP_READ);}elseif(key.isReadable()){//有可读数据事件SocketChannelchannel=(SocketChannel)key.channel();ByteBufferbuffer=ByteBuffer.allocate(10);intread=channel.read(buffer);byte[]data=buffer.array();Stringmessage=newString(data);System.out.println("receivemessagefromclient,size:"+buffer.position()+"msg:"+message);}}}NIO模型示例如下:Acceptor注册Selector,监听accept事件。当客户端连接时,触发accept事件,服务器端构建对应的Channel,并在其上注册Selector,监听读写事件。进行相应的读写处理NIO优缺点,性能瓶颈,高劣模型,编码复杂,需要处理半包问题。连接!相应的,编码也复杂很多,从上面的代码可以很清楚的体会到。还有一个问题。因为是非阻塞的,所以应用无法知道消息什么时候读完了,所以就出现了半包问题!half-packet问题只要看下图就可以理解!我们知道,TCP/IP在发送报文的时候,可能会被解包(如上图1所示)!这使得接收端无法知道接收到的数据何时是完整的数据。例如:发送端分别发送了ABC、DEF、GHI三个信息,拆分成AB、CDRFG、H、I四个数据包发送。接收端如何恢复它们?在BIO模型中,当读取数据后会阻塞,而在NIO中则不会!所以你需要自己处理!比如以换行符作为判断依据,或者出现定长消息,或者自定义协议!NIO虽然性能高,但是编码复杂,而且需要处理半包问题!为了方便NIO开发,就有了Reactor模型!Reactor模型AWT事件Reactor模型与AWT事件模型非常相似,它将消息放入队列中并通过异步线程池使用它们!Reactor中的ComponentReactor:Reactor是IO事件的调度器。Acceptor:Acceptor接受client连接,建立对应client的Handler,并将Handler注册到Reactor。Handler:与客户端通信的实体,根据这个流程进行业务处理。一般在基本Handler的基础上,还会有进一步的层级划分,用于对decode、process、encoder等流程进行抽象。比如对于WebServer来说,decode通常是对HTTP请求的解析,process的过程会进一步涉及到Listener和Servlet的调用。Reactor模式下业务逻辑的处理被分散的IO事件打断,所以Handler需要有合适的机制在需要的信息不完整(读到一半),下一个IO事件到来时(否则读到一半)保存上下文可读)可以继续中断处理。为了简化设计,Handler通常被设计成一个状态机,按照GoF的状态模式来实现。对应上面的NIO代码:Reactor:相当于具有分配函数的SelectorAcceptor:NIO中建立连接的判断分支Handler:消息读写处理等操作类Reactor从线程的选择上可以细分为以下几种pool和Reactor几种:Reactor单线程模型这种模型和上面的NIO流程很像,只是消息相关的处理是独立于Handler的!虽然上面说了NIO的一个线程可以支持所有的IO处理。但是瓶颈也很明显!我们来看一个客户的情况。如果客户端多次请求,如果Handler中的处理速度慢,那么后续的客户端请求就会积压,导致响应缓慢!所以引入Reactor多线程模型!Reactor多线程模型Reactor多线程模型是将Handler中的IO操作和非IO操作分离。进行IO操作的线程称为IO线程,不进行IO操作的线程称为工作线程!这样的话,客户端的请求会直接丢到线程池中,客户端发送请求不会被阻塞!但是当用户数量进一步增加时,Reactor就会出现瓶颈!因为Reactor既要处理IO操作请求,又要响应连接请求!为了共享Reactor,所以引入主从Reactor模型!主从Reactor模型用于响应连接请求,从Reactor用于处理IO操作请求!NettyNetty是一个高性能的NIO框架,它是Reactor模型的一种实现!Netty客户端代码EventLoopGroupworkerGroup=newNioEventLoopGroup();try{Bootstrapb=newBootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.SO_KEEPALIVE,true);b.handler(newChannelInitializer(){@OverridepublicvoidinitChannel(SocketChannelch)throwsException{ch.pipeline().addLast(newTimeClientHandler());}});ChannelFuturef=b.connect(host,port).sync();f.channel().closeFuture().sync();}finally{workerGroup.shutdownGracefully();}NettyClientHandlerpublicclassTimeClientHandlerextendsChannelInboundHandlerAdapter{@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){ByteBufm=(ByteBuf)msg;try{longcurrentTimeMillis=(m.readUnsignedInt()-2208988800L)*1000L;System.out.println(newDate(currentTimeMillis));ctx.close();}最后{m.release();}}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause){cause.printStackTrace();ctx.close();}}Netty服务端代码EventLoopGroupbossGroup=newNioEventLoopGroup();EventLoopGroupworkerGroup=newNioEventLoopGroup();try{ServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(newChannelInitializer(){@OverridepublicvoidinitChannel(SocketChannelch)throwsException{ch.pipeline().addLast(newTimeServerHandler());}}).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);//绑定并开始接受传入的连接.ChannelFuturef=b.bind(port).sync();f.channel().closeFuture().sync();}最后{workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}Netty服务器处理程序publicclassTimeServerHandlerextendsChannelInboundHandlerAdapter{@OverridepublicvoidchannelActive(finalChannelHandlerContextctx){finalByteBuftime=ctx.alloc().buffer(4);time.writeInt((int)10Mill0(L+current)2208988800L));finalChannelFuturef=ctx.writeAndFlush(time);f.addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture){assertf==future;ctx.close();}});}@OverridecausepublicvoidexceptionCaughtand(Chanctler{cause.printStackTrace();ctx.close();}}从Netty服务端代码看,对应Reactor模型!EventLoopGroup相当于Reactor,bossGroup对应主Reactor,workerGroup对应ReactorTimeServerHandler,其中Handlerchild方法配置客户端通道,不以child开头的方法配置服务端通道的具体Netty内容,请访问Netty官网!Netty问题一目了然Netty开发中的一个重要问题是回调。首先,它打破了线性编码的习惯。第二个是回调地狱!看下面的例子:a.doing1();//1a.doing2();//2a.doing3();//31,2,3如果代码是同步的,就会依次执行!但是如果它不是同步的呢?我还是要2在1之后执行,3在2之后执行!该怎么办?想想AJAX!我们需要写这样的东西!a.doing1(newCallback(){publicvoidcallback(){a.doing2(newCallback(){publicvoidcallback(){a.doing3();}})}});有什么办法可以解决这个问题吗?其实实现类似Future!的功能并不难!当Client获取到结果的时候会阻塞,然后获取到结果后继续往下走!一种实现方案是使用锁,另一种是使用RingBuffer。经过测试,使用RingBuffer相比使用lock有2000TPS左右的提升!