当前位置: 首页 > 科技观察

警报!这8个场景,RocketMQ都会有流控

时间:2023-03-21 10:23:43 科技观察

大家好,我是君哥。在使用RocketMQ的过程中,有时会看到如下日志:[TIMEOUT_CLEAN_QUEUE]brokerbusy,startflowcontrolforawhile,periodinqueue:206ms,sizeofqueue:5这是因为RocketMQ触发了流控。今天我们就来说说RocketMQ在哪些场景下会触发流控。如上图所示,生产者将消息写入Broker,Consumer从Broker拉取消息。Broker是RocketMQ的核心,触发流控主要是防止Broker压力过大而宕机。一、Broker流控1、BrokerbusyRockerMQ默认采用异步flush策略。Producer将消息发送给Broker后,Broker会先将消息写入PageCache,刷新线程会周期性的将PageCache中的数据刷新到磁盘,如下图:brokerbusy是怎么引起的?Broker默认开启快速失败,处理逻辑类为BrokerFastFailure。该类中有一个定时任务清理过期请求,每10ms执行一次。代码如下:},1000,10,TimeUnit.MILLISECONDS);}(1)PageCache忙。在清除过期请求之前,它会先判断PageCache是??否繁忙。如果忙,则返回一个系统忙状态码给Producer(code=2,remark="[PCBUSY_CLEAN_QUEUE]brokerbusy,开始流控一段时间,periodinqueue:%sms,queuesize:%d"),也就是本文开头的异常日志。那么如何判断PageCache忙呢?Broker收到消息后,会将其追加到PageCache或内存映射文件中。这个进程首先获得一个CommitLog写锁。如果锁的持有时间大于osPageCacheBusyTimeOutMills(默认1s,可配置),则认为PageCache忙。具体代码见DefaultMessageStore类的isOSPageCacheBusy方法。(2)清除过期请求清除过期请求时,如果请求线程的创建时间与当前系统的时间间隔大于waitTimeMillsInSendQueue(默认200ms,可配置),请求将被清除,并出现系统忙状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]brokerbusy,开始流量控制一段时间,queueinperiod:%sms,queuesize:%d").系统繁忙异常在NettyRemotingAbstract#processRequestCommand方法中。拒绝请求如果NettyRequestProcessor拒绝了请求,它会返回一个系统忙状态码(code=2,remark="[REJECTREQUEST]systembusy,startflowcontrolforawhile")给Producer。什么情况下请求会被拒绝?看下面的代码://SendMessageProcessorclasspublicbooleanrejectRequest(){returnthis.brokerController.getMessageStore().isOSPageCacheBusy()||this.brokerController.getMessageStore().isTransientStorePoolDeficient();}从代码中可以看出,请求被拒绝有两种可能,一种是PageCache忙,另一种是TransientStorePoolDeficient。跟踪isTransientStorePoolDeficient方法,发现判断的依据是开启transientStorePoolEnable配置时是否有可用的ByteBuffer。注意:开启transientStorePoolEnable后,写入消息时,会先写入堆外内存(DirectByteBuffer),然后刷入PageCache,最后刷入磁盘。消息的读取是从PageCache中读取的,可以实现读写分离,避免在PageCache中同时读写带来的问题。如下图所示:线程池拒绝Broker后,收到请求后,会将处理逻辑封装成一个Runnable,由线程池提交执行。如果线程池满了,会拒绝请求(这里线程池中队列的默认大小是10000,可以通过参数sendThreadPoolQueueCapacity配置),线程池拒绝后,会抛出异常RejectedExecutionException,程序捕获到异常后会判断是否是单向请求(OnewayRPC),如果不是,会返回一个systembusy状态码给Producer(code=2,remark="[OVERLOAD]systembusy,开始流量控制一段时间").判断OnewayRPC的代码如下。当flag=2或3时,是单向请求:publicbooleanisOnewayRPC(){intbits=1<retryResponseCodes=newCopyOnWriteArraySet(Arrays.asList(ResponseCode.TOPIC_NOT_EXIST,ResponseCode.SERVICE_NOT_AVAILABLE,ResponseCode.SYSTEM_NOPERModeOR,ResponseISCodeSI.NO_BUYER_ID,ResponseCode.NOT_IN_CURRENT_UNIT));2.Consumer流控DefaultMQPushConsumerImpl类有Consumer流控的逻辑。1、缓存消息数超过阈值。ProcessQueue保存的消息数超过阈值(默认1000条,可配置)。源码如下:if(cachedMessageCount>this.defaultMQPushConsumer.getPullThresholdForQueue()){this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL;if(if)queueFlowControlTimes++%1000)==0){log.warn("缓存消息计数超过阈值{},所以做流量控制,minOffset={},maxOffset={},count={},size={}MiB,pullRequest={},flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(),cachedMessageCount,cachedMessageSizeInMiB,pullRequest,queueControl};;}2.缓存消息的大小超过阈值保存的消息大小通过ProcessQueue超过阈值(默认100M,可配置),源码如下:lowControlTimes++%1000)==0){log.warn("缓存消息大小超过阈值{}MiB,所以做流控,minOffset={},maxOffset={},count={},size={}MiB,pullRequest={},flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(),cachedMessageCount,cachedMessageSizeInMiB,pullRequest)Control)返回;}3.缓存消息的span超过阈值对于非顺序消费场景,ProcessQueue中保存的最后一条消息和第一条消息的offset差值超过阈值(默认2000,可配置)源码如下:if((queueMax++0Time0%1Controls){log.warn("队列的消息,跨度太长,所以做流控,minOffset={},maxOffset={},maxSpan={},pullRequest={},flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(),processQueue.getMaxSpan(),pullRequest,queueMaxSpanFlowControlTimes);}return;}}4.获取锁失败顺序消费,如果ProcessQueue加锁失败,也会延迟拉取Taken,延迟时间默认3s,可配置。3.总结本文介绍了RocketMQ流控的8个场景,其中Broker场景4个,Consumer场景4个。Broker的流控本质是Producer的流控,最好的解决方案是扩容Broker的容量,增加Broker的写入能力,至于consumer端的流控,需要解决consumer上消费慢的问题端,比如第三方接口响应慢或者SQL慢.使用时,根据打印的日志,可以分析流量控制的具体情况,采取相应的措施。