前言kombu下发消息时,支持多种序列化方式:jsonyamlpickle今天的主题是看一个dict经过这三种序列化方式序列化后的消息是什么样子的(以rabbitmq管理界面的外观为准)实验先准备一个dict,value的类型有str、int、datetime、subdict等类型data={'name':'jike','age':18,'birthday':get_utc_now_timestamp(),'score':{'math':100,'science':99.5,'english':59}}先看看不指定序列化器的结果:withConnection(amqp_uri)asconn:withconn.channel()aschannel:started_at=time.time()message=Message(channel=channel,body=data)producer=Producer(channel,exchange=imdb_exchange)res=producer.publish(body=message.body,routing_key='to_refresh',headers=message.headers)ended_at=time.time()logger.debug(f'paytime{ended_at-started_at}s')可以看到因为消息体是dict,e即使kombu中缺少序列化器,也选择根据json序列化怎么得出“根据json序列化”的结论呢?因为看到消息header中的“content_type”属性:content_type:application/json如果我们选择json呢?以Connection(amqp_uri)作为conn:以conn.channel()作为通道:started_at=time.time()message=Message(channel=channel,body=data)producer=Producer(channel,exchange=imdb_exchange)res=producer.发布(body=message.body,routing_key='to_refresh',headers=message.headers,serializer='json')ended_at=time.time()logger.debug(f'paytime{ended_at-started_at}s')OK看,和上面没有什么区别,都是content_type:application/json,那么如果我们选择yaml呢?以Connection(amqp_uri)作为conn:以conn.channel()作为通道:started_at=time.time()message=Message(channel=channel,body=data)producer=Producer(channel,exchange=imdb_exchange)res=producer.发布(正文=消息。正文,routing_key='to_refresh',headers=message.headers,serializer='yaml')ended_at=time.time()logger.debug(f'paytime{end_at-started_at}s')可以看到我们传递的dict对象,变成yaml文本再看pickle:可以看到,这个时候,我们已经看不到body了,因为pickle是二进制序列化方式完整代码:fromkombuimportExchange,QueuefromkombuimportConnectionfromkombu。消息导入Producerfromkombu.transport.baseimportMessagefromkombuimportExchange,Queuefromloguruimportloggerimporttimefromdatetimeimportdatetime,timedelta,timezonedefget_min_utc_timestamp()->datetime:return(datetime(year=1970,month=1,day=1)+timedelta(seconds=1)).replace(tzinfo=timezone.utc)defget_utc_now_timestamp()->datetime:"""https://blog.csdn.net/ball4022/article/details/101670024"""返回日期时间。utcnow().replace(tzinfo=timezone.utc)amqp_uri='amqp://pon:pon@192.168.31.245:5672//'defdeclare_exchange(交换:交换格):将连接(amqp_uri)作为conn:将conn.channel()作为通道:exchange.declare(channel=channel)defdeclare_queue(queue:Queue):将Connection(amqp_uri)作为conn:将conn.channel()作为通道:queue.declare(channel=channel)imdb_exchange=Exchange('imdb',type='fanout')declare_exchange(exchange=imdb_exchange)imdb_queue=Queue('imdb_refresh',imdb_exchange,routing_key='to_refresh',durable=True)declare_queue(queue=imdb_queue)data={'name':'jike','age':18,'birthday':get_utc_now_timestamp(),'score':{'math':100,'science':99.5,'english':59}}withConnection(amqp_uri)asconn:withconn.channel()aschannel:started_at=time.time()message=Message(channel=channel,body=data)生产者=生产者(channel,exchange=imdb_exchange)res=producer.publish(body=message.body,routing_key='to_refresh',headers=message.headers,serializer='pickle')ended_at=time.time()logger.debug(f'paytime{end_at-started_at}s')参考资料:kombudoc:序列化
