当前位置: 首页 > 科技观察

调查事情才知道——记录一次Nodejs源码分析心得

时间:2023-03-21 13:54:48 科技观察

本文转载自微信公众号《编程杂技》,作者theanarkh。转载本文请联系编程杂技公众号。昨天在分析http模块相关的代码时,遇到了一个晦涩难懂的逻辑。想了想还是没明白。百度和谷歌搜索了很多帖子都没有找到合适的答案。突然,我看到了一个搜索结果,主题有点眼熟。我在StackOverflow上点了一篇帖子,结果已经是404了,终于通过快照功能成功看到了内容。这篇帖子[1]与我的疑惑无关,却突然给了我一些启发。顺着这个灵感看代码,最后下载了nodejs源码,加了点日志,连夜编译(时间太长,等不及编译完成,只好睡觉了)。早上起床求证,终于解开了疑点。问题源于以下一段代码。functionconnectionListenerInternal(server,socket){socket.server=server;//分配一个http解析器constparser=parsers.alloc();//解析请求消息parser.initialize(HTTPParser.REQUEST,newHTTPServerAsyncResource('HTTPINCOMINGMESSAGE',socket),server.maxHeaderSize||0,server.insecureHTTPParser===undefined?isLenient():server.insecureHTTPParser,);parser.socket=socket;//开始解析header开始时间parser.parsingHeadersStart=nowDate();socket.parser=parser;conststate={onData:null,onEnd:null,onClose:null,onDrain:null,//在同一个tcp连接上,请求和响应队列outgoing:[],incoming:[],outgoingData:0,keepAliveTimeoutSet:false};state.onData=socketOnData.bind(undefined,server,socket,parser,state);socket.on('data',state.onData);if(socket._handle&&socket._handle.isStreamBase&&!socket._handle._consumed){parser._consumed=true;socket._handle._consumed=true;parser.consume(socket._handle);}parser[kOnExecute]=onParserExecute.bind(undefined,server,socket,parser,state);socket._paused=false;}这段代码看起来很多,这是启动http服务器后,有一个新的tcp连接建立连接时执行的回调就是如何处理tcp上数据的到来。上面代码中,nodejs监听了socket的data事件,同时注册了hookkOnExecute。我们都知道数据事件是数据到达流时触发的事件。让我们看看socketOnData做了什么。functionssocketOnData(server,socket,parser,state,d){//交给http解析器处理,返回已解析的字节数constret=parser.execute(d);onParserExecuteCommon(server,socket,parser,state,ret,d);}这个好像没什么问题,socket上有数据,然后交给http解析器处理。http模块源码分析的文章几乎都是这样分析的。我的第一反应是这没问题。kOnExecute是做什么的?kOnExecute钩子函数的值是onParserExecute,好像是用来解析tcp上的数据的。好像和onSocketData功能一样。tcp上的数据有两个消费者吗?让我们看看kOnExecute什么时候被回调。voidOnStreamRead(ssize_tnread,constuv_buf_t&buf)override{Localret=Execute(buf.base,nread);Localcb=object()->Get(env()->context(),kOnExecute).ToLocalChecked();MakeCallback(cb.As(),1,&ret);}是在node_http_parser.cc中的OnStreamRead中回调的,那么OnStreamRead是什么时候回调的呢?OnStreamRead是nodejs中c++层流操作的一个泛型函数,当stream有数据时执行回调。而OnStreamRead也会将数据交给http解析器进行解析。好像真的有两个消费者?这就很奇怪了,为什么一个数据两次交给http解析器处理呢?这时我的想法是,这两个地方一定是互不相容的。但是我一直没能找到它在哪里完成的。终于在connectionListenerInternal的一段代码中找到了答案。if(socket._handle&&socket._handle.isStreamBase&&!socket._handle._consumed){parser._consumed=true;socket._handle._consumed=true;parser.consume(socket._handle);}因为tcp流继承了StreamBase类,所以if成立(后面会详细分析)。我们来看看consume的实现。staticvoidConsume(constFunctionCallbackInfo&args){Parser*parser;ASSIGN_OR_RETURN_UNWRAP(&parser,args.Holder());CHECK(args[0]->IsObject());StreamBase*stream=StreamBase::FromObject(args[0].As());CHECK_NOT_NULL(stream);stream->PushStreamListener(parser);}http解析器将自己注册为tcp流的监听器。这就涉及到c++层对流的设计。我们从头开始。看看PushStreamListener做了什么。在c++层,流的操作被类StreamResource封装。classStreamResource{public:virtual~StreamResource();virtualintReadStart()=0;virtualintReadStop()=0;virtualintDoShutdown(ShutdownWrap*req_wrap)=0;virtualintDoTryWrite(uv_buf_t**bufs,size_t*count);virtualintDoWrite(WriteWrap*wt,uv*bufs,size_tcount,uv_stream_t*send_handle)=0;voidPushStreamListener(StreamListener*listener);voidRemoveStreamListener(StreamListener*listener);protected:uv_buf_tEmitAlloc(size_tsuggested_size);voidEmitRead(ssize_tnread,constuv_buf_t&buf=uv_buf_init(nullptr,0));StreamListener*listener=nullptr;uint64_tbytes_read_=0;uint64_tbytes_written_=0;friendclassStreamListener;};我们看到StreamResource是一个基类,它定义了用于操作流的公共方法。其中一个成员是StreamListener类的实例。我们看一下StreamListener的实现。classStreamListener{public:virtual~StreamListener();virtualuv_buf_tOnStreamAlloc(size_tsuggested_size)=0;virtualvoidOnStreamRead(ssize_tnread,constuv_buf_t&buf)=0;virtualvoidOnStreamDestroy(){}inlineStreamResource*stream(){returnsstream_;}protected:voidPassReadErrorStreamToPreviousListener(ssizeResourcetnread);=nullptr;StreamListener*previous_listener_=nullptr;friendclassStreamResource;};StreamListener是负责消费流数据的类。StreamListener和StreamResource类的关系如下。null我们看到一个流可以注册多个监听器,多个监听器组成一个链表。那么我们来看看如何创建c++层的tcp对象。下面是TCPWrap的继承关系。classTCPWrap:publicConnectionWrap{}classConnectionWrap:publicLibuvStreamWrap{}classLibuvStreamWrap:publicHandleWrap,publicStreamBase{}classStreamBase:publicStreamResource{}我们看到tcp流是继承自StreamResource。在创建一个新的tcpC++对象(tcp_wrap.cc)时,会不断调用父类的构造函数,在StreamBase中有一个关键的操作。inlineStreamBase::StreamBase(Environment*env):env_(env){PushStreamListener(&default_listener_);}EmitToJSStreamListenerdefault_listener_;StreamBase默认会为流注册一个监听器。下面看一下EmitToJSStreamListener的具体定义。classReportWritesToJSStreamListener:publicStreamListener{public:voidOnStreamAfterWrite(WriteWrap*w,intstatus)override;voidOnStreamAfterShutdown(ShutdownWrap*w,intstatus)override;private:voidOnStreamAfterReqFinished(StreamReq*req_wrap,intstatus);};classEmitToJSStreamListener:publicReportWritesToJSStreamListener{public:onuv_sizet_overridesize);voidOnStreamRead(ssize_tnread,constuv_buf_t&buf)override;};EmitToJSStreamListener继承自StreamListener,定义了分配内存和读取接收数据的函数。那我们就来看看PushStreamListener是干什么的。inlinevoidStreamResource::PushStreamListener(StreamListener*listener){//Header插入方法listener->previous_listener_=listener_;listener->stream_=this;listener_=listener;}PushStreamListener就是构造上图的结构体。对应创建c++层的tcp对象,如下图所示。然后我们看整个链路读取流的数据。首先js层调用readStartfunctiontryReadStart(socket){socket._handle.reading=true;consterr=socket._handle.readStart();if(err)socket.destroy(errnoException(err,'read'));}//注册等待读取事件Socket.prototype._read=function(n){tryReadStart(this);};让我们看看readStartintLibuvStreamWrap::ReadStart(){returnuv_read_start(stream(),[](uv_handle_t*handle,size_tsuggested_size,uv_buf_t*buf){static_cast(handle->data)->OnUvAlloc(suggested_size,buf);},[](uv_stream_t*stream,ssize_tnread,constuv_buf_t*buf){static_cast(stream->data)->OnUvRead(nread,buf);});}ReadStart调用libuv的uv_read_start注册等待可读事件,并注册两个回调函数OnUvAlloc和OnUvRead。voidLibuvStreamWrap::OnUvRead(ssize_tnread,constuv_buf_t*buf){EmitRead(nread,*buf);}inlinevoidStreamResource::EmitRead(ssize_tnread,constuv_buf_t&buf){//bytes_read_表示读取的字节数if(nread>0)bytes_read_+=static_cast(nread);listener_->OnStreamRead(nread,buf);}listener_的OnStreamRead会在最后逐层调用。我们来看看tcp的OnStreamReadvoidEmitToJSStreamListener::OnStreamRead(ssize_tnread,constuv_buf_t&buf_){StreamBase*stream=static_cast(stream_);Environment*env=stream->stream_env();HandleScopehandle_scope(env->isolate());Context::Scopecontext_scope(env->context());AllocatedBufferbuf(env,buf_);stream->CallJSOnreadMethod(nread,buf.ToArrayBuffer());}继续回调CallJSOnreadMethodMaybeLocalStreamBase::CallJSOnreadMethod(ssize_tnread,Localab,size_toffset,StreamBaseJSCheckschecks){Environment*env=env_;//...AsyncWrap*wrap=GetAsyncWrap();CHECK_NOT_NULL(wrap);Localonread=wrap->object()->GetInternalField(kOnReadFunctionField);CHECK(onread->IsFunction());returnwrap->MakeCallback(onread.As(),arraysize(argv),argv);}CallJSOnreadMethod会回调js层的onread回调函数。onread会将数据推入流中,然后触发数据事件。这是tcp中默认的数据读取过程。文章开头提到的parser.consume打破了这种默认行为。流->PushStreamListener(解析器);修改tcp流的listener链,http解析器将自己视为数据的接收者。所以此时tcp流上的数据直接被node_http_parser.cc的OnStreamRead消费了。而不是触发socket的data事件,最后在nodejs源码中添加日志,重新编译验证确实如文章所述。最后,这个过程还有一个关键点,调用consume函数的前提是socket._handle.isStreamBase为true。StreamBase::AddMethods中定义isStreamBase为true,在创建tcp对象时调用该方法,所以如果tcp的isStreamBase为true,就会执行consume,执行kOnExecute回调。References[1]帖子:http://cache.baiducontent.com/c?m=rZy2XovtTdJJuXWLM-s8wgpaz8NFubewtolyiC19iAKFJrbGdx2EFnArzlAIDisNP70zWWsCPv-4jwMHTGNcLaUsMVr-lvLqYmmHD-w_fUYz6a5K6OQRC9kZmLYN5RXsb34OdINb8xHIJsdyClaEWOtCGKMQ2saYK7ed7OG8v0E1pRKR4K46phl0rCBrw6amXE3QpPo62dMhvu_VASYYqq&p=cb77c64ad49111a05bee9e264d5693&newp=882a9646dc9712a05ab7cc374f0ccc231615d70e3ad3d501298ffe0cc4241a1a1a3aecbf2d29170ed6c27f630bae4856ecf630723d0834f1f689df08d2e??cce7e7b&s=cfcd208495d565ef&user=baidu&fm=sc&query=onParserExecute&qid=869f73bc002e44f5&p1=11