当前位置: 首页 > 后端技术 > Python

nameko关闭消息确认和持久化

时间:2023-03-26 12:35:47 Python

参考:为什么nameko下发消息这么慢?nameko下发不同大小消息的速率对比前言Nameko的瓶颈——下发消息太慢动机:最近使用nameko+rabbitmq的系统流量太低,系统QPS在1k-2k之间.因此,需要对系统的QPS进行优化。我用kombu写了producer和consumer的demo代码。即使开启了消息持久化,我发现吞吐量也很容易超过2w+/s(特别是指生产者下发消息的速率。),所以肯定有问题。最后查阅资料发现nameko在发送消息时默认开启了消息确认。producer的消息确认是:当client向rabbitmqserver下发任务时,会等待rabbitmqserver返回一个ack,表示rabbitmqserver已经可靠地收到消息。而且这个过程是同步的,即一个消息投完之后,只有收到ack后才能投递下一条消息。amqp客户端下发消息时,背后的网络请求看图说话。图片使用wireshark抓包amqp协议的流量:打开消息确认使用wireshark抓包打开confirm_publish截图。192.168.26.130是客户端;192.168.38.191是rabbitmq服务器可以看到,消息投递时,客户端发送一个Basic.Publish,然后rabbitmq服务器返回一个Basic.Ack,重复关闭消息确认。如果我们关闭confirm_publish,我们可以看到此时从客户端到服务端只有Basic。发布,但没有从服务器到客户端的Basic.Ack。是否启用confirm_publish需要结合“QPS大小”和“消息的重要性”来考虑。上面列出的一些具体值可以作为“流量”和“消息有多重要?大家自己衡量吧。QAQ关于rabbitmq服务器FAQ:kombu向rabbitmq投递消息时,是否可以批量投递?A:如果用amqp协议5672端口是不行的;可以用http15672端口吗?也不行。所以总而言之是不行的。Q:rabbitmqpost消息可以异步确认吗?A:不知道,反正kombu好像没有相关的API可以做这个。Q:rabbitmq可以异步批量下发消息吗?A:不知道,反正kombu好像没有相关的API可以做这个。Q:在给Rabbitmq下发消息时,是开启消息持久化对QPS影响更大,还是开启消息确认影响更大?A:开启消息确认影响比较大,因为网络在磁盘上比较慢。事件模式事件模式是生产者消费者异步解耦模式。nameko的离线事件模式,如flask、fastapi、django等。当我们需要向rabbitmq投递消息时,为了与nameko集成,可以使用nameko提供的离线模式seo优化:Flask如何集成namekofastapi如何集成namekoflask如何通过nameko向rabbitmq队列投递消息?fastapi如何通过nameko向rabbitmq队列投递消息?贴出的代码:importjsonimportsettingsfromnameko.standalone.rpcimportClusterRpcProxyfromnameko.standalone.eventsimportevent_dispatcherfromnameko.constantsimportNON_PERSISTENT,PERSISTENTfromloguruimportloggerconfig={'AMQP_QURI':f'amqp://:'f'{settings.RABBITMQ_CONFIG.password}@{settings.RABBITMQ_CONFIG.host}:'f'{settings.RABBITMQ_CONFIG.port}/{settings.RABBITMQ_CONFIG.vhost}'}dispatch=event_dispatcher(config,delivery_mode=NON_PERSISTENT,use_confirms=False)data={'name':'jike','age':18,'score':{'math':100,'science':99.5,'english':59}}for_inrange(10000000):dispatch('worker_service','to_rubbish',json.dumps(data))消费者代码:从输入importCallablefromnameko.rpcimportrpcfromnameko.timerimporttimerfromnameko.constantsimportNON_PERSISTENTfromnameko.eventsimportE,event_handlerfromloguruimportloggerclassTrashCanService:name='trash_scan_service'@event_handler('worker_service','to_rubbish')defclear(self,message:str):pass看速度:可以看到,速度是很猛,单次连接可以直接上7k+。如果是多进程交付呢?让我们试试多进程生产者版本吧!我用了一个进程池,开了8个进程一起做importjsonimportmultiprocessingimportsettingsfromnameko.standalone.rpcimportClusterRpcProxyfromnameko.standalone.eventsimportevent_dispatcherfromnameko.constantsimportNON_PERSISTENT,PERSISTENTfromloguruimportlogersimportlogggerconfig={'AMQP_URI':f'amqp://{settings.RABBITMQ_CONFIG.username}:'f'{settings.RABBITMQ_CONFIG.password}@{settings.RABBITMQ_CONFIG.host}:'f'{settings.RABBITMQ_CONFIG.port}/{settings.RABBITMQ_CONFIG.vhost}'}dispatch=event_dispatcher(config,delivery_mode=NON_PERSISTENT,use_confirms=False)defrun():data={'name':'jike','age':18,'score':{'math':100,'science':99.5,'english':59}}for_inrange(10000000):dispatch('worker_service','to_rubbish',json.dumps(data))if__name__=="__main__":pool=multiprocessing.Pool(processes=8)foriinrange(100000):pool.apply_async(run)pool.close()pool.join()可以看到这时候,投递速度已经达到10k+。有了很大的进步。此时rabbitmq服务器的CPU占用率不高也不低;网络io还没有达到瓶颈。滑子的在线模式是什么?前面提到了离线模式,离线模式可以让我们通过nameko需要的方式传递flask和fastapi中的消息规则传递消息,在线模式是nameko服务相互传递消息的方式。很简单,只需要在实例化EventDispatcher例子的时候加上相应的参数delivery_mode=NON_PERSISTENT,use_confirms=FalsefromtypingimportCallablefromnameko.rpcimportrpcfromnameko.timerimporttimerfromnameko.constantsimportNON_DiPERSISTENTfromnameko.eventsimportEevent_handlerfromloguruimportloggerclassWorkerService:name='worker_service'event_dispatch:Callable=EventDispatcher(delivery_mode=NON_PERSISTENT,use_confirms=False)@timer(1)defreceive(self):logger.debug(f'start')for_inrange(100000):self.event_dispatch('to_rubbish','hahahahah')classTrashCanService:name='trash_scan_service'@event_handler('worker_service','to_rubbish')defclear(self,message:str):passrpcmodetodo方法离线在线

猜你喜欢