redis-py概述,python用来连接redis数据库的三方库。最近在研究它的源码和用法:importredis#下面两步只是实例化和创建连接池,没有实际连接conn_pool=redis.ConnectionPool(host='1.2.2.2',port=63,password='xxx',db=0)conn=redis.StrictRedis(connection_pool=conn_pool)#只有第一次执行命令时才会连接conn.ping()我们一步一步来理解,在执行ping命令时,client.py模块会执行StrictRedis类的方法self.execute_command#COMMANDEXECUTIONANDPROTOCOLPARSINGdefexecute_command(self,*args,**options):"Executeacommandandreturnaparsedresponse"pool=self.connection_poolcommand_name=args[0]#从连接池获取连接connection=pool.get_connection(command_name,**options)try:#开始连接到redis服务在这里connection.send_command(*args)#返回解析结果returnself.parse_response(connection,command_name,**options)except(ConnectionError,TimeoutError)ase:connection.disconnect()ifnotconnection.retry_on_timeoutandisinstance(e,超时tError):raiseconnection.send_command(*args)returnself.parse_response(connection,command_name,**options)finally:pool.release(connection)send_command会调下面的方法defsend_packed_command(self,command):"Sendanalready打包命令到Redis服务器"ifnotself._sock:#开始连接self.connect()try:ifisinstance(command,str):command=[command]foritemincommand:self._sock.sendall(item)exceptsocket.timeout:self.disconnect()raiseTimeoutError("Timeoutwritingtosocket")exceptsocket.error:e=sys.exc_info()[1]self.disconnect()iflen(e.args)==1:errno,errmsg='UNKNOWN',e.args[0]else:errno=e.args[0]errmsg=e.args[1]raiseConnectionError("Error%swhilewritingtosocket.%s."%(错误号,错误消息))except:self.disconnect()raise它连接到服务器的函数是这个_connect函数def_connect(self****):"""Createatcpsocketconnection"""#wewanttomimicwha****tsocket.create_connection确实支持#ipv4/ipv6,但我们想在调用之前设置选项#socket.connect()err=Noneforresinsocket.getaddrinfo(self.host,self.port,0,socket.SOCK_STREAM):family,socktype,proto,canonname,socket_address=ressock=Nonetry:sock=socket.socket(family,socktype,proto)#TCP_NODELAYsock.setsockopt(socket.IPPROTO_TCP,socket.TCP_NODELAY,1)#TCP长连接ifself.socket_keepalive:sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1)fork,viniteritems(self.socket_keepalive_options):sock.setsockopt(socket.SOL_TCP,k,v)#在连接之前设置socket_connect_timeout超时sock.settimeout(self.socket_connect_timeout)#连接sock.connect(socket_address)#连接成功后设置socket_timeout时间sock.settimeout(self.socket_timeout)returnsockexceptsocket.erroras_:err=_ifsockisnotNone:sock.close()iferrisnotNone:raiseerrraisesocket.error("socket.getaddrinforeturnedanemptylist")on_connect函数,序列化连接,如果有密码,发送密码,选择数据表defon_connect(self):"Initializeconnection,authenticateandselectadatabase"self._parser.on_connect(self)#如果指定了密码,则进行身份验证ifself.password:#如果redis服务有进一步的安全加固,这里也是为了authenticateself.send_command('AUTH',self.password)ifnativestr(self.read_response())!='OK':raiseAuthenticationError('InvalidPassword')#如果指定了数据库,则切换到它ifself.db:self.send_command('SELECT',self.db)ifnativestr(self.read_response())!='OK':raiseConnectionError('InvalidDatabase')它的连接池使用一个列表来做defreset(self):self.pid=os.getpid()#创建的连接数self._created_connections=0#存储可用连接self._available_connections=[]#存储重复使用的连接self._in_use_connections=set()#线程锁self._check_lock=threading.Lock()获取连接defget_connection(self,command_name,*keys,**options):"Getaconnectionfromthepool"self._checkpid()try:#从列表中获取连接connection=self._available_connections.pop()exceptIndexError:#doesnotexist创建一个connectionconnection=self.make_connection()#并添加到可用的连接集合self._in_use_connections.add(connection)returnconnectioncreateanewconnectiondefmake_connection(self):"Createanewconnection"ifself._created_connections>=self.最大connections:raiseConnectionError("Toomanyconnections")self._created_connections+=1返回self.connection_class(**self.connection_kwargs)
