当前位置: 首页 > 后端技术 > Python

添加监控文件目录并发送到rabbitmq案例1

时间:2023-03-25 21:42:21 Python

base64_utilsimportosfromPILimportImageimportreimportbase64fromioimportBytesIOdefimage_to_base64(file_path):"""file_path将图片转为b64encode编码格式的文件路径"""withopen(file_path,'rb')asfp:returnbase64.b64encode(fp.read()).decode('utf-8')defpImg_to_base64(pImg):"""convertbinaryimagetobase64"""buffered=BytesIO()皮姆。保存(缓冲,格式=“JPEG”)#returnbase64.b64encode(buffered.getvalue())returnbase64.b64encode(buffered.getvalue()).decode('utf-8')#https://www.jb51.net/article/178106.htmdefbase64_to_pImg(data):"""将base64转换为二进制图像,方便PIL的Image.open(binary_img).show()"""#binary_img=BytesIO(base64.b64decode(data))binary_img=BytesIO(base64.b64decode(data.encode('utf-8')))returnImage.open(binary_img)defbase64_to_image(filename,data,path=''):""":paramfilename:转换后的图像名称:参数数据:base64:参数路径:转换后的图像文件存在路径:return:"""file_path=os.path.join(path,filename)withopen(file_path,"wb")asfp:fp.write(base64.b64decode(data.encode('utf-8')))#转换为图像image_path='./data/img/0234.jpg'if__name__=='__main__':baseData=image_to_base64(image_path)image=base64_to_pImg(baseData)basexxx=pImg_to_base64(image)base64_to_image('aaa.jpg',basexxx)#byte_img=BytesIO(base64.b64decode(baseData.encode('utf-8')))#image=Image.open(byte_img)#image.show()#print(image.size)##print(baseData)复制代码)#base64_save_image(baseData)##Image.open(image_path).show()rabbit_utilsimportpikaimportjsondefcreate_connection(config):"""创建一个RabbitMQ连接:paramconfig::return:"""#Function:创建时的登录凭据aconnectioncredentials=pika.PlainCredentials(username=config['username'],password=config['password'])#mq用户名和密码#虚拟队列需要指定参数virtual_host,如果是默认的就可以留空#功能:连接MQ的参数设置param=pika.ConnectionParameters(host=config['host'],port=config['port'],virtual_host=config['virtual_host'],credentials=credentials)returnpika.BlockingConnection(param)classSubscriber():"""MessageSubscriber"""def__init__(self,queueName,bindingKey,config):self.queueName=queueNameself.bindingKey=bindingKeyself.config=configself.connection=create_connection(self.config)def__del__(self):self.connection.close()defon_message_callback(self,channel,method,properties,body):""":paramchannel::parammethod::paramproperties::parambody:"""#将收到的消息从json转换为字符串#message=json.loads(body)message=bodyprint("[received]%r:%r"%(method.routing_key,message))defsetup(self,on_message_callback):#功能:创建一个频道channel=self.connection.channel()#声明exchangeexchange,exchange指定消息投递到哪个队列,如果不存在则创建#durable=True表示exchange持久化存储,False表示非持久化存储#channel.exchange_declare(exchange=self.config['exchange'],exchange_type='topic',durable=True)#Function:declarequeuechannel.queue_declare(queue=self.queueName,durable=True)#功能:通过routingkey绑定queue和exchangechannel.queue_bind(queue=self.queueName,exchange=self.config['exchange'],routing_key=self.bindingKey)#功能:从队列中获取消息并开始消费channel.basic_consume(queue=self.queueName,on_message_callback=on_message_callback,auto_ack=True)try:channel.start_consuming()exceptKeyboardInterrupt:channel.stop_consuming()classPublisher:"""Message发布者"""def__init__(self,queueName,config):self.queueName=queueNameself.config=configdefpublish(self,routing_key,message):""":paramrouting_key::parammessage:"""connection=创建连接(self.config)channel=connection.channel()#声明exchange,exchange指定消息将投递到哪个队列,如果不存在则创建#durable=True表示exchange持久化存储,False表示非持久化存储durable=True)#功能:发布消息到Rabbitmqexchangechannel.basic_publish(exchange=self.config['exchange'],routing_key=routing_key,body=message)print("[x]Sentmessage%rfor%r"%(message,routing_key))receive_client.pyfromutils.rabbit_utilsimportSubscriberfromutils.base64_utilsimportbase64_to_pImgimportjsonconfig={'host':'192.167.113.83','port':5672,'username',:'gu'password':'guest','exchange':'exchange01','virtual_host':'/',}defon_message_callback(channel,method,properties,body):""":paramchannel::parammethod::paramproperties::parambody:"""#将收到的消息从json转换为字符串#message=json.loads(body)message=body.decode()json_=json.loads(message)name=json_['name']base64=json_['base64']pIg=base64_to_pImg(base64)#pIg就是图片#pIg.show()print(name)print(base64))打印("[received]%r:%r"%(method.routing_key,message))if__name__=='__main__':receive=Subscriber('hello','rout',config)receive.setup(on_message_callback=on_message_callback)send_client.pyfromutils.rabbit_utilsimportPublisherimportos,json,timefromwatchdog.eventsimportFileSystemEventHandlerfromwatchdog.observersimportObserverfromutils.base64_utilsimportimage_to_base64config={'host':'192.167.113.83','port':5672,'用户名':'guest','password':'guest','exchange':'exchange01','virtual_host':'/',}path='/Users/apple/Desktop/app/nginx'sender=Publisher("你好”,配置)#pipinstallwatchdogclassMyEventHandler(FileSystemEventHandler):defon_created(self,event):_dict={}f=event.src_pathf_name=f.rsplit('/')[-1]base64=image_to_base64(f)_dict['name']=f_name_dict['base64']=base64结果=json.dumps(_dict)sender.publish("rout",result)os.remove(f)#defon_closed(self,event):#print(event.src_path)if__name__=='__main__':event_handler=MyEventHandler()观察者=观察者()observer.schedule(event_handler,路径,递归=真)