本文转载自微信公众号《KK建筑师》,作者wangkai。转载请联系KKarchitects公众号。本次我们将实现一个支持百万级并发连接的采集服务器,找出异步+NIO编程方式与实时流模型之间的千丝万缕的联系。1.需求简单的需求功能如下:数据采集是一个web服务器,可以接收http请求发送的事件,事件为json格式;接收到后解码,检查字符串是否可以解码成json对象;提取、清理和转换消息;最后将它们发送给卡夫卡。性能要求:支持百万级并发连接;充分利用CPU资源和IO资源;2.实施方案分析网页端和移动端会生成一些埋点文件,通过http发送到采集服务器。说到网络连接,第一个想到的就是Socket。1.初始版本:使用BIO实现客户端与服务端的通信,我们可以很方便的快速实现一个多线程的web端服务端。模型图如下(为了节省篇幅就不写代码了,很简单,但不实用,百度有很多)。简单描述一下:每次有请求过来,就创建一个线程去执行。如下图所示:但是缺点也很明显,可以随便列举三点:1.1创建和销毁线程开销大。Java中的线程模型是基于操作系统原生的线程模型实现的,也就是说Java中的线程实际上是基于内核线程实现的,线程的创建、销毁和同步都需要系统调用,而系统调用需要用户态和内核态来回切换,成本比较高;1.2线程本身占用内存大。在Java中,默认一个Thread,线程栈大小为1M。一旦线程数超过千,恐怕整个Jvm的内存都要消耗掉一半;1.3线程切换成本很高。当切换线程时,操作系统需要保留线程的上下文,然后执行系统调用。如果线程过多,线程执行时间可能比线程执行时间长,使系统处于几乎无法使用的状态;2.修改版:异步+NIO实现的高性能网络通信这是Java中的NIO模型,如下图:你可能无法同时接受这么多陌生的概念(Acceptor,Selector,Channel),没关系,NIO会慢慢学习,这里我们只需要抓住它的核心:用一个队列将requestreceiver和workerthread分开,让requestreceiver和workerthread各尽所能,让fullIO和CPU资源的使用;现在,NIO连接器可以维持的并发连接数不再受工作线程数的限制,不需要分配大量的线程就可以支持大量的并发连接。3.进阶版,如何充分利用CPU和网络IO资源第二步,我们已经解决了高并发连接的问题,但是还远远不够。在一个采集系统中,我们需要做这三件事,解码、清洗和转换、发送。其中解码和清洗转换过程是纯CPU计算,占用CPU资源,而发送会占用大量IO资源。如果让一个线程依次执行这三件事,前两件事,CPU会很快完成,最后必然要等待IO操作。这个线程会被操作系统挂起,在那里等待,直到IO操作完成。如何解决只能增加工作线程的数量,但是增加工作线程的数量会导致过度的线程调度和上下文切换,这是CPU浪费的另一种形式。如何解决,我们可以使用异步的方式。什么是异步,比如煮饭的过程是异步的,先把米放到电饭锅里煮,趁着这个时间煮,这就是“异步”。如果等到饭熟了再做饭,这就是“同步”。也许你已经知道上次我们讲了CompletableFuture,它是一个异步编程框架,可以安排不同的线程。并且可以指定一个线程池,这样不同的线程池就可以做不同的事情。看下面代码://解码线程池ExecutorDecoderExecutor=ExecutorHelper.createExecutor(2,"decoder");//转换线程池ExecutorectExecutor=ExecutorHelper.createExecutor(8,"ect");//发送线程池ExecutorsenderExecutor=ExecutorHelper.createExecutor(2,"sender");@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,HttpRequestreq)throwsException{CompletableFuture//解码过程.supplyAsync(()->this.decode(ctx,req),this.decoderExecutor)//转换过程.thenApplyAsync(e->this.doExtractCleanTransform(ctx,req,e),this.ectExecutor)//IOprocess.thenApplyAsync(e->this.send(ctx,req,e),this.senderExecutor);}其中channelRead0为Nettyframework请求方法,每次请求都会走这个方法。每次有请求进来,这三个action都是异步执行的,但是CompletableFuture框架会保证每个请求三个action的执行顺序。这样就可以充分利用CPU资源和IO资源。三、进阶版存在的问题1、问题描述上面的异步模型可以用下图来表示。在上面的异步编程代码中,我们将不同类型的任务提交到不同的线程池中,而线程池是需要队列的,图中的队列就是线程池的队列。其中解码过程和转换过程是比较快的过程,而发送I/O过程是比较慢的。那么之前的消息会一直积压在发送进程的线程池队列中,等待执行。如果队列是无界队列,就会积压越来越多的任务,最终所有虚拟机的内存全部用完,导致OOM。2、如何控制事件的速度我们可以直接想到严格控制上游的发送速度,比如控制上游每秒只能发送1000条消息。这种方法有效,但非常不优雅。如果下游遇到特殊原因,每秒只能处理500条,还是会OOM,我们无法估计一个合适的值。3.反压有一个更优雅的方案叫做反压。我们上面方案的问题主要是因为队列是无界的,消息总是积压的,而且是非阻塞的。实现反向压力,只需要从两个方面来控制:执行器的任务队列,它的容量要有限制;当执行者的任务队列满时,会阻止上游继续提交任务,直到任务队列中有新空间为止。如上图所示,可以看出如果发送线程池队列满了,会阻止上游转换任务继续提交任务。过一段时间,转换进程的队列也会满了,也会阻止解码进程提交任务。对于我们的数据处理场景,我们可以通过横向增加服务器来解决TPS低的问题;如果是流式处理场景,那么最上游应该主动从kafka拉取消息。这时候,它就会放慢自己拉取消息的速度,从而达到流量控制的目的。一段时间后,发送线程池队列空闲,继续处理消息。4.如何实现背压?其实很简单。当队列满了,就会进入线程池的拒绝策略。在拒绝策略中,我们使用while循环重复提交任务,直到任务提交成功。见下面的代码:privatefinalList
