当前位置: 首页 > 科技观察

UDPServer

时间:2023-03-12 19:55:33 科技观察

forNodejs源码分析转载请联系编程杂技公众号。让我们从一个使用示例开始,看看udp模块的实现。constdgram=require('dgram');//创建socket对象constserver=dgram.createSocket('udp4');//监听udp数据的到来server.on('message',(msg,rinfo)=>{//处理数据});//绑定端口server.bind(41234);我们看到创建udp服务器很简单,首先申请一个socket对象,在nodejs中和操作系统中是一样的,socket是一个网络通信的抽象,我们可以把它理解为传输层的一个抽象,它可以代表tcp或udp。我们来看看createSocket是干什么的。functioncreateSocket(type,listener){returnnewSocket(type,listener);}functionSocket(type,listener){EventEmitter.call(this);letlookup;letrecvBufferSize;letsendBufferSize;letoptions;if(type!==null&&typeoftype==='object'){options=type;type=options.type;lookup=options.lookup;recvBufferSize=options.recvBufferSize;sendBufferSize=options.sendBufferSize;}constandle=newHandle(type,lookup);this.type=type;if(typeoflistener==='function')this.on('message',listener);this[kStateSymbol]={handle,receiving:false,bindState:BIND_STATE_UNBOUND,connectState:CONNECT_STATE_DISCONNECTED,queue:undefined,reuseAddr:options&&options.reuseAddr,//UseUV_UDP_REUSEADDRiftrue.ipv6Only:options&&options.ipv6Only,recvBufferSize,sendBufferSize};}我们看到一个socket对象是对handle的一个封装。我们看handle是什么。functionnewHandle(type,lookup){//dns解析的函数,比如我们调用send时,传入一个域名if(lookup===undefined){if(dns===undefined){dns=require('dns');}lookup=dns.lookup;}if(type==='udp4'){consthandle=newUDP();handle.lookup=lookup4.bind(handle,lookup);returnhandle;}//忽略ipv6的processing}handle是对UDP模块的封装,UDP是一个c++模块,我们看一下c++模块的定义。//定义一个v8函数模块Localt=env->NewFunctionTemplate(New);//新创建的t对象需要额外扩充内存t->InstanceTemplate()->SetInternalFieldCount(1);//导出到js中层使用的名称LocaludpString=FIXED_ONE_BYTE_STRING(env->isolate(),"UDP");t->SetClassName(udpString);//属性访问属性enumPropertyAttributeattributes=static_cast(ReadOnly|DontDelete);Localsignature=Signature::New(env->isolate(),t);//新建一个函数模块Localget_fd_templ=FunctionTemplate::New(env->isolate(),UDPWrap::GetFD,env->as_callback_data(),signature);//设置访问器,访问fd属性时,执行get_fd_templ,从而执行UDPWrap::GetFDt->PrototypeTemplate()->SetAccessorProperty(env->fd_string(),get_fd_templ,Local(),attributes);//导出函数env->SetProtoMethod(t,"open",Open);//忽略一系列函数//使用target->Se导出到js层t(env->context(),udpString,t->GetFunction(env->context()).ToLocalChecked()).Check();在c++层的通用逻辑中,我们相关知识我已经讲过了,这里就不赘述了。当我们在js层新建UDP时,会新建一个c++对象UDPWrap::UDPWrap(Environment*env,Localobject):HandleWrap(env,object,reinterpret_cast(&handle_),AsyncWrap::PROVIDER_UDPWRAP){intr=uv_udp_init(env->event_loop(),&handle_);}执行uv_udp_init初始化udp对应的句柄。我们来看看libuv的定义。intuv_udp_init_ex(uv_loop_t*loop,uv_udp_t*handle,unsignedintflags){intdomain;interr;intfd;/*Usethelower8bitsforthedomain*/domain=flags&0xFF;//申请一个socket,返回一个fdfd=uv__socket(domain,SOCK_DGRAM,0);uv(loop_init,(uv_handle_t*)handle,UV_UDP);handle->alloc_cb=NULL;handle->recv_cb=NULL;handle->send_queue_size=0;handle->send_queue_count=0;//初始化io观察者(尚未注册到eventcyclepollio阶段),监听的文件描述符为fd,回调为uv__udp_iouv__io_init(&handle->io_watcher,uv__udp_io,fd);//初始化写队列QUEUE_INIT(&handle->write_queue);QUEUE_INIT(&handle->write_completed_queue);return0;}这里是我们在js层执行dgram.createSocket('udp4')时在nodejs中的主要执行流程。回到最初的例子,我们来看一下执行bind时的逻辑。Socket.prototype.bind=function(port_,address_/*,callback*/){letport=port_;//套接字状态conststate=this[kStateSymbol];//if(state.bindState!==BIND_STATE_UNBOUND)thrownewERR_SOCKET_ALREADY_BOUND();//否则标记已经绑定state.bindState=BIND_STATE_BINDING;//如果不传地址,默认绑定所有地址if(!address){if(this.type==='udp4')address='0.0.0.0';elseaddress='::';}//解析后绑定dns,必要时绑定state.handle.lookup(address,(err,ip)=>{if(err){state.bindState=BIND_STATE_UNBOUND;this.emit('error',err);return;}consterr=state.handle.bind(ip,port||0,flags);if(err){constex=exceptionWithHostPort(err,'bind',ip,port);state.bindState=BIND_STATE_UNBOUND;this.emit('error',ex);//Todo:close?return;}startListening(this);returnthis;}bind函数的主要逻辑是handle.bind和startListening。下面我们一一来看,再来看c++层的bind。voidUDPWrap::DoBind(constFunctionCallbackInfo&args,intfamily){UDPWrap*wrap;ASSIGN_OR_RETURN_UNWRAP(&wrap,args.Holder(),args.GetReturnValue().Set(UV_EBADF));//绑定(ip,port,flags)CHECK_EQ(args.Length(),3);node::Utf8Valueaddress(args.GetIsolate(),args[0]);Localctx=args.GetIsolate()->GetCurrentContext();uint32_tport,flags;if(!args[1]->Uint32Value(ctx).To(&port)||!args[2]->Uint32Value(ctx).To(&flags))return;structsockaddr_storageaddr_storage;interr=sockaddr_for_family(family,address.out(),port,&addr_storage);if(err==0){err=uv_udp_bind(&wrap->handle_,reinterpret_cast(&addr_storage),flags);}args.GetReturnValue().Set(err);}也逻辑不多,处理参数然后执行uv_udp_bind,uv_udp_bind不具体,类似tcp,设置一些flags和attributes,然后执行操作系统bind的函数,将本地ip和port保存到socket中。我们继续看startListening。functionstartListening(socket){conststate=socket[kStateSymbol];//有数据时回调,触发消息事件state.handle.onmessage=onMessage;//关键点,开始监听数据state.handle.recvStart();state.接收=真;state.bindState=BIND_STATE_BOUND;if(state.recvBufferSize)bufferSize(socket,state.recvBufferSize,RECV_BUFFER);if(state.sendBufferSize)bufferSize(socket,state.sendBufferSize,SEND_BUFFER);socket.emit('监听');}重点是我们用C++实现的recvStart函数。voidUDPWrap::RecvStart(constFunctionCallbackInfo&args){UDPWrap*wrap;ASSIGN_OR_RETURN_UNWRAP(&wrap,args.Holder(),args.GetReturnValue().Set(UV_EBADF));interr=uv_udp_recv_start(&wrapOn-vOn>Allochandle,_R,);//UV_EALREADY表示socketisalreadyboundbutthat'ssokayif(err==UV_EALREADY)err=0;args.GetReturnValue().Set(err);}OnAlloc,OnRecv分别是分配内存接收数据的函数和数据到达时执行的回调.继续libuvintuv__UDP_RECV_START(UV_UDP_T*handle,uv_alloc_cballoc_cb,uv_udp_recv_cbrecv_cb){Intrive;ext;ever;err=uv__udp_maybe_maybe_maybe_deferred_bindrect(rect_bindreot_rect,af_inet,0);//将io观察者注册到循环中,如果事件到来,等到pollio阶段处理将io观察者注册到循环中,等待事件到来时,在pollio阶段进行处理。前面我们说过,回调函数是uv__udp_io。让我们看看当事件被触发时函数是如何处理的。staticvoiduv__udp_io(uv_loop_t*loop,uv__io_t*w,unsignedintrevents){uv_udp_t*handle;handle=container_of(w,uv_udp_t,io_watcher);//可读事件触发if(revents&POLLIN)uv__udp_recvmsg(handle);//可写事件触发if(revents&POLLOUT){uv__udp_sendmsg(handle);uv__udp_run_completed(handle);}}这里先分析一下可读事件的逻辑。我们看uv__udp_recvmsg。staticvoiduv__udp_recvmsg(uv_udp_t*handle){structsockaddr_storagepeer;structmsghdrh;ssize_tnread;uv_buf_tbu??f;intflags;intcount;count=32;do{//分配内存接收数据,buf=uv_buf_init(NULL,0);handle->alloc_cb((uv_handle_t*)handle,64*1024,&buf);memset(&h,0,sizeof(h));memset(&peer,0,sizeof(peer));h.msg_name=&peer;h.msg_namelen=sizeof(peer);h.msg_iov=(void*)&buf;h.msg_iovlen=1;//调用操作系统读取数据的函数do{nread=recvmsg(handle->io_watcher.fd,&h,0);}while(nread==-1&&errno==EINTR);//调用c++层回调handle->recv_cb(handle,nread,&buf,(conststructsockaddr*)&peer,flags);}}libuv会回调c++层,然后c++layer会回调到js层,最后触发message事件,也就是代码开头对应的message事件。