当前位置: 首页 > 后端技术 > Python

rabbitmq基于python操作

时间:2023-03-26 16:36:15 Python

简单模式生产者1链接rabbitmq2创建队列3插入数据到指定队列消费者1链接rabbitmq2监听模式3确认回调函数示例:#producerimportpika#linkrabbitmqcreadentials=pika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#createqueuechannel.queue_declare(queue='ceshi')#插入数据到指定队列[exchange:switch模式,简单模式为空routing_key:指定队列body:待插入的值]channel.basic_publish(exchange='',routing_key='ceshi',body='helloworld!')print('[x]--')#Consumerimportpika#Createlinkpika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#创建队列channel.queue_declare(queue='ceshi')#确定回调函数defcallback(ch,method,properties,body):print("[x]:",body)#确认监听队列[auto_ack:默认应答]channel.basic_consume(queue='ceshi',auto_ack=True,on_message_callback=callback)打印('[*]:--')#官方监控channel.start_consuming()参数使用1.响应参数consumer在监控队列时确定一个参数auto_acktrue:默认响应:从队列中取出一条数据后,队列中的数据不存在,如果是在数据处理过程中,如果程序出现问题,就会造成数据丢失。false:人工响应:从队列中取出一条数据后,该条数据还在队列中。一个命令会告诉队列我已经执行完毕,可以删除这个数据了。人工回复必然会影响效率。具体选择要根据项目需求:是追求数据安全还是效率2.持久化将数据保存到磁盘,防止程序运行中途Rabbitmq服务异常导致数据丢失Producer在创建队列通道时声明durable=True。queue_declare(queue='ceshi',durable=True)插入数据时声明属性=pika.BasicProperties(delivery_mode=2)channel.basic_publish(exchange='',routing_key='ceshi',body='helloworld!',properties=pika.BasicProperties(delivery_mode=2))创建队列时消费者声明durable=Truechannel.queue_declare(queue='ceshi2',durable=True)3.分发参数轮询分发正常开启。多个消费者正在轮询分配。比如队列中有8条数据,每个人有4个公平分配。处理速度越快,您可以获得的数据就越多。公平分配要求Consumer可以添加channel.basic_qos(prefetch_count=1)switchmodepublishsubscriptionmodeProducer1linkrabbitmq2创建switch,类型为fanout3向switch中插入数据consumer1linkrabbitmq2创建switch,类型fanout3创建队列并绑定交换机4监听方式5确认回调函数示例:#Producerimportpika#Linkrabbitmqpika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#声明一个名为logs的交换机,其类型为fanoutchannel.exchange_declare(exchange='logs',exchange_type='fanout')#fanout:发布订阅模式#insertintothelogsswitchDatahello世界!channel.basic_publish(exchange='logs',routing_key='',body='helloworld!')print('[x]---')connection.close()#consumerimportpika#createLinkpika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#声明一个具有相同生产者名称的交易所和type,避免了consumer-queue先找不到exchange的情况。channel.exchange_declare(exchange='logs',exchange_type='fanout')#fanout:发布订阅模式#createqueueexclusive:系统会创建一个随机唯一的队列名result=channel.queue_declare(queue='',exclusive=True)queue_name=result.method.queueprint(queue_name)#绑定指定队列到交换机channel.queue_bind(exchange='logs',queue=queue_name)#确定回调函数defcallback(ch,method,properties,body):print("[x]:",body)#确定监听队列[auto_ack:默认响应]channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)print('[*]:---')#正式监听channel.start_consuming()关键字模式Producer1链接rabbitmq2创建交换机,类型为direct3向交换机插入数据,插入时加上关键字,routing_key:要进入哪个消费者的队列,设置某个队列的关键字消费者1链接rabbitmq2创建交换机,类型为direct3创建队列并绑定交换机,绑定交换机时添加关键字routing_key4监听模式5确认回调函数示例:#Producerimportpika#Linkrabbitmqpika.PlainCredentials(username='username',password='密码d')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#声明一个开关channel.exchange_declare(exchange='logs',exchange_type='direct')#direct:关键字模式#Insertdataintothelogsswitchhelloworld!routing_key是关键字channel.basic_publish(exchange='logs',routing_key='info',body='helloworld!')print('[x]---')connection.close#Consumerimportpika#Createlinkpika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#声明一个交换type作为生产者名称,避免先启动消费者——队列找不到交换通道。随机且唯一的队列名称result=channel.queue_declare(queue='',exclusive=True)queue_name=result.method.queueprint(queue_name)#将指定队列绑定到交换机,routing_key:关键字,多个关键字绑定多个通道。queue_bind(exchange='logs',queue=queue_name,routing_key='info')channel.queue_bind(exchange='logs',队列=queue_name,routing_key='error')#确定回调函数defcallback(ch,method,properties,body):print("[x]:",body)#确定监听队列[auto_ack:defaultanswer]channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)print('[*]:---')#官方监听channel.start_consuming()通配符模式producer1linkrabbitmq2创建switch,类型为topic3intoswitch插入数据,插入时加上关键字,routing_key:要进入哪个consumerqueue,设置某个queue的关键字,关键字可以用来拆分consumer1linkrabbitmq2创建switch,type为topic3create队列并绑定交换机。绑定交换机时,添加关键字routing_key。关键字可以使用通配符[*:匹配一次,#:匹配一次或多次]4监听模式5确定回调函数示例:#producerimportpika#Linkrabbitmqpika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#声明一个名为logs的开关,类型为topicchannel.exchange_declare(exchange='logs',exchange_type='topic')#topic:wildcardpattern#Insertdataintothelogsswitch你好世界!频道.basic_publish(exchange='logs3',routing_key='usa.aaaa',body='hello21321!')print('[x]---')connection.close()#Consumerimportpika#Createlinkpika.PlainCredentials(username='username',password='password')connection=pika.BlockingConnection(pika.ConnectionParameters(host='address',port=port))channel=connection.channel()#声明生产者名称开关相同类型,避免先启动消费者——队列找不到交换机channel.exchange_declare(exchange='logs',exchange_type='topic')#topic:wildcardmode#Createqueueexclusive:系统会创建一个随机唯一的queuenameresult=channel.queue_declare(queue='',exclusive=True)queue_name=result.method.queueprint(queue_name)#绑定指定队列到交换机,routing_key:关键字,多个关键字绑定Multiplechannel.queue_bind(exchange='logs',queue=queue_name,routing_key='#.aaaa')#确定回调函数defcallback(ch,method,properties,body):print("[x]:",body)#确定监听队列[auto_ack:默认响应]channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)print('[*]:---')#官方监控channel.start_consuming()