使用kombu投递消息,即使开启了消息持久化,单连接的投递速度依然是1w+/s,但是nameko慢的离谱,没有开启1k+/s;消息持久化开启,为什么100+/s?经过一番研究,这与amqp的消息确认(confirm_publish)选项有关。参考:Kombu如何实现RabbitMQ的可靠消息发布(confirm机制)开启confirm_publish?创建连接时,带上'confirm_publish':True启用confirm_publishwithConnection(amqp_uri,transport_options={'confirm_publish':True})asconn:withconn.channel()aschannel:started_at=time.time()foriinrange(10000):message=Message(channel=channel,body=data)producer=Producer(channel,exchange=refresh_exchange)res=producer.publish(body=message.body,routing_key='to_imdb',headers=message.headers回复,delivery_mode=2#serializer='pickle')ended_at=time.time()logger.debug(f'paytime{ended_at-started_at}s')看nameko是源码:可以看到,默认是confirms=True,表示默认启用消息确认amqp/publish.py@contextmanagerdefget_producer(amqp_uri,confirms=True,ssl=None,login_method=None,transport_options=None):如果transport_options是None:transport_options=DEFAULT_TRANSPORT_OPTIONS.copy()transport_options['confirm_publish']=confirmsconn=Connection(amqp_uri,transport_options=transport_options,ssl=ssl,login_method=login_method)withproducers[conn].acquire(block=True)asproducer:yieldproducer,如果要在nameko中关闭confirm_publish呢?可以使用use_confirms参数,示例如下:importtimefromnameko.constantsimportNON_PERSISTENT,PERSISTENTfromnameko.standalone.eventsimportevent_dispatcherimportsettingsfromloguruimportloggerconfig={'AMQP_URI':f'amqp://{settings.RABBITMQ_CONFIG.username}:'f'{settings.RABBITMQ_CONFIG.password}@}{settings.RABBITMQ_CONFIG.host}:'f'{settings.RABBITMQ_CONFIG.port}/{settings.RABBITMQ_CONFIG.vhost}'}logger.debug(config)data={'name':'jike','age':18,'score':{'math':100,'science':99.5,'english':59}}dispatch=event_dispatcher(config,delivery_mode=PERSISTENT,use_confirms=False)for_inrange(10000):#logger.debug(f'投递任务')dispatch('productor_service','a',data)
