今天老师给大家介绍一个特殊的RPC服务器模型。这种模型不同于Nginx、Redis、Apache、Tornado、Netty,它的原型是NodeCluster的多进程并发模型。Nginx并发模型我们知道Nginx的并发模型是多进程并发模型。绑定监听地址端口后,其Master进程fork出多个Slave进程,共同竞争处理服务器socket接收到的众多客户端连接。.这多个Slave进程在操作系统的内核态会共享同一个socket队列,操作系统的网络模块在处理完三次握手后会将socket插入到这个队列中。这是一个生产者-消费者模型。生产者是操作系统的网络模块,消费者是多个从进程,队列中的对象是客户端套接字。这种模式在负载均衡上有个缺点,就是socket分布不均,形成类似贫富两极分化的情况,即“闲的人多闲,忙的人多”的状态更忙”。这是因为当多个进程竞争同一个socket队列时,操作系统采用后进先出策略,最后接受的进程先得到socket。进程越忙,它调用accept的机会就越多,它能获得的套接字也就越多。节点集群并发模型节点集群采用不同的策略来解决负载均衡问题。也是多进程并发模型,Master进程会fork出多个子进程来处理clientsockets。但是不存在竞争问题,因为只有Master进程负责接受socket,Slave进程只负责处理客户端socket请求。那么就有一个问题,如何将Master进程获取到的clientsocket传递给Slave进程。这时候神奇的sendmsg就派上用场了。它是操作系统提供的系统调用,用于在不同进程之间传递文件描述符。sendmsg会拿一个特殊的“管道”将Master进程的socket描述符传递给Slave进程,Slave进程通过recvmsg系统调用从这个“管道”中取出描述符。这个“管道”比较特殊,它是一个Unix域套接字。普通套接字可以跨机器传递消息,而Unix域套接字只能在同一台机器上的不同进程之间传递消息。和管道一样,Unix域套接字也分为命名套接字和无名套接字。命名套接字在文件系统中指定一个路径名,无关进程可以通过这个路径字符访问Unix域套接字。父子进程之间一般使用无名套接字。父进程会通过socketpair调用创建socket,然后fork出子进程,这样子进程同时也会持有socket的引用。后续的父子进程就可以通过这个套接字进行通信。注意这里的传输描述符本质上不是传输,而是一个副本。sendmsg自动关闭时父进程的描述符不会自动消失,子进程接收到的描述符和父进程的描述符不是同一个整数值。但是父子进程的描述符会指向同一个内核套接字对象。有了传递描述符的能力,父进程就可以将接受的clientsocket依次传递给多个从进程,顺利达到负载均衡的目的。接下来,我们将使用Python代码来跑通NodeCluster的并发模型。因为直到Python3.5才内置了sendmsg和recvmsg方法,所以下面的代码需要使用Python3.5+才能运行。我们看sendmsg方法socket.sendmsg(buffers[,ancdata[,flags[,address]]])的定义我们只需要关心第二个参数ancdata,描述符是通过ancdata参数传递的,意思是“auxiliarydata”,buffers代表需要传递的消息内容。因为这里消息的内容是没有意义的,这个字段可以任意填写,但是必须要有内容。如果没有内容,则使用sendmsg方法进行空调。importsocket,structdefsend_fds(sock,fd):returnsock.sendmsg([b'x'],[(socket.SOL_SOCKET,socket.SCM_RIGHTS,struct.pack("i",fd))])#ancdata参数是三元的组列表,三元组的第一个参数表示网络协议栈的级别,第二个参数表示辅助数据的类型,第三个参数是携带的数据,level=SOL_SOCKET表示传输的数据在TCP中协议层,type=SCM_RIGHTS表示携带的数据是一个文件描述符。我们传递的描述符fd是一个整数,需要使用struct包序列化成二进制。看recvmsg方法的定义msg,ancdata,flags,addr=socket.recvmsg(bufsize[,ancbufsize[,flags]])同样,我们只需要关心返回的ancdata数据,里面包含我们需要的文件描述符.但是你需要提供消息体的长度和辅助数据的长度参数。辅助数据的长度比较特殊,需要使用CMSG_LEN方法计算,因为辅助数据中有额外的头部信息我们是看不到的。bufsize=1#消息内容长度ancbufsize=socket.CMSG_LEN(struct.calcsize('i'))#辅助数据长度msg,ancdata,flags,addr=socket.recvmsg(bufsize,ancbufsize)#接收消息级别,类型,fd_bytes=ancdata[0]#取第一个元组,注意在发送消息时,我们传递一个三元组列表fd=struct.unpack('i',fd_bytes)#反序列化代码实现让我来展示完整的服务器代码,为了简单起见,我们使用同步模型在Slave进程中处理RPC请求。#coding:utf#sendmsgrecvmsgpython3.5+只能支持importosimportjsonimportstructimportsocketdefhandle_conn(conn,addr,handlers):print(addr,"comes")whileTrue:#为了简单,这里没有循环读取length_prefix=conn.recv(4)ifnotlength_prefix:print(addr,"bye")conn.close()break#关闭连接继续处理下一个连接length,=struct.unpack("I",length_prefix)body=conn.recv(length)request=json.loads(body)in_=request['in']params=request['params']print(in_,params)handler=handlers[in_]handler(conn,params)defloop_slave(pr,handlers):whileTrue:bufsize=1ancsize=socket.CMSG_LEN(struct.calcsize('i'))msg,ancdata,flags,addr=pr.recvmsg(bufsize,ancsize)cmsg_level,cmsg_type,cmsg_data=ancdata[0]fd=struct.unpack('i',cmsg_data)[0]sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM,fileno=fd)handle_conn(sock,sock.getpeername(),handlers)defping(conn,params):send_result(conn,"pong",params)defsend_result(conn,out,result):response=json.dumps({"out":out,"result":result}).encode('utf-8')length_prefix=struct.pack("I",len(response))conn.sendall(length_prefix)conn.sendall(response)defloop_master(serv_sock,pws):idx=0whileTrue:sock,addr=serv_sock.accept()pw=pws[idx%len(pws)]#消息数据,whatevermsg=[b'x']#辅助数据,携带描述符ancdata=[(socket.SOL_SOCKET,socket.SCM_RIGHTS,struct.pack('i',sock.fileno())]pw.sendmsg(msg,ancdata)sock.close()#closereferenceidx+=1defprefork(serv_sock,n):pws=[]foriinrange(n):#openParent-子进程通信“管道”pr,pw=socket.socketpair()pid=os.fork()ifpid<0:#forkrorreturnpwsifpid>0:#父进程pr.close()#父进程不需要读pws.append(pw)continueifpid==0:#子进程serv_sock.close()#关闭引用pw.close()#子进程不用写returnprreturnpwsif__name__=='__main__':serv_sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)serv_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)serv_sock.bind(("localhost",8080))serv_sock.listen(1)pws_or_pr=prefork(serv_sock,10)ifhasattr(pws_or_pr,'__len__'):ifpws_or_pr:loop_master(serv_sock,pws_or_pr)else:#fork全部失败,没有子进程,GameOverserv_sock.close()else:handlers={"ping":ping}loop_slave(pws_or_pr,handlers)父进程使用fork调用创建多个子进程,然后使用socketpair调用为每个子进程创建一个未命名的socket用于传递描述符的父进程使用roundrobin策略平均分配接收到的clientsockets。子进程接收到一个描述符整数,需要将其包装到套接字对象中才能读取或写入。打印并比较发送和接收的描述符,你会发现它们的值是不一样的。这是因为sendmsg将描述符发送给内核后,内核会重新分配一个新的给描述符指向的内核套接字。描述符对象。
