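Single thread

This covers the case of one process, one thread, and one AMQP connection: we open a single connection and use it to consume tasks. Because the consuming loop blocks while it waits for messages, something running in the "background" has to keep the AMQP heartbeat alive for us.

How can this "background" be implemented? There are several options, for example:

thread
eventlet coroutine
gevent coroutine

Below is an example that uses an eventlet coroutine:

# Patch the standard library before the other imports so that sockets and
# time.sleep cooperate with eventlet's green threads.
import eventlet
eventlet.monkey_patch()

import time

from kombu import Connection, Consumer, Exchange, Queue
from kombu.transport.pyamqp import Message
from loguru import logger

amqp_uri = 'amqp://pon:pon@192.168.31.245:5672//'
flag = 0


def handle_message(message: Message):
    # Log the raw body, then acknowledge so the broker can drop the message.
    logger.debug(message.body)
    message.ack()


refresh_exchange = Exchange('refresh', type='topic')
imdb_queue = Queue('imdb', refresh_exchange, routing_key='to_imdb', durable=True)
heartbeat_interval = 5


def heartbeat_check_forever(heartbeat_interval: int | float | None = None):
    # Runs in a green thread and uses the module-level `conn` opened below.
    while True:
        conn.heartbeat_check()
        logger.debug('heartbeat check done')
        # Check several times per heartbeat interval; fall back to 1 second.
        time.sleep(heartbeat_interval / 2 / 2 if heartbeat_interval else 1)


with Connection(amqp_uri, heartbeat=heartbeat_interval) as conn:
    with conn.channel() as channel:
        # on_message receives the raw message object as its only argument.
        consumer = Consumer(channel, queues=[imdb_queue], prefetch_count=10,
                            on_message=handle_message)
        with consumer:
            # Start the heartbeat coroutine, then block on incoming messages.
            eventlet.spawn_n(heartbeat_check_forever, heartbeat_interval)
            while True:
                conn.drain_events()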
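For comparison, the "thread" option from the list could look roughly like the sketch below. It is only an illustration under a few assumptions: `HeartbeatThread` is a hypothetical helper (not part of kombu), `conn` is any object exposing `heartbeat_check()` such as a kombu `Connection`, and a shared lock is assumed to be necessary because the connection is used from two threads.

```python
import threading


class HeartbeatThread(threading.Thread):
    """Hypothetical helper: calls conn.heartbeat_check() at a fixed period."""

    def __init__(self, conn, heartbeat_interval, lock):
        super().__init__(daemon=True)
        self.conn = conn
        # Assumption: the same lock is held by the consuming loop whenever it
        # touches the connection, so the two threads never use it at once.
        self.lock = lock
        # Same cadence as the eventlet example: a quarter of the interval.
        self.period = heartbeat_interval / 2 / 2 if heartbeat_interval else 1
        self._stop_event = threading.Event()

    def run(self):
        # Event.wait() returns False on timeout and True once stop() is called.
        while not self._stop_event.wait(self.period):
            with self.lock:
                self.conn.heartbeat_check()

    def stop(self):
        self._stop_event.set()
        self.join()
```

In the consuming loop, one would then hold the same lock around `conn.drain_events(timeout=1)` and ignore `socket.timeout`, so that the heartbeat thread gets a chance to run between polls. Compared with the eventlet version, this needs no monkey patching, at the cost of explicit locking.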
