当前位置: 首页 > 后端技术 > Node.js

socket.iowithpm2(cluster)集群解决方案

时间:2023-04-03 21:06:52 Node.js

可以收藏我的博客socket.io和集群在线系统,需要用到node的多进程模型,我们可以实现一个简单的基于集群模式的socket分布模型,你也可以使用比较稳定的进程管理工具,比如pm2。在常规的http服务中,这种模式是正常的,但是一旦在服务器中集成了socket.io服务,ws通道的建立就会失败。即使通过备份的轮询方式,仍然会出现连接断断续续的情况,所以我们需要解决这个问题,让socket.io充分发挥多核的优势。这里之所以提到socket.io而没有提到websocket服务,是因为socket.io是在封装websocket的基础上保证可用性的。在客户端不提供websocket功能的基础上,使用xhr轮询、jsonp或者foreveriframe进行兼容。同时,在建立ws连接之前,往往要经过几轮http训练才能保证ws服务可用,所以socket.io不等于websocket。深入底层,socket.io实际上并没有做真正的websocket兼容,而是提供了上层接口和命名空间服务,真正的逻辑在“engine.io”模块中。该模块对握手、连接升级、心跳、传输方式等实现了http代理。因此,只有研究engine.io模块,才能清楚地了解socket.io的实现机制。场景再现服务器采用express+socket.io的组合方案,配合pm2的集群模式,实现一个简单的b/s通信demo:app.jsvarpath=require('path');varapp=require('express')(),server=require('http').createServer(app),io=require('socket.io')(server);io.on('connection',function(socket){socket.on('disconnect',function(){console.log('/:disconnect-------->')});socket.on('b:message',function(){socket.emit('s:message','/:'+port);console.log('/:'+port)});});io.of('/ws').on('connection',function(socket){socket.on('disconnect',function(){console.log('/ws:disconnect-------->')});socket.on('b:message',function(){socket.emit('/ws:message',port);});});app.get('/page',function(req,res){res.sendFile(path.join(process.cwd(),'./index.html'));});server.listen(8080);index.htmlpm2.json{"apps":[{"name":"ws","script":"./app.js","env":{"NODE_ENV":"development"},"env_production“:{“NODE_ENV”:“生产”},“实例”:4,“exec_mode”:“集群”,“max_restarts”:3,“restart_delay”:5000,“log_date_format”:“YYYY-MM-DDHH:mmZ","combine_logs":true}]}这样执行命令pm2startpm2.json启动服务访问127.0.0.1:8080/page,点击按钮发起ws连接,观察一下安慰下图清楚的展示了socket.io握手的错误:可以看到在websocket连接建立之前多了3次xhr请求,在websocket连接建立之后又多了几次xhr请求失败,最后的两个xhr请求失败。Socket.io并没有采用直接建立websocket连接这种粗暴的方式,而是先通过http请求(xhr)访问服务器的相关轮转配置信息和sid。这里的sid类似于sessionID,但是它唯一标识了连接,可以理解为socketId。以后每一个http请求cookie都必须携带sid(httponly);第二个和第三个请求用于确认连接。在socket.io中,post请求是客户端向服务端发送消息的唯一方式,post响应必须为“ok”,其“content-length”必须为2;而get请求主要用于轮训,同时获取服务器的相关信息,这在下面的文章中会有体现;第四次websocket连接请求失败,主要是与后端http握手失败造成的;第五个请求是一个xhr形式的post请求,作为websocket通道建立失败后的兼容性处理,上面提到socket.io的post请求只有在客户端需要发送时才会使用给服务器的消息。因此,为了确认,我们查看消息体:可以看出携带了客户端发送的消息类型b:message,还包含消息体{}空对象。相应的,服务端返回“OK”;第六个请求是xhr方式的get请求,用于获取服务器对第五个请求的响应。至此,我们大致分析了socket.io建立连接的大致流程,以及连接失败后如何获取底线。接下来分析握手失败的原因。原因是示例中pm2主进程启动了4个工作进程,主进程监听8080端口,向工作进程分发请求。pm2进程在分发请求阶段使用了一定的算法平衡,比如round-robin或者其他hash方法(但不是iphash),所以在socket.io客户端连接建立阶段发送的多个xhr请求会被pm2定位到不同的工作进程。上面提到,每个xhr请求都会携带sid字段来标识当前连接,所以当一个携带sid字段的请求被pm2定位到另一个与该连接无关的worker时,请求会失败并返回{"code":1,"message":"SessionIDunknown"}错误;即使前3次xhr握手成功,进入websocket连接升级阶段,负责监听更新事件的worker往往不是之前的worder,从而导致websocket连接无法建立。一句话,ws连接无法成功建立是因为client多次请求的server进程不是同一个进程。那我们怎么解决呢?最简单的解决方案是确保来自客户端的每个请求都可以位于同一个服务进程。当然,分布式会话也可以解决问题。依靠类似redis的第三方缓存,配合一致性哈希算法,保证所有服务进程都能获取到连接信息,相互配合完成连接建立。但这只是笔者理论上分析的一种实现方式,并没有通过测试,因为这种分布式架构不仅实现复杂,而且引入了对redis的相关依赖,这是不可取的。那么下面主要实现一个解决方案,保证客户端的每个请求都可以定位到同一个服务进程。各种实现官方实现官方提供了一个比较轻量级的架构:nginx反向代理+iphash我们示例demo中的http服务器只监听8080端口,所以请求必须通过pm2分发,否则会出现端口占用错误。不过官方的解决方案是每个进程的socket.io服务器创建不同端口的http服务器,专注于http握手和升级,nginx作为握手请求的代理。而且nginx必须设置iphash,保证同一个client的多个请求在后端都位于同一个服务进程。这样示例demo中会占用5个端口,其中8080??端口被公共http服务器使用,其他4个端口只用于ws连接握手。但是如何选择这四个端口呢?为了保证扩展性和顺序性,采用了兼容pm2的方案。PM2会给每个worker进程分配一个id,并将id绑定到进程的环境变量上,那么我们就可以通过workerid生成4个不同的端口号。app.jsvarpath=require('path');varapp=require('express')(),server=require('http').createServer(app),端口=3131+parseInt(process.env.NODE_APP_INSTANCE),io=require('socket.io')(port);io.on('connection',function(socket){socket.on('disconnect',function(){console.log('/:disconnect-------->')});socket.on('b:message',function(){socket.emit('s:message','/:'+port);console.log('/:'+port)});});io.of('/ws').on('connection',function(socket){socket.on('disconnect',function(){console.log('disconnect------>')});socket.on('b:message',function(){socket.emit('s:message',port);});});app.get('/abc',function(req,res){res.sendFile(path.join(process.cwd(),'./index.html'));});server.listen(8080);index.html<脚本>varbtn=document.getElementById('btn1');btn.addEventListener('点击',function(){varsocket=io.connect('http://ws.vd.net/ws',{reconnection:false});socket.on('connect',function(){//发起“脚手架安装”请求socket.emit('b:message',{a:1});socket.on('s:message',function(d){console.log(d);});});socket.on('error',function(err){console.log(err);})});nginx.conf上游io_nodes{ip_hash;服务器127.0.0.1:3131;服务器127.0.0.1:3132;服务器127.0.0.1:3133;服务器127.0.0.1:3134;}服务器{听80;服务器名称ws.vd.net;location/{proxy_set_header升级$http_upgrade;proxy_set_header连接“升级”;proxy_set_headerX-Forwarded-For$proxy_add_x_forwarded_for;proxy_set_header主机$host;proxy_http_version1.1;代理通行证http://io_nodes;}}在本机绑定hosts地址后启动nginx服务,同时启动server。点击按钮,成功建立ws连接。在页面上渲染选择的workerip和端口,然后浏览器的所有ws连接默认连接到ip:port对应的服务器。”这样,服务器渲染的任何页面都可以在这个实现如果页面是前端异步渲染的,仍然可以使用这种方式,但是先通过xhr请求从服务器获取需要握手的http服务器的ip和端口,然后进行ws连接.服务端路由的前提还是给每个ws服务器分配一个端口,但是去掉nginx,让服务端做iphash,采用服务端路由架构,实现简单,兼容性好。大神processrouting这里的god进程是主进程,类似于pm2进程,god进程路由是在god进程级别对请求进行定向分发,保证请求主机和进程的一致性,在god进程中,hash是为每个请求的ip,并为每个ws服务器创建一个单独的http服务器,用于握手升级。简单代码:varexpress=require('express'),cluster=require('cluster'),net=require('net'),sio=require('socket.io');varport=3000,num_processes=require('os').cpus().length;if(cluster.isMaster){varworkers=[];varspawn=function(i){workers[i]=cluster.fork();workers[i].on('exit',function(code,signal){console.log('respawningworker',i);spawn(i);});};for(vari=0;i