原因在使用nameko的时候,想看看nameko的连接复用原理(指amqp网络连接与rabbitmq的复用)。通常,连接复用有两种方案:TLS(ThreadLocalStorage))第一种连接池方案实现起来最简单,但也有局限性。例如,它仅在使用线程池时才有用。比如没有多路复用就不适合用协程或者线程。第二种方案,连接池是最常见的方案,但也是最麻烦的方案。那么nameko使用的是哪种方案呢?答案是:连接池好,既然我们已经知道了这个事实,那么在深入这个事实之前,我们先来了解一下kombu的连接池机制吧!nameko获取连接站点包/nameko/amqp/publish.pyimportwarningsfromcontextlibimportcontextmanagerfromkombuimportConnectionfromkombu.exceptionsimportChannelErrorfromkombu.poolsimportconnections,producersfromnameko.constantsimport(DEFAULT_RETRY_POLICY,DEFAULT_TRANSPORT_OPTIONS,PERSISTENT)@contextmanagerdefget_connection(amqp_uri,ssl=None,login_method=None,transport_options=None):如果不是transport_options:transport_options=DEFAULT_TRANSPORT_OPTIONS。copy()conn=Connection(amqp_uri,transport_options=transport_options,ssl=ssl,login_method=login_method)withconnblock=True)asconnection:yieldconnection当我看到conn=Connection()时,我想,这不会创建一个每次调用get_connection时的amqp连接?这就是结局?还要连接和重用?还是变鬼了?真相是什么?conn=Connection()实例化,直接创建连接对象,但不创建网络连接(不发起TCP连接请求,可以理解为这个连接是惰性的,只有真正用到的时候才会创建网络连接)嗯,这段代码说的很清楚,应该就是下面这句话:withconnections[conn].acquire(block=True)asconnection:yieldconnection是不是好像很难理解?这里先说明两点:kombuconnection的poolgroup每次调用get_connection都会创建一个conn对象,然后fromkombu.poolsimportconnectionsfromkombuimportConnectionuri='amqp://pon:pon@124.222.178.120:5672//'connection=Connection(uri)withconnections[connection].acquire(block=True)asconn:pass改为下面理解fromkombu.poolsimportconnectionsfromkombu.connectionimportConnectionPoolfromkombuimportConnectionuri='amqp://pon:pon@192.168.31.245:5672//'connection=Connection(uri)defget_connection_pool(connection:Connection)->ConnectionPool:"""connections是Connections的实例,Connections是PoolGroup的子类"""connection_pool:ConnectionPool=connections[connection]returnconnection_pooldefget_connection_from_pool(pool:ConnectionPool)->连接:returnpool.acquire(block=True)defget_connection_from_pool_group(connection:Connection)->Connection:returnget_connection_from_pool(get_connection_pool(connection))withget_connection_from_pool_group(connection)asconn:passeverytimeconnections[connection]会不会有什么问题?其实不是,因为connections的dict子类的__setitem__方法被重写了,key):h=eqhash(key)如果h不在self中:returnself.__missing__(key)(key),value)def__delitem__(self,key):returnsuper().__delitem__(eqhash(key))connections是Connections的实例,Connections是PoolGroup的子类,PoolGroup是EqualityDict的子类,当EqualityDict是dictconnections[connection]的子类,会执行EqualityDict的__setitem__方法。可以看到在调用dict的__setitem__方法时,会调用eqhash获取连接的哈希值。什么是eqhash?site-packages/kombu/utils/collections.pydefeqhash(o):"""调用``obj.__eqhash__``."""try:returno.__eqhash__()exceptAttributeError:returnhash(o)oftheconnection什么是__eqhash__?site-packages/kombu/connection.pydef__eqhash__(self):returnHashedSeq(self.transport_cls,self.hostname,self.userid,self.password,self.virtual_host,self.port,repr(self.transport_options))可以看到你知道,eqhash使用连接的一些连接参数作为哈希函数的输入参数。如果我们配置的连接参数相同,就不用担心重复创建连接池了。连接池的基本功能连接池的基本功能:获取连接和放回连接下面看看kombu对这两个基本主题的解决方案:获取连接site-packages/kombu/connection.pydef__enter__(self):returnself放回与get_connection_from_pool_group(connection)的连接asconn:pass在这样的上下文管理器中,当以body退出时,会执行conn的__exit__方法site-packages/kombu/connection.pydef__exit__(self,*args):self.release()可以看到执行了release方法,我们看一下Connection的release方法site-packages/kombu/connection.pydefrelease(self):"""关闭连接(ifopen)."""self._close()close=releaseConnection的release方法调用_close方法site-packages/kombu/connection.pydef_close(self):"""真正关闭连接,即使是a的一部分连接池。"""self._do_close_self()self._do_close_transport()self._debug('closed')self._closed=True_close关闭连接。这是错误的,连接不应该被关闭,而是因为它被放回了连接池,所以连接被关闭而不是被放回连接池?当然不是,classConnectionPool(Resource):"""Poolofconnections."""我们在执行ConnectionPool的acquire方法的时候,实际上执行的是继承自Resource的acquire方法。我们看一下acquire方法的内容:defacquire(self,block=False,timeout=None):"""获取资源。参数:block(bool):如果超过限制,则阻塞,直到有一个可用的项目。timeout(float):如果``block``为真,则等待超时。默认为:const:`None`(永远)。引发:LimitExceeded:如果块为假且已超过限制。“”“ifself._closed:raiseRuntimeError('Acquireonclosedpool')ifself.limit:while1:try:R=self._resource.get(block=block,ti??meout=timeout)exceptEmpty:self._add_when_empty()否则:尝试:R=self.prepare(R)exceptBaseException:ifisinstance(R,lazy):#还没有评估,把它放回去self._resource.put_nowait(R)else:#评估所以必须先尝试释放/关闭。self.release(R)raiseself._dirty.add(R)breakelse:R=self.prepare(self.new())defrelease():"""释放资源,以便它可以被另一个线程使用。警告:调用者负责丢弃该对象,并且不再使用该资源。如果需要,必须获取新资源。"""self.release(R)R.release=releasereturnR看到了吗?当我们从池中获取连接时,替换了原来的连接释放方法。退出上下文时,执行更改后的释放,释放不会关闭网络连接。管理使用中的连接显然是一个重要的话题。毕竟,我们需要控制池的大小。在这个逻辑中,我们可以从Resource类的acquire和release方法中理解上下文while1:try:R=self._resource.get(block=block,ti??meout=timeout)exceptEmpty:self._add_when_empty()else:try:R=self.prepare(R)exceptBaseException:ifisinstance(R,lazy):#还没有评估,把它放回去self._resource.put_nowait(R)else:#评估所以必须先尝试释放/关闭.self.release(R)raiseself._dirty.add(R)break当连接弹出时,self._dirty将被执行。add(R)将其添加到self._dirty(设置了self._dirty的类型);当一个连接需要放回池中时,会执行self._dirty.discard(resource)连接失败怎么办?此连接安全吗?重新连接旧连接
