import pika


class Rabbitmq():
    """Singleton wrapper around one blocking RabbitMQ connection and channel.

    Every ``Rabbitmq(...)`` call returns the same object.  The connection and
    channel are opened by the first construction only (or by the first
    construction after ``close()``); later constructions just switch
    ``self.queue`` to the requested queue and make sure that queue exists.
    """

    __new = None   # the single shared instance, created lazily in __new__
    __init = True  # True while the shared connection still has to be opened

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls.__new is None:
            cls.__new = object.__new__(cls)
        return cls.__new

    def __init__(self, queue, host='IP地址', port=5672,
                 username='username', password='password'):
        """Select ``queue`` and, if needed, open the shared connection.

        :param queue: queue name used by ``basic_publish`` / ``basic_consume``
        :param host: broker host; the default is a placeholder to be replaced
        :param port: broker port (5672 is the standard AMQP port)
        :param username: broker user name (placeholder default)
        :param password: broker password (placeholder default)

        The connection parameters are only used when the connection is
        actually opened; they are ignored while a connection is already live.
        """
        self.queue = queue
        if Rabbitmq.__init:
            # Connect to RabbitMQ.  The credentials object must be handed to
            # ConnectionParameters, otherwise pika falls back to its defaults.
            credentials = pika.PlainCredentials(username=username,
                                                password=password)
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, port=port,
                                          credentials=credentials))
            self.channel = self.connection.channel()
            # Fair dispatch: at most one unacknowledged message per consumer.
            # NOTE(review): basic_consume below uses auto_ack=True, in which
            # case the broker presumably ignores this limit - confirm.
            self.channel.basic_qos(prefetch_count=1)
            Rabbitmq.__init = False
        # Create the queue if it does not exist yet.  Done on every
        # construction because the shared instance may be pointed at a queue
        # that an earlier construction never declared.
        self.channel.queue_declare(queue=queue)

    def basic_publish(self, body):
        """Publish ``body`` to the current queue.

        :param body: message payload to insert into the queue
        """
        # exchange='' routes directly to the queue named by routing_key.
        self.channel.basic_publish(exchange='', routing_key=self.queue,
                                   body=body)

    def basic_consume(self, callback):
        """Register ``callback`` as the consumer of the current queue.

        :param callback: called as ``callback(ch, method, properties, body)``
            for each delivered message

        Messages are consumed with ``auto_ack=True`` (automatic
        acknowledgement).
        """
        self.channel.basic_consume(queue=self.queue, auto_ack=True,
                                   on_message_callback=callback)

    def consume(self):
        """Start pika's blocking consume loop on the shared channel."""
        self.channel.start_consuming()

    def close(self):
        """Close the connection and allow a later construction to reconnect."""
        # Closing an already-closed pika connection raises, so guard it.
        if self.connection.is_open:
            self.connection.close()
        # Without this reset the singleton would keep a dead connection.
        Rabbitmq.__init = True


if __name__ == '__main__':
    def callback(ch, method, properties, body):
        """Print every message delivered to the demo consumer."""
        print("[x]:", body)

    queue = 'ceshi3'
    rbmq = Rabbitmq(queue)
    try:
        for i in range(10000):
            print(i)
            rbmq.basic_publish('hollerwordhaha??hah' + str(i))

        rbmq.basic_consume(callback)
        rbmq.consume()  # blocks until interrupted
    except KeyboardInterrupt:
        pass
    finally:
        # start_consuming() never returns normally, so cleanup must live here.
        rbmq.close()
