当前位置: 首页 > 科技观察

Websocket库Ws原理分析

时间:2023-03-17 20:22:59 科技观察

前言:本文基于nodejs的ws模块分析websocket的原理。ws服务端逻辑由websocket-server.js的WebSocketServer类实现。该类初始化一些参数并执行以下代码if(this._server){//将以下事件注册到服务器并返回一个注销函数(用于取消下面已注册的事件)this._removeListeners=addListeners(this._server,{//监听成功回调listening:this.emit.bind(this,'listening'),error:this.emit.bind(this,'error'),//收到协议升级请求回调upgrade:(req,socket,head)=>{this.handleUpgrade(req,socket,head,(ws)=>{//处理成功,触发链接成功事件this.emit('connection',ws,req);});}});我们看到ws监听了upgrade事件。当websocket请求到达时,它会执行handleUpgrade来处理升级请求。升级成功后,会触发连接事件。我们先看看handleUpgrade。handleUpgrade中逻辑不多,主要是对升级请求的一些httpheaders进行处理和校验。ws提供了一个验证挂钩。处理完httpheader后,会调用verifyClient验证升级请求是否被允许。如果成功则执行completeUpgrade。completeUpgrade顾名思义就是一个完成升级请求的函数,返回同意升级协议,设置一些http响应头。还有一些重要的逻辑处理。constws=newWebSocket(null);//设置管理socket数据ws.setSocket(socket,head,this.options.maxPayload);//cb为this.emit('connection',ws,req);cb(ws);我们看到这里创建了一个新的WebSocket对象,并调用了它的setSocket函数。让我们看看他做了什么。setSocket的逻辑比较多,我们慢慢分析。数据接收者classReceiverextendsWritable{}我们看到数据接收者是一个可写流。这意味着我们可以向其中写入数据。constreceiver=newReceiver();receiver.write('你好');我们来看看此时Receiver的逻辑。_write(chunk,encoding,cb){if(this._opcode===0x08&&this._state==GET_INFO)returncb();this._bufferedBytes+=chunk.length;this._buffers.push(chunk);this.startLoop(cb);}先记录当前数据的大小,然后保存数据,最后执行startLoop。startLoop(cb){leterr;this._loop=true;do{switch(this._state){//忽略其他情况GET_DATA:err=this.getData(cb);break;default://`INFLATING`this._loop=false;return;}}while(this._loop);cb(err);}我们知道websocket是基于tcp上层的应用层协议,所以我们在接收数据的时候,需要解析出每一个数据包(粘包问题),所以Receiver其实就是一个状态机。每次接收到数据,都会根据当前状态进行状态流转。例如,如果当前处于GET_DATA状态,则数据将被处理。接下来我们看看数据处理的逻辑。getData(cb){letdata=EMPTY_BUFFER;//提取数据部分if(this._payloadLength){data=this.consume(this._payloadLength);if(this._masked)unmask(data,this._mask);}//如果是控制消息,则执行controlMessageif(this._opcode>0x07)returnthis.controlMessage(data);//压缩后,先解压if(this._compressed){this._state=INFLATING;this.decompress(data,cb);return;}//不压缩直接处理(先保存到_fragments,再执行dataMessage)if(data.length){this._messageLength=this._totalPayloadLength;this._fragments.push(data);}returnthis.dataMessage();}我们实现了websocket协议来定义消息的类型,比如控制消息和数据消息。我们分别来看这两者的逻辑。controlMessage(data){//连接关闭if(this._opcode===0x08){this._loop=false;if(data.length===0){this.emit('conclude',1005,'');this.end();}}elseif(this._opcode===0x09){this.emit('ping',data);}else{this.emit('pong',data);}this._state=GET_INFO;}我们看到控制消息包括三种类型(conclude,ping,pong)。而数据消息只有this.emit('message',data);一种。这是接收器的整体逻辑。2数据发送端数据发送端是对websocket协议的封装。当用户调查数据发送端的发送接口发送数据时,数据发送端会组装成一个websocket协议包发送出去。发送(数据,选项,cb){constbuf=toBuffer(数据);constperMessageDeflate=this._extensions[PerMessageDeflate.extensionName];letopcode=options.binary?2:1;letrsv1=options.compress;if(this._firstFragment){this._firstFragment=false;if(rsv1&&perMessageDeflate){rsv1=buf.length>=perMessageDeflate._threshold;}this._compress=rsv1;}else{rsv1=false;opcode=0;}if(options.fin)this._firstFragment=true;//需要压缩if(perMessageDeflate){constops={fin:options.fin,rsv1,opcode,mask:options.mask,readOnly:toBuffer.readOnly};//压缩,排队等待,否则执行压缩if(this._deflating){this.enqueue([this.dispatch,buf,this._compress,opts,cb]);}else{this.dispatch(buf,this._compress,opts,cb);}}else{//无需压缩,直接发送this.sendFrame(Sender.frame(buf,{fin:options.fin,rsv1:false,opcode,mask:options.mask,readOnly:toBuffer.readOnly}),cb);}}send函数在处理了一些参数后发送数据,但是如果需要压缩,则必须在发送前进行压缩。数据处理完成后调用真正的发送函数sendFrame(list,cb){if(list.length===2){this._socket.cork();this._socket.write(list[0]);this._socket.write(list[1],cb);this._socket.uncork();}else{this._socket.write(list[0],cb);}}理解数据接收逻辑和sender,我们来看看websocket对象和setSocket函数是干什么的。websocket对象的本质是对TCPsocket的封装。它从底层接收数据,然后透传给数据接收方。数据接收端处理完成后,触发websocket对应的相应事件,比如message事件。发送数据时,websocket会调用数据发送方的接口,数据发送方将其组装成websocket协议的数据包发送出去。架构如下图所示。接下来看一下setSocket的逻辑setSocket(socket,head,maxPayload){//数据接收者,负责处理tcp上收到的数据(socket是tcp层的socket)constreceiver=newReceiver(...);//数据发送方负责向对端发送数据this._sender=newSender(socket,this._extensions);//数据接收方负责解析数据this._receiver=receiver;//net模块的tcpsocketthis._socket=socket;//关联receiver[kWebSocket]=this;socket[kWebSocket]=this;//监听receiver的事件,在解析数据时会回调receiver.on('conclude',receiverOnConclude);//下面两个事件是WritableTrigger写的receiver.on('drain',receiverOnDrain);receiver.on('error',receiverOnError);receiver.on('消息',receiverOnMessage);receiver.on('ping',receiverOnPing);receiver.on('pong',receiverOnPong);//清空定时器socket.setTimeout(0);//关闭nagle算法socket.setNoDelay();//升级请求中携带的httpbody通常为空if(head.length>0)socket.unshift(head);//监听tcp底层事件socket.on('close',socketOnClose);socket.on('data',socketOnData);socket.on('end',socketOnEnd);socket.on('error',socketOnError);this.readyState=WebSocket.OPEN;this.emit('open');}我们看到里面监听了各种事件。下面以数据事件为例,看看处理过程。当tcpsocket接收到数据时,就会执行socketOnData函数。functionsocketOnData(chunk){//receiver中的_write函数会被调用,其实就是receiver对象上的替换。如果数据解析错误,会触发socketerror事件if(!this[kWebSocket]._receiver.write(chunk)){this.pause();}}socketOnData通过receiver的接口将数据传给receiver,则receiver会解析数据,然后触发相应的事件,比如message。receiver.on('message',receiverOnMessage);functionreceiverOnMessage(data){this[kWebSocket].emit('message',data);}然后ws的socket对象继续向上层触发message事件。this[kWebSocket]的值是ws提供的socket对象本身。架构图如下。这就是ws实现websocket协议的基本原理。详情请参考源码。