前言本次继续分享RabbitMQClientpulish——发送消息,先分享之前分享的RabbitMQ客户端源码-连接和RabbitMQ客户端源码-通道和发布消息-PulishMessage小总结(还是基于之前的JavaClientConnectingtoRabbitMQDemo)。RabbitMQpulisher总结从图中我们可以看到RabbitMQ发布消息的流程:ConnectionFactory-->Connection-->Channel-->PulishMessage。发布消息交互式抓包的老套路——先抓包再分享源码(“有助于快速理解”)。BasicPublish&Ack抓包可见:pulisher(RabbitMQ消息发送者)和Broker(RabbitMQBroker)打开Channel后,发起Confirm.Select/Select-Ok--通知Broker需要发布消息确认了,所以也有后续的Basic.Pulish/Ack。梳理完交互流程,我们开始进入今天的话题,PushMessage。PulishMessage源码分析《发布消息的通用入口——ChannelN.basicPublish()》。/**公共API-{@inheritDoc}*/@OverridepublicvoidbasicPublish(Stringexchange,StringroutingKey,booleanmandatory,BasicPropertiesprops,byte[]body)throwsIOException{basicPublish(exchange,routingKey,mandatory,false,props,body);}/**公共API-{@inheritDoc}*/@OverridepublicvoidbasicPublish(Stringexchange,StringroutingKey,booleanmandatory,booleanimmediate,BasicPropertiesprops,byte[]body)throwsIOException{//发布者配置为`Confirm.Select`nextPublishSeqNo设置从1开始//将未确认的消息放入unconfirmedSet,加1if(nextPublishSeqNo>0){unconfirmedSet.add(getNextPublishSeqNo());nextPublishSeqNo++;}基本属性useProps=props;if(props==null){useProps=MessageProperties.MINIMAL_BASIC;}//构造AMQCommand并发送transmit(newAMQCommand(newBasic.Publish.Builder().exchange(exchange).routingKey(routingKey).mandatory(强制).immediate(立即).build(),useProps,body));//用于指标统计和监控,默认为NoOpMetricsCollector,需要配置使用提供的MicrometerMetricsCollector和StandardMetricsCollector(引入相应的包和配置开箱即用~)metricsCollector.basicPublish(this);}"ConstructAMQCommand》值得一提,RabbitMQClient应用消息的最小单位是Frame(帧,在Connection章节提到过),Frame主要由type类型、channel通道、payload消息内容字节、累加器写入数据、NON_BODY_SIZE组成《框架结构》publicclassFrame{/**框架类型代码*///FRAME_HEARTBEAT:心跳,FRAME_METHOD:方法,FRAME_HEADER:头信息,FRAME_BODY内容主体publicfinalinttype;/**帧通道号,0-65535*///通道序号publicfinalintchannel;/**帧有效载荷字节(对于入站帧)*///消息内容字节privatefinalbyte[]payload;/**帧负载(用于出站帧)*///写出数据privatefinalByteArrayOutputStreamaccumulator;privatestaticfinalintNON_BODY_SIZE=1/*type*/+2/*channel*/+4/*payloadsize*/+1/*endcharacter*/;...}AMQP0-9-1特定的“命令”读取,它从一系列帧中累积方法、标题和文本。/***用指定的方法、头和正文构造一个命令。*@parammethod包装的方法*@paramcontentHeader包装的内容头*@parambody消息正文数据*/publicAMQCommand(com.rabbitmq.client.Methodmethod,AMQContentHeadercontentHeader,byte[]body){this.assembler=newCommandAssembler((Method)method,contentHeader,body);}//AMQP0-9-1特定的命令,构造方法、头部和文字publicCommandAssembler(Methodmethod,AMQContentHeadercontentHeader,byte[]body){this.method=method;this.contentHeader=contentHeader;this.bodyN=newArrayList
