前言接续之前分享的RabbitMQ客户端源码系列-Connection,继续分享Channel相关的源码解析(com.rabbitmq:amqp-client:4.8.3)。友情提示:本次分享适合需要对RabbitMQChannels有一定了解的人阅读https://www.rabbitmq.com/channels.html。RabbitMQ客户端Demo在上一个Java客户端连接RabbitMQDemo的基础上继续分析RabbitMQChannel。ConnectionFactoryfactory=newConnectionFactory();//默认为"guest"/"guest",仅限于localhost连接factory.setUsername(userName);factory.setPassword(password);factory.setVirtualHost(virtualHost);factory.setHost(hostName);factory.setPort(portNumber);Connectionconn=factory.newConnection();//这次分析的重点是Channelchannel=connection.createChannel();byte[]messageBodyBytes="Hello,world!".getBytes();channel.basicPublish(EXCHANGE_NAME,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);channel.close();connection.close();AMQP协议交互--Channel可以看到,简单调用Channelchannel=connection.createChannel();方法创建Channel,可以看到Channel对应的AMQP协议交互:》客户端发送Channel.Open,broker收到后返回Channel.OpenOk(客户端创建通道;broker接收并完成通道创建)”。整个AMQP协议交互过程(172.30.0.74为客户端本地ip;192.168.17.160为RabbitMQBroker的ip)RabbitMQ客户端缓存方式为Channel本次分析RabbitMQ客户端采用的缓存方式为Channel:一个Connection对应多个Channel(默认有2048个通道,其中一个是特殊的channel0)“Connection”:主要用于AMQP协议解析和通道复用。“渠道”:路由、安全、协调。“Queue”:内存中的消息,永久的队列索引(也有通道和队列之间的交换作为开关,这里不讨论)。RabbitMQ客户端CacheMode为Channel模式通道源码分析以上简单介绍了AMQP协议交互过程中Channel连接、Connection与Channel的关系。开头,这次主要介绍Channel和Connection相关的源码,从connection.createChannel开始深入分析。/**公共API-{@inheritDoc}*/@OverridepublicChannelcreateChannel()throwsIOException{//确认连接已打开ensureIsOpen();//管理频道ChannelManagercm=_channelManager;如果(cm==null)返回null;//创建通道核心方法Channelchannel=cm.createChannel(this);//用于暴露metricsCollector.newChannel(channel);返回通道;可以看到通道是通过连接调用和管理的:ensureIsOpen()--确认连接打开,判断shutdownCause为空逻辑比较简单(如果连接关闭,shutdownCause会也表明它是关闭的)。channelManager--由connection统一初始化和管理,初始化在前面的connection和broker创建交互(Connection.Tune-->Connection.TuneOk)中完成,默认ChannelMax为2047(2048-1,这个1对应一个特殊频道0)。关注channelManager.createChannel(this)逻辑。publicChannelNcreateChannel(AMQConnectionconnection)throwsIOException{ChannelNch;//monitor主要监听_channelMap和channelNumberAllocatorsynchronized(this.monitor){//获取channel分配的numberintchannelNumber=channelNumberAllocator.allocate();如果(通道号==-1){返回空值;}else{//添加一个新频道ch=addNewChannel(connection,channelNumber);}}//打开新添加的通道ch.open();//现在它已被安全添加returnch;}channelManager管理通道创建、连接释放等:synchronized(this.monitor)--首先获取channelManager的监听锁,防止多线程并发操作。channelNumberAllocator.allocate--获取范围内未分配的channelNumber,如果返回-1,表示不能分配新的channel,主要内部逻辑由BitSet实现(有兴趣的可以详细了解)).后续会重点分析addNewChannel和开启逻辑。privateChannelNaddNewChannel(AMQConnectionconnection,intchannelNumber){//判重if(_channelMap.containsKey(channelNumber)){//这个号码已经分配了!做不到//除非我们的实现出现严重错误,否则这永远不会发生。thrownewIllegalStateException("我们试图"+"创建一个频道,其中的号码已经在"+"使用中。这永远不会发生。"+"请将此作为错误报告。");}//构建ChannelNch=instantiateChannel(connection,channelNumber,this.workService);//放入_channelMap统一管理_channelMap.put(ch.getChannelNumber(),ch);返回ch;}publicChannelN(AMQConnectionconnection,intchannelNumber,ConsumerWorkServiceworkService,MetricsCollectormetricsCollector){//AMQChannel构造函数super(connection,频道号);//构建消费者分发器this.dispatcher=newConsumerDispatcher(connection,this,workService);this.metricsCollector=metricsCollector;}这段逻辑比较简单,执行instantiateChannel建立和初始化通道,主要涉及连接数和通道数,超时,dispatcher等,每个通道都有一个dispatcher,但是“连接和线程池”是共享的同样的连接,最后得到新创建的通道,打开ch.open()。publicvoidopen()throwsIOException{//发送Channel.Open给rabbitmqbroker,等待broker返回Channel.OpenOkexnWrappingRpc(newChannel.Open(UNSPECIFIED_OUT_OF_BAND));}publicAMQCommandexnWrappingRpc(Methodm)trythrowsIOException{//对该方法进行rpc调用returnprivateRpc(m);}catch(AlreadyClosedExceptionace){//不要包装它,因为它意味着连接/通道//在过去的某个操作中被关闭throwace;}catch(ShutdownSignalExceptionex){throwwrap(ex);}}privateAMQCommandprivateRpc(Methodm)throwsIOException,ShutdownSignalException{//用于rpc调用期间的阻塞和等待SimpleBlockingRpcContinuationk=newSimpleBlockingRpcContinuation(m);rpc(m,k);//等待无超时if(_rpcTimeout==NO_RPC_TIMEOUT){returnk.getReply();}否则{尝试{//等待返回超时k.getReply(_rpcTimeout);}catch(TimeoutExceptione){throwwrapTimeoutException(m,e);}}}开通新通道的逻辑比较简单:主要是与rabbitmqbroker进行rpc调用:"client发送Channel.Open,broker收到后返回Channel.OpenOk。处理完成后,通道为创建完成,可以使用后续通道。”最后,这次我将根据通道源码,按照AMQP协议,分享RabbitMQClient与RabbitMQBroker的交互过程分析,有兴趣的读者可以详细了解通道源码。
