大家好,我是君哥。在使用RocketMQ的过程中,有时会看到如下日志:[TIMEOUT_CLEAN_QUEUE]brokerbusy,startflowcontrolforawhile,periodinqueue:206ms,sizeofqueue:5这是因为RocketMQ触发了流控。今天我们就来说说RocketMQ在哪些场景下会触发流控。如上图所示,生产者将消息写入Broker,Consumer从Broker拉取消息。Broker是RocketMQ的核心,触发流控主要是防止Broker压力过大而宕机。一、Broker流控1、BrokerbusyRockerMQ默认采用异步flush策略。Producer将消息发送给Broker后,Broker会先将消息写入PageCache,刷新线程会周期性的将PageCache中的数据刷新到磁盘,如下图:brokerbusy是怎么引起的?Broker默认开启快速失败,处理逻辑类为BrokerFastFailure。该类中有一个定时任务清理过期请求,每10ms执行一次。代码如下:},1000,10,TimeUnit.MILLISECONDS);}(1)PageCache忙。在清除过期请求之前,它会先判断PageCache是??否繁忙。如果忙,则返回一个系统忙状态码给Producer(code=2,remark="[PCBUSY_CLEAN_QUEUE]brokerbusy,开始流控一段时间,periodinqueue:%sms,queuesize:%d"),也就是本文开头的异常日志。那么如何判断PageCache忙呢?Broker收到消息后,会将其追加到PageCache或内存映射文件中。这个进程首先获得一个CommitLog写锁。如果锁的持有时间大于osPageCacheBusyTimeOutMills(默认1s,可配置),则认为PageCache忙。具体代码见DefaultMessageStore类的isOSPageCacheBusy方法。(2)清除过期请求清除过期请求时,如果请求线程的创建时间与当前系统的时间间隔大于waitTimeMillsInSendQueue(默认200ms,可配置),请求将被清除,并出现系统忙状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]brokerbusy,开始流量控制一段时间,queueinperiod:%sms,queuesize:%d").系统繁忙异常在NettyRemotingAbstract#processRequestCommand方法中。拒绝请求如果NettyRequestProcessor拒绝了请求,它会返回一个系统忙状态码(code=2,remark="[REJECTREQUEST]systembusy,startflowcontrolforawhile")给Producer。什么情况下请求会被拒绝?看下面的代码://SendMessageProcessorclasspublicbooleanrejectRequest(){returnthis.brokerController.getMessageStore().isOSPageCacheBusy()||this.brokerController.getMessageStore().isTransientStorePoolDeficient();}从代码中可以看出,请求被拒绝有两种可能,一种是PageCache忙,另一种是TransientStorePoolDeficient。跟踪isTransientStorePoolDeficient方法,发现判断的依据是开启transientStorePoolEnable配置时是否有可用的ByteBuffer。注意:开启transientStorePoolEnable后,写入消息时,会先写入堆外内存(DirectByteBuffer),然后刷入PageCache,最后刷入磁盘。消息的读取是从PageCache中读取的,可以实现读写分离,避免在PageCache中同时读写带来的问题。如下图所示:线程池拒绝Broker后,收到请求后,会将处理逻辑封装成一个Runnable,由线程池提交执行。如果线程池满了,会拒绝请求(这里线程池中队列的默认大小是10000,可以通过参数sendThreadPoolQueueCapacity配置),线程池拒绝后,会抛出异常RejectedExecutionException,程序捕获到异常后会判断是否是单向请求(OnewayRPC),如果不是,会返回一个systembusy状态码给Producer(code=2,remark="[OVERLOAD]systembusy,开始流量控制一段时间").判断OnewayRPC的代码如下。当flag=2或3时,是单向请求:publicbooleanisOnewayRPC(){intbits=1<
