当前位置: 首页 > 后端技术 > Python

Python的bsonrpc源码解读

时间:2023-03-26 16:08:19 Python

bsonrpc是python中基于json或bson的远程过程调用库,提供服务端和客户端实现,其底层使用基于TCP连接的通信。程序结构bsonrpc主要包括以下文件:concurrent.py:为两种并发方式(threading线程对象、gevent协程对象)涉及的相应组件(Queue、Event、Lock等)提供统一的对外生成接口:spawn()、new_promise()、new_queue()、new_lock()等;definitions.py:定义rpc消息结构和错误编码;dispatcher.py:rpc处理调度,路由处理(消息对应处理函数);exceptions.py:异常定义;framing.py:定义不同的类来实现JSONRPC2.0标准中不同的消息结构;interfaces.py:定义提供服务的装饰器;misc.py:在这个文件中定义一个id从1开始累加的生成器。options.py:定义配置选项。rpc.py:主要是BSONRpc和JSONRpc的实现;socket_queue.py:主要是消息的拆包和打包;util.py:系统工具。本文主要描述库包中不同协议分包分组的处理,涉及socket_queue.py和framing.py文件,主要使用对象组合技术。解释socket_queue.py中的SocketQueue类用于处理从套接字接收到的数据。主要方法是_receiver()和put()方法,分别对应分包和组包。分包主要内容如下:def_receiver(self):bbuffer=b''whileTrue:try:chunk=self.socket.recv(self.BUFSIZE)#从socket接收数据bbuffer=self._to_queue(bbuffer+chunk)#除了DecodingErrorase:self._queue.put(e)#稍后省略...def_to_queue(self,bbuffer):b_msg,bbuffer=self.codec.extract_message(bbuffer)#解码器提取完整消息whileb_msgisnotNone:self._queue.put(self.codec.loads(b_msg))#解码后的消息放入消息队列进行处理b_msg,bbuffer=self.codec.extract_message(bbuffer)return的主要内容bbuffer包如下:defput(self,item):ifself._closed:raiseBsonRpcError('Attempttoputitemstoclosedqueue.')msg??_bytes=self.codec.into_frame(self.codec.dumps(item))#packagewithself._lock:self.socket.sendall(msg_bytes)如上图所示,程序使用对象组合实现消息包处理。对象组合是继承之外的另一种选择。对象组合要求被组合的对象具有定义良好的接口,通过接口调用其他对象的功能。这也是“黑盒复用”,因为对象的内部细节是不可见的。SocketQueue依赖于Codec的extract_message()接口方法,并不关心它的具体实现方法。具体实现由JSONCodec和BSONCode实现。JSONCodec依赖于JSONFrame中的extract_message()接口方法,由JSONFramingNone、JSONFramingNetstring、JSONFramingRFC7464实现。SocketQueue消息分组过程依赖于into_frame()方法,也是通过对象组合实现的。注:图中的接口是为了方便理解而添加的,源码中没有。