当前位置: 首页 > 科技观察

RabbitMQ客户端源码系列-PulishMessage

时间:2023-03-23 01:44:02 科技观察

前言本次继续分享RabbitMQClientpulish——发送消息,先分享之前分享的RabbitMQ客户端源码-连接和RabbitMQ客户端源码-通道和发布消息-PulishMessage小总结(还是基于之前的JavaClientConnectingtoRabbitMQDemo)。RabbitMQpulisher总结从图中我们可以看到RabbitMQ发布消息的流程:ConnectionFactory-->Connection-->Channel-->PulishMessage。发布消息交互式抓包的老套路——先抓包再分享源码(“有助于快速理解”)。BasicPublish&Ack抓包可见:pulisher(RabbitMQ消息发送者)和Broker(RabbitMQBroker)打开Channel后,发起Confirm.Select/Select-Ok--通知Broker需要发布消息确认了,所以也有后续的Basic.Pulish/Ack。梳理完交互流程,我们开始进入今天的话题,PushMessage。PulishMessage源码分析《发布消息的通用入口——ChannelN.basicPublish()》。/**公共API-{@inheritDoc}*/@OverridepublicvoidbasicPublish(Stringexchange,StringroutingKey,booleanmandatory,BasicPropertiesprops,byte[]body)throwsIOException{basicPublish(exchange,routingKey,mandatory,false,props,body);}/**公共API-{@inheritDoc}*/@OverridepublicvoidbasicPublish(Stringexchange,StringroutingKey,booleanmandatory,booleanimmediate,BasicPropertiesprops,byte[]body)throwsIOException{//发布者配置为`Confirm.Select`nextPublishSeqNo设置从1开始//将未确认的消息放入unconfirmedSet,加1if(nextPublishSeqNo>0){unconfirmedSet.add(getNextPublishSeqNo());nextPublishSeqNo++;}基本属性useProps=props;if(props==null){useProps=MessageProperties.MINIMAL_BASIC;}//构造AMQCommand并发送transmit(newAMQCommand(newBasic.Publish.Builder().exchange(exchange).routingKey(routingKey).mandatory(强制).immediate(立即).build(),useProps,body));//用于指标统计和监控,默认为NoOpMetricsCollector,需要配置使用提供的MicrometerMetricsCollector和StandardMetricsCollector(引入相应的包和配置开箱即用~)metricsCollector.basicPublish(this);}"ConstructAMQCommand》值得一提,RabbitMQClient应用消息的最小单位是Frame(帧,在Connection章节提到过),Frame主要由type类型、channel通道、payload消息内容字节、累加器写入数据、NON_BODY_SIZE组成《框架结构》publicclassFrame{/**框架类型代码*///FRAME_HEARTBEAT:心跳,FRAME_METHOD:方法,FRAME_HEADER:头信息,FRAME_BODY内容主体publicfinalinttype;/**帧通道号,0-65535*///通道序号publicfinalintchannel;/**帧有效载荷字节(对于入站帧)*///消息内容字节privatefinalbyte[]payload;/**帧负载(用于出站帧)*///写出数据privatefinalByteArrayOutputStreamaccumulator;privatestaticfinalintNON_BODY_SIZE=1/*type*/+2/*channel*/+4/*payloadsize*/+1/*endcharacter*/;...}AMQP0-9-1特定的“命令”读取,它从一系列帧中累积方法、标题和文本。/***用指定的方法、头和正文构造一个命令。*@parammethod包装的方法*@paramcontentHeader包装的内容头*@parambody消息正文数据*/publicAMQCommand(com.rabbitmq.client.Methodmethod,AMQContentHeadercontentHeader,byte[]body){this.assembler=newCommandAssembler((Method)method,contentHeader,body);}//AMQP0-9-1特定的命令,构造方法、头部和文字publicCommandAssembler(Methodmethod,AMQContentHeadercontentHeader,byte[]body){this.method=method;this.contentHeader=contentHeader;this.bodyN=newArrayList(2);这个.bodyLength=0;this.remainingBodyBytes=0;appendBodyFragment(正文);if(method==null){this.state=CAState.EXPECTING_METHOD;}elseif(contentHeader==null){this.state=method.hasContent()?CAState.EXPECTING_CONTENT_HEADER:CAState.COMPLETE;}else{this.remainingBodyBytes=contentHeader.getBodySize()-this.bodyLength;更新公司意图主体状态();}}"TransmitAMQCommand--Channel.transmit()"publicvoidtransmit(AMQCommandc)throwsIOException{synchronized(_channelMutex){//确认通道是否开启(逻辑比较简单:如果shutdownCause为空,则open)ensureIsOpen();quiescingTransmit(c);}}publicvoidquiescingTransmit(AMQCommandc)throwsIOException{//防止同时使用同一个通道synchronized(_channelMutex){//判断消息是否携带内容,如果有必要,通道是否阻塞(如果通道状态为`FLOW`,则表示blocking_blockContent=true)if(c.getMethod().hasContent()){while(_blockContent){}catch(InterruptedExceptionignored){}//防止通道在被阻塞唤醒时被关闭(多线程操作的好案例)ensureIsOpen();}}c.transmit(this);}}「AMQCommand.transmit”/***将此命令发送到通道的*连接上的指定通道,possibly在多个框架中。*@paramchannel传输命令的通道*@throwsIOException如果出现错误遇到r*/publicvoidtransmit(AMQChannelchannel)throwsIOException{//每个通道都有一个从0开始的序号,(0为特殊通道)intchannelNumber=channel.getChannelNumber();AMQConnectionconnection=channel.getConnection();synchronized(assembler){//方法:FRAME_HEARTBEAT:心跳,FRAME_METHOD:方法,FRAME_HEADER:头信息,FRAME_BODY内容主题方法m=this.assembler.getMethod();如果(m.hasContent()){byte[]body=this.assembler.getContentBody();//FRAME_HEADER:帧头信息FrameheaderFrame=this.assembler.getContentHeader().toFrame(channelNumber,body.length);intframeMax=connection.getFrameMax();booleancappedFrameMax=frameMax>0;intbodyPayloadMax=cappedFrameMax?frameMax-EMPTY_FRAME_SIZE:body.length;if(cappedFrameMax&&headerFrame.size()>frameMax){Stringmsg=String.format("Contentheadersexceededmaxframesize:%d>%d",headerFrame.size(),frameMax);日行新的IllegalArgumentException(味精);}//1.写入channelNumber帧FRAME_METHODconnection.writeFrame(m.toFrame(channelNumber));//2.写入头信息帧AMQP.FRAME_HEADERconnection.writeFrame(headerFrame);//3。如果body太多,会被拆分成多个frameintfragmentLength=(剩余