Today we analyze RabbitMQ message persistence: what the server does after a client publishes a persistent message. Here is the client-side publish code:

```php
$client = new Client('127.0.0.1', 5672, 'guest', 'guest');

// Declare the exchange and queue
$type = 'topic';
$routingKey = '你好';
$exchangeName = 'hello_exchange';
$exchange = new Exchange($client, $exchangeName, $type);
$exchange->setDurable(true);

// Queue with a consumer
$queue = new Queue($client, $this->queueName, [
    new Consumer(function (AMQPMessage $msg) {
        var_dump($msg);
    }),
]);
$binding = new Binding($exchange, $queue);
$binding->setRoutingKey($routingKey);
$client->register($binding);

$message = new Message("你好" . str_repeat('123456789', 13));
$res = $exchange->publish($message, $routingKey);
```

Capturing the network traffic shows that the `basic.publish` command is only actually sent to the server at this point. The call-chain analysis starts in the `rabbit_channel` module:

```erlang
handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory},
              Content, State = #ch{virtual_host    = VHostPath,
                                   tx              = Tx,
                                   channel         = ChannelNum,
                                   confirm_enabled = ConfirmEnabled,
                                   trace_state     = TraceState,
                                   user            = #user{username = Username},
                                   conn_name       = ConnName,
                                   delivery_flow   = Flow}) ->
    ...
    case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
        {ok, Message} ->
            Delivery = rabbit_basic:delivery(Mandatory, DoConfirm, Message,
                                             MsgSeqNo),
            QNames = rabbit_exchange:route(Exchange, Delivery),
            DQ = {Delivery#delivery{flow = Flow}, QNames},
            {noreply, case Tx of
                          none         -> deliver_to_queues(DQ, State1);
                          {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
                                          State1#ch{tx = {Msgs1, Acks}}
                      end};
    ...
end.
```

Some non-essential code has been omitted above. The function checks whether the channel has an open transaction: if not, the delivery goes straight through `deliver_to_queues`; if there is one, the delivery is enqueued onto the channel's `tx` state instead. Today we only follow the non-transactional path.

```erlang
deliver_to_queues({Delivery = #delivery{message    = Message = #basic_message{
                                                       exchange_name = XName},
                                        mandatory  = Mandatory,
                                        confirm    = Confirm,
                                        msg_seq_no = MsgSeqNo},
                   DelQNames},
                  State = #ch{queue_names    = QNames,
                              queue_monitors = QMons}) ->
    Qs = rabbit_amqqueue:lookup(DelQNames),
    DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
```

This calls `rabbit_amqqueue:deliver` to do the work:

```erlang
deliver(Qs, Delivery = #delivery{flow = Flow}) ->
    {MPids, SPids} = qpids(Qs),
    QPids = MPids ++ SPids,
    MMsg = {deliver, Delivery, false},
    SMsg = {deliver, Delivery, true},
    delegate:cast(MPids, MMsg),
    delegate:cast(SPids, SMsg),
    QPids.
```

The logic of `deliver` is straightforward: the queue pids are split into master and slave pids. If mirrored queues are not enabled, the slave pid list is empty (mirrored queues are out of scope today). The `deliver` message is cast to the master queue process, `rabbit_amqqueue_process`.
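The master/slave fan-out above can be modeled in a few lines of Python. This is a hypothetical sketch of what `deliver/2` computes, not RabbitMQ code — the queue representation and function names here are invented for illustration; the real implementation casts through `delegate:cast`:

```python
# Sketch of the rabbit_amqqueue:deliver/2 fan-out (illustrative model only).
# Queue pids are split into masters and slaves; both groups receive the same
# delivery, with a boolean flag marking whether the receiver is a slave.

def deliver(queues, delivery):
    """Return the (pid, message) casts that deliver/2 would issue."""
    master_pids = [q["master"] for q in queues]
    slave_pids = [pid for q in queues for pid in q.get("slaves", [])]
    casts = []
    for pid in master_pids:
        casts.append((pid, ("deliver", delivery, False)))  # MMsg
    for pid in slave_pids:
        casts.append((pid, ("deliver", delivery, True)))   # SMsg
    return casts

# Without mirrored queues the slave list is empty, so only the master
# queue process receives the {deliver, Delivery, false} cast:
casts = deliver([{"master": "q1_pid"}], {"payload": b"hello"})
```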
Let's see how `rabbit_amqqueue_process` handles that cast:

```erlang
handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow},
             SlaveWhenPublished},
            State = #q{senders = Senders}) ->
    %% SlaveWhenPublished is true only when this process was a slave
    %% at the time the message was published
    ...
    noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
```

There is quite a bit of code in between that I will not paste here. In the common case, `deliver_or_enqueue` calls `attempt_delivery`, which in turn calls `rabbit_variable_queue:publish`:

```erlang
publish(Msg = #basic_message{is_persistent = IsPersistent, id = MsgId},
        MsgProps = #message_properties{needs_confirming = NeedsConfirming},
        IsDelivered, _ChPid, _Flow,
        State = #vqstate{q1                  = Q1,
                         q3                  = Q3,
                         qi_embed_msgs_below = IndexMaxSize,
                         next_seq_id         = SeqId,
                         in_counter          = InCount,
                         durable             = IsDurable,
                         unconfirmed         = UC}) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps,
                           IndexMaxSize),
    {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
```

`publish` calls `maybe_write_to_disk` to persist the message:

```erlang
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    {MsgStatus2, State2} = maybe_write_index_to_disk(ForceIndex, MsgStatus1,
                                                     State1),
    {MsgStatus2, State2}.
```

`maybe_write_msg_to_disk` persists the message body, and `maybe_write_index_to_disk` persists the index. Where the body goes is decided by `persist_to`: if the message is smaller than `queue_index_embed_msgs_below` (default 4096 bytes), `persist_to` returns `queue_index`, otherwise `msg_store`. In other words, a message body under 4096 bytes is never written to a message persistence file at all; it is embedded in the index file instead.

```erlang
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status{
                                 msg = Msg, msg_id = MsgId,
                                 is_persistent = IsPersistent},
                        State = #vqstate{msg_store_clients = MSCState,
                                         disk_write_count  = Count})
  when Force orelse IsPersistent ->
    case persist_to(MsgStatus) of
        msg_store   -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                            prepare_to_store(Msg)),
                       {MsgStatus#msg_status{msg_in_store = true},
                        State#vqstate{disk_write_count = Count + 1}};
        queue_index -> {MsgStatus, State}
    end;
```

The `rabbit_msg_store` module is responsible for message-body persistence; `msg_store_write` eventually calls `write_message` to save the message. The logic here is simple: append the message content to the current store file, then check the file's size and start a new persistence file when necessary.

A word on segments first: each segment corresponds to one file, and each file can hold at most `SEGMENT_ENTRY_COUNT` (16384) message index entries. These files are named with integers. (The message store's own files live under `msg_store_persistent` in the mnesia data directory, while the index segment files live under the `queues` directory.) Which segment file does a message belong to? Its sequence id is integer-divided by `SEGMENT_ENTRY_COUNT`; see `rabbit_queue_index:add_to_journal` for the relevant code.

Finally, index persistence. `maybe_write_index_to_disk` hands off to `rabbit_queue_index:publish` to hit the disk:

```erlang
publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
        State = #qistate{unconfirmed     = UC,
                         unconfirmed_msg = UCM}) ->
    MsgId = case MsgOrId of
                #basic_message{id = Id} -> Id;
                Id when is_binary(Id)   -> Id
            end,
    ?MSG_ID_BYTES = size(MsgId),
    %% JournalHdl corresponds to the journal.jif file
    {JournalHdl, State1} =
        get_journal_handle(
          case {MsgProps#message_properties.needs_confirming, MsgOrId} of
              {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
                               State#qistate{unconfirmed = UC1};
              {true, _}     -> UCM1 = gb_sets:add_element(MsgId, UCM),
                               State#qistate{unconfirmed_msg = UCM1};
              {false, _}    -> State
          end),
    file_handle_cache_stats:update(queue_index_journal_write),
    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
    ok = file_handle_cache:append(
           JournalHdl, [<<(case IsPersistent of
                               true  -> ?PUB_PERSIST_JPREFIX;
                               false -> ?PUB_TRANS_JPREFIX
                           end):?JPREFIX_BITS,
                          SeqId:?SEQ_BITS, Bin/binary,
                          (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
    maybe_flush_journal(JournalSizeHint,
                        add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
                                       State1)).
```

Index records are first appended to the journal and then periodically flushed to disk. The relevant parameter is `queue_index_max_journal_entries`: each write checks whether the journal entry count has reached that limit, and if so the journal is flushed. The actual flush into the index persistence files happens asynchronously, in `rabbit_variable_queue:handle_pre_hibernate`, which I will not detail here. The index persistence files live under the `queues` directory inside the mnesia directory, with the `.idx` extension.

How is message loss prevented — that is, how do we recover if the journal was written successfully but its entries were not yet flushed into the index files? See `rabbit_variable_queue:init`: when RabbitMQ starts, before each queue is brought up, it recovers the index and messages from the journal.

To summarize: persistence splits into message-body persistence and index persistence. If the body is smaller than `queue_index_embed_msgs_below`, the message is embedded in the index file and only one disk write is needed. Otherwise two disk writes are needed — body plus index — with the body appended to a message store file and the index entry going to a segment file that holds up to 16384 entries. For write performance, the message body is written append-only, and the index is first appended to the journal and then flushed to the index files asynchronously.
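The segment mapping described above reduces to integer division on the message's sequence id. A small sketch (in Python, for illustration — the real arithmetic lives in `rabbit_queue_index`):

```python
SEGMENT_ENTRY_COUNT = 16384  # max index entries per segment file

def seq_id_to_seg_and_rel_seq_id(seq_id):
    """Which integer-named segment file a sequence id lands in,
    and its relative position inside that segment."""
    return divmod(seq_id, SEGMENT_ENTRY_COUNT)

# Sequence ids 0..16383 map to segment 0, 16384..32767 to segment 1, etc.
seg, rel = seq_id_to_seg_and_rel_seq_id(16385)  # -> segment 1, offset 1
```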

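The one-write-versus-two-writes trade-off in the summary can be captured in a short sketch. This is an illustrative Python model of the `persist_to` decision, not RabbitMQ code; the threshold mirrors the default of `queue_index_embed_msgs_below`:

```python
QUEUE_INDEX_EMBED_MSGS_BELOW = 4096  # default embed threshold (bytes)

def persist_to(msg_size):
    """Model of the persist_to decision: where the message body goes."""
    if msg_size < QUEUE_INDEX_EMBED_MSGS_BELOW:
        return "queue_index"  # body embedded in the index record
    return "msg_store"        # body appended to a message store file

def disk_writes(msg_size):
    # Embedded: one write (index only). Otherwise: body + index = two writes.
    return 1 if persist_to(msg_size) == "queue_index" else 2

small = disk_writes(100)   # small message: index write only
large = disk_writes(5000)  # large message: message store + index
```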