当前位置: 首页 > 后端技术 > Java

Elasticsearch源码分析HTTP请求响应处理

时间:2023-04-01 23:58:28 Java

因为ES有多个版本,主要是每个版本的启动过程不一样。不想单独分析某个版本如何启动,分析ES如何响应。HTTP请求,以及如何在幕后实现它们。下面就为大家简单分析一下,HTTP服务器实现。HTTPServerElasticsearchNetty注册服务器Netty4HttpServerTransportprotectedvoiddoStart(){booleansuccess=false;尝试{serverBootstrap=newServerBootstrap();serverBootstrap.group(newNioEventLoopGroup(workerCount,daemonThreadFactory(settings,HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));//NettyAllocator将返回设计用于与已配置分配器一起使用的通道类型serverBootstrap.channel(NettyAllocator.getServerChannelType());//为服务器通道和创建的子通道设置分配器serverBootstrap.option(ChannelOption.ALLOCATOR,NettyAllocator.getAllocator());serverBootstrap.childOption(ChannelOption.ALLOCATOR,NettyAllocator.getAllocator());serverBootstrap.childHandler(configureServerChannelHandler());serverBootstrap.handler(新的ServerChannelExceptionHandler(这个));serverBootstrap.childOption(ChannelOption.TCP_NODELAY,SETTING_HTTP_TCP_NO_DELAY.get(settings));serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE,SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));//节省部分代码finalByteSizeValuetcpSendBufferSize=SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);如果(tcpSendBufferSize.getBytes()>0){serverBootstrap.childOption(ChannelOption.SO_SNDBUF,Math.toIntExact(tcpSendBufferSize.getBytes()));}finalByteSizeValuetcpReceiveBufferSize=SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(设置);如果(tcpReceiveBufferSize.getBytes()>0){serverBootstrap.childOption(ChannelOption.SO_RCVBUF,Math.toIntExact(tcpReceiveBufferSize.getBytes()));}serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,recvByteBufAllocator);serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,recvByteBufAllocator);finalbooleanreuseAddress=SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);serverBootstrap.option(ChannelOption.SO_REUSEADDR,reuseAddress);serverBootstrap.childOption(ChannelOption.SO_REUSEADDR,reuseAddress);=真;}finally{如果(成功==false){doStop();//否则我们会泄漏线程,因为我们从未移动到started}}}我用过Netty才知道上面代码是什么意思,设置工作线程,TCP设置,设置管道handlerNetty的连接,一般是在的实现类中添加的在childHandler()中设置的ChannelInitializer。看configureServerChannelHandler()初始化HttpChannelHandler,在initChannel()中可以看到添加的处理器。protectedvoidinitChannel(Channelch)抛出异常{Netty4HttpChannelnettyHttpChannel=newNetty4HttpChannel(ch);ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);ch.pipeline().addLast("read_timeout",newReadTimeoutHandler(transport.readTimeoutMillis,TimeUnit.MILLISECONDS));finalHttpRequestDecoder解码器=newHttpRequestDecoder(handlingSettings.getMaxInitialLineLength(),handlingSettings.getMaxHeaderSize(),handlingSettings.getMaxChunkSize());decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);ch.pipeline().addLast("解码器",解码器);ch.pipeline().addLast("decoder_compress",newHttpContentDecompressor());ch.pipeline().addLast("编码器",newHttpResponseEncoder());finalHttpObjectAggregator聚合器=newHttpObjectAggregator(handlingSettings.getMaxContentLength());聚合器.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);ch.pipeline().addLast("聚合器",聚合器);如果(handlingSettings.isCompression()){ch.pipeline().addLast("encoder_compress",newHttpContentCompressor(handlingSettings.getCompressionLevel()));}if(handlingSettings.isCorsEnabled()){ch.pipeline().addLast("cors",newNetty4CorsHandler(transport.corsConfig));}ch.pipeline().addLast("pipelining",newNetty4HttpPipeliningHandler(logger,transport.pipeliningMaxEvents));ch.pipeline().addLast("handler",requestHandler);transport.serverAcceptedChannel(nettyHttpChannel);}从上面的代码知道处理请求的是:requestHandler,它的实际类型:Netty4HttpRequestHandlerprotectedvoidchannelRead0(ChannelHandlerContextctx,HttpPipelinedRequestmsg){Netty4HttpChannelchannel=ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();}FullHttpRequest请求=msg.getRequest();布尔成功=假;Netty4HttpRequesthttpRequest=newNetty4HttpRequest(request,msg.getSequence());尝试{if(request.decoderResult().isFailure()){Throwablecause=request.decoderResult().cause();if(causeinstanceofError){ExceptionsHelper.maybeDieOnAnotherThread(cause);}serverTransport.incomingRequestError(httpRequest,channel,newException(cause));}else{serverTransport.incomingRequestError(httpRequest,channel,(Exception)原因);}}else{serverTransport.incomingRequest(httpRequest,channel);成功=真;}最后{if(success==false){httpRequest.release();}可以看出处理http请求的方法委托了Netty4HttpServerTransport,也就是上面的Nettyserver类。这里的逻辑是将httpRequest、channel转化为Elasticsearch模板对象,屏蔽底层api,然后从线程池中获取ThreadContext进行任务执行。类似于http分发服务器。详细代码就不展示了。一系列的方法调用,见下图。TransportAction.doExecute是一个抽象方法,由NodeClient.transportAction返回的实现类调用执行。每一个URL都会有一个对应的transportAction实现类,这和我们通常的MVC架构是不一样的。NodeClient内置了一个Mapactions,里面包含了所有HTTP请求的处理方式,有300多个值对应不同的场景。看下面最简单的响应,当我请求ES:9200端口时,返回基础信息,由TransportMainAction如何响应的publicclassTransportMainActionextendsHandledTransportAction{privatefinalStringnodeName;私有最终集群服务集群服务;@InjectpublicTransportMainAction(Settingssettings,TransportServicetransportService,ActionFiltersactionFilters,ClusterServiceclusterService){super(MainAction.NAME,transportService,actionFilters,MainRequest::new);this.nodeName=Node.NODE_NAME_SETTING.get(设置);this.clusterService=clusterService;}@OverrideprotectedvoiddoExecute(Tasktask,MainRequestrequest,ActionListenerlistener){ClusterStateclusterState=clusterService.state();listener.onResponse(newMainResponse(nodeName,Version.CURRENT,clusterState.getClusterName(),clusterState.metaData().clusterUUID(),构建.CURRENT));}}总结分析了这么多代码。ES处理HTTP请求链的执行过程虽然比较复杂,但实际代码还是比较简单的。整体分析就是分析TransportAction.doExecute是如何响应请求的