当前位置: 首页 > 后端技术 > Java

RocketMQ学习十三——顺序消息、延迟消息和消息过滤

时间:2023-04-02 00:55:41 Java

如题所示,本文围绕顺序消息、延迟消息和消息过滤展开。1、顺序消息RocketMQ只能在队列层面保证消息的顺序。如果要实现某类消息的顺序执行,就必须将这类消息发送到同一个队列中。发送消息时可以使用MessageQueueSelector,通过指定shardingkey,然后将同类型的消息发送到同一个队列,使得CommitLog文件中消息的顺序与发送时的顺序一致。Broker端选择队列可以参考之前的文章:RocketMQ学习五——选择队列等特性。接下来说一下消费者端的处理。顺序消息消费的事件监听器是MessageListenerOrderly。我们知道,PullMessageService是根据offset拉取一批消息,存储到ProcessQueue中,然后使用线程池进行处理。为保证消费端顺序处理单个队列中的消息,在多线程场景下需要根据消息消费队列进行加锁。消费端顺序消费的并发度不依赖于消费端线程池的大小,而是依赖于分配给消费者的队列数量。因此,如果一个topic用于顺序消费场景,建议增加consumer的队列数,可以适当的增加到非顺序消费的2~3倍,有利于提高consumer的并发度并促进横向扩展。consumer端的水平扩展或者Broker端队列数量的变化都会触发消息消费队列的重新加载。在并发消费时,一个消费队列可能会被多个消费者同时消费,但是在顺序消费时不会出现这种情况,因为顺序消息在消费消息时不仅要锁住消息消费队列,还需要申请锁Broker端消费队列分配到消息队列时,从队列中拉取消息,即同一时间拉取消息只能有一个消费者获取队列中的消息,保证顺序消费的语义。流程:PullMessageService单线程从Broker获取消息。PullMessageService向ProcessQueue中添加消息(ProcessMessage是消息的缓存),然后向ConsumeMessageOrderService提交一个消费任务。拿到锁后,从ProcessQueue中获取消息。顺序消费时消费失败怎么办?并发消费模式有消费失败时的重试机制。默认重试16次,重试时先将消息发送给Broker,然后再拉取消息。对于顺序消息,这种机制将失去其消费的顺序性。另外,如果一条消息不能消费成功,它会不断重试(准确的说是Integer.MAX_VALUE次),直到消费成功。如果一直失败,消息的消息消费进度就无法往前推进,就会造成消息的积压,所以我们必须在顺序消费的时候捕获异常。2.延迟消息在RocketMQ开源版本中,延迟消息不支持任何时间延迟。目前默认设置为:1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h,1s到2h分别对应1到18级,阿里云付费版支持40内任意时刻天(毫秒级别)。延迟消息流程图:Producer设置自己发送的消息需要延迟的级别(例如设置延迟级别为3:message.setDelayTimeLevel(3))。Broker发现这条消息是延迟消息(消息的delayLevel大于0),将Topic替换为延迟的Topic(SCHEDULE_TOPIC_XXXX)。每个延迟级别将用作单独的队列(delayLevel-1),并且其自己的Topic将用作附加信息存储(在CommitLog#putMessage方法中)。构建一个ConsumerQueue定时任务,每隔1s扫描一次各个延迟级别的ConsumerQueue。获取ConsumerQueue中CommitLog的Offset,获取消息,判断是否到了执行时间。如果是,恢复消息的主题并重新投递。如果未达到,则任务的执行将延迟未达到的时间段。在延迟消息的内容中,我们会提到消费失败的情况。消费者消费消息后,需要回复broker消息的消费状态。有两种消费状态。consume_success表示消费成功,reconsume_later表示稍后恢复消费。消费是如何变现的?答案是,失败的消息会被放入延迟队列(主题为SCHEDULE_TOPIC_XXXX,每消费失败一次,级别加1)。当broker启动时,会启动ScheduleMessageService定时任务。它的作用是处理延迟队列中的消息。每个队列都有一个特殊的Timer计时器来轮询其中的消息。如果消息拉取后发现时间到了,就会存储到CommitLog中,供消费者消费;如果时间未到,它将被忽略。如果超过最大重试次数,就会进入死信队列。详见RocketMq消费失败处理逻辑3、消息过滤RocketMQ支持SQL过滤和TAG过滤。SQL过滤:在broker端执行,可以减少无用数据的网络传输,但是对broker的压力会很大,性能会很低。它支持使用SQL语句的复杂过滤逻辑。TAG过滤:在broker和consumer端进行,增加无用数据的网络传输,但是broker压力小,性能高。仅支持简单过滤。SQL过滤先不分析,可以参考文章:RocketMQ源码分析:消息过滤是如何实现的?TAG过滤的过程大致是broker在对应的ConsuemrQueue中获取hashcode(tag),根据consumer传入的tag进行比较。如果不匹配,则跳过该消息;如果匹配,消费者需要再次比较标签,因为可能存在哈希冲突。broker端的过滤://查询消息入口publicGetMessageResultgetMessage(finalStringgroup,finalStringtopic,finalintqueueId,finallongoffset,finalintmaxMsgNums,finalMessageFiltermessageFilter){//tag过滤,在consumerQueue里if(messageFilter!=null&&!messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal?tagsCode:null,extRet?cqExtUnit:null)){if(getResult.getBufferTotalSize()==0){status=GetMessageStatus.NO_MATCHED_MESSAGE;}继续;}//tag过滤,在commitlog里if(messageFilter!=null&&!messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(),null)){if(getResult.getBufferTotalSize()==0){status=GetMessageStatus.NO_MATCHED_MESSAGE;}//释放...selectResult.release();继续;}}消费者过筛:publicPullResultprocessPullResult(finalMessageQueuemq,finalPullResultpullResult,finalSubscriptionDatasubscriptionData){PullResultExtpullResultExt=(PullResultExt)pullResult;this.updatePullFromWhichNode(mq,pullResultExt.getSuggestWhichBrokerId());if(PullStatus.FOUND==pullResult.getPullStatus()){ByteBufferbyteBuffer=ByteBuffer.wrap(pullResultExt.getMessageBinary());ListmsgList=MessageDecoder.decodes(byteBuffer);ListmsgListFilterAgain=msgList;如果(!subscriptionData.getTagsSet().isEmpty()&&!subscriptionData.isClassFilterMode()){msgListFilterAgain=newArrayList(msgList.size());对于(MessageExtmsg:msgList){if(msg.getTags()!=null){if(subscriptionData.getTagsSet().contains(msg.getTags())){msgListFilterAga添加(味精);}}}}if(this.hasHook()){FilterMessageContextfilterMessageContext=newFilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);this.executeHook(filterMessageContext);......}pullResultExt.setMessageBinary(null);返回拉取结果;}消息过滤也可以通过topic来实现,我们是使用topic进行过滤还是使用tag过滤可以根据具体的业务场景来选择。一般来说,不同Topic之间的消息之间没有必然联系,Tag是用来区分同一Topic下相互关联的消息。参考文章序列消息参考:13结合实际场景序列消费和消息过滤聊聊序列消息(RocketMQ序列消息实现机制)过滤原理

最新推荐
猜你喜欢