当前位置: 首页 > 后端技术 > Python

使用kombu作为rabbitmq的消费客户端——并发消费

时间:2023-03-26 13:45:11 Python

前提:一个进程,一个amqp连接。本来想用多线程来实现的,但是好像不行:一个连接,多线程消耗,但是不想一个线程,一个连接,太浪费连接了。然后想到nameko是一个Process一个connection,然后使用evnetlet协程实现并发消费。所以我写了下面的demofromloguruimportloggerfromkombu.transport.pyamqpimportMessagefromkombuimportExchange,QueuefromkombuimportConnection,Consumer,Queuefromconcurrent.futuresimportThreadPoolExecutorimporttimeimporteventletevent.monkey_patch()amqp_uri='amqp://pon:pon@192.168.31.245:5672//'pool=eventlet.GreenPool(10)defhandle_message(message:Message):logger.debug(message)logger.debug(message.body)time.sleep(1)message.ack(multiple=True)refresh_exchange=Exchange('refresh',type='topic')imdb_queue=Queue('refresh_imdb',refresh_exchange,routing_key='to_imdb',durable=True)queues:list[Queue]=[imdb_queue]defstart_consuming(消息:消息):pool.spawn_n(handle_message,message)withConnection(amqp_uri)asconn:withconn.channel()aschannel:consumer=Consumer(channel,queues=queues,prefetch_count=10,on_message=start_consuming)withconsumer:whileTrue:conn.drain_events()消费一个任务需要1秒,预取10个协程。poolsize为10,从下面的消费数据可以看出,每秒消费10个,所以达到了并发