当前位置: 首页 > 科技观察

4张图,9个维度告诉你如何保证RocketMQ不丢消息

时间:2023-03-18 20:45:05 科技观察

大家好,我是君哥。消息队列的引入可以轻松实现系统解耦、削峰填谷的功能。但是消息队列使用不当可能会导致消息丢失,这在一些对消息敏感的业务场景中是不允许的。今天我们就来说说RocketMQ是如何保证消息不丢失的。1RocketMQ简介RocketMQ是阿里巴巴开源的分布式消息中间件。整体架构如下图所示:RocketMQ主要包括Producer、Consumer、Broker。同时,NameServer进行集群注册管理,保存元数据。2消息不丢失为了保证消息不丢失,需要考虑以下几个方面:Producer发送消息Broker保存消息Consumer消费消息Broker主从切换维度一:同步发送,代码如下:publicvoidsend()throwsException{Stringmessage="testproducer";MessagesendMessage=newMessage("topic1","tag1",message.getBytes());sendMessage.putUserProperty("name1","value1");发送结果sendResult=null;DefaultMQProducer生产者=newDefaultMQProducer("testGroup");producer.setNamesrvAddr("localhost:9876");producer.setRetryTimesWhenSendFailed(3);尝试{sendResult=producer.send(sendMessage);}catch(Exceptione){e.printStackTrace();}if(sendResult!=null){System.out.println(sendResult.getSendStatus());}}同步发送会返回4个状态码:SEND_OK:消息发送成功。需要注意的是,消息发送给broker后,有两个操作:消息刷新和消息同步到slave节点。默认情况下,这两个操作是异步的。只有将这两个操作改为同步,SEND_OK状态才能真正表明发送成功。FLUSH_DISK_TIMEOUT:消息发送成功但消息刷新超时。FLUSH_SLAVE_TIMEOUT:消息发送成功,但在消息同步到从节点时超时。SLAVE_NOT_AVAILABLE:消息发送成功但broker的slave节点不可用。根据返回的状态码,可以对消息进行重试,这里设置的重试次数为3次。消费者在重试消息时,必须做幂等处理。维度二:异步发送,代码如下:publicvoidsendAsync()throwsException{Stringmessage="testproducer";MessagesendMessage=newMessage("topic1","tag1",message.getBytes());sendMessage.putUserProperty("name1","value1");DefaultMQProducer生产者=newDefaultMQProducer("testGroup");producer.setNamesrvAddr("localhost:9876");producer.setRetryTimesWhenSendFailed(3);producer.send(sendMessage,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){}@OverridepublicvoidonException(Throwablee){//TODO可以在这里添加重试逻辑}});}异步发送,可以重写回调函数,当回调函数捕获到Exception时表示发送失败,此时可以重试。这里设置的重试次数为3次。维度3:刷盘策略异步刷盘:默认。当消息写入CommitLog时,它并没有直接写入磁盘。而是先写入PageCache缓存并返回成功,然后使用后台线程将消息异步刷新到磁盘。异步刷新提高了消息吞吐量,但消息可能会丢失。例如,断点导致机器关闭,PageCache中没有来得及刷新的消息将丢失。同步刷盘:消息写入内存后,立即请求刷盘线程刷盘。如果消息未能在约定时间内(默认5s)刷盘,则返回FLUSH_DISK_TIMEOUT。收到此响应后,Producer可以重试磁盘。尝试。同步刷策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。启用同步刷盘需要增加如下配置:flushDiskType=SYNC_FLUSH维度四:Broker多副本和高可用Broker为了保证高可用,部署了一个master和多个slave。如下图所示:消息发送到主节点后,从节点会从主节点拉取消息,与主节点保持一致。这个过程默认是异步的,即master收到消息后,直接返回success给Producer,不需要等待slave节点复制消息。这样就会有问题。如果从节点还没有完成消息复制,此时主节点宕机,主备倒换后消息会丢失。为了避免这个问题,可以使用slave节点同步复制消息,即等待slave节点复制消息成功,然后返回给Producer消息发送成功。只需添加如下配置:brokerRole=SYNC_MASTER改为同步复制后,消息复制过程如下:slave初始化后,与master建立连接,将自己的offset发送给master;消息分批发送给slave;slave将接收到的消息写入commitLog文件,并向master发送一个新的offset;master收到新的offset后,如果offset>=producer发送消息后的offset,则返回SEND_OK给producer。维度5:消息确认Consumer消费消息代码如下:consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("topic1","tag1");consumer.registerMessageListener((MessageListenerConcurrently)(msgs,context)->{try{System.out.printf("接收新消息:%s",msgs);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch(Exceptione){e.printStackTrace();返回ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}如果Consumer消费成功,返回CONSUME_SUCCESS,提交offset,从Broker批量消息中拉取下一条。维度6:Consumer重试Consumer消费失败,这里有3种情况:returnRECONSUME_LATER返回null并抛出异常Broker收到这个响应后,会把这个消息放入重试队列,重新发送给Consumer。注意:代理默认最多重试16次。如果16次重试失败,消息会被放入死信队列,Consumer可以订阅死信队列进行消费。重试只在集群模式(MessageModel.CLUSTERING)下生效,在广播模式下(MessageModel.BROADCASTING)不生效。消费者端必须是幂等的。其实3次重试失败就说明代码有问题。此时Consumer可以将消息存储到本地,返回CONSUME_SUCCESS给Broker结束重试。代码如下:intcount=((MessageExt)msg??s).getReconsumeTimes();if(count>2){//TODO将消息写入本地存储System.out.println("重试次数超过3次");returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}维度七:事务性消息RocketMQ支持事务性消息,总体流程如下:Producer发送一半消息;Broker先将消息写入主题为RMQ_SYS_TRANS_HALF_TOPIC的队列,然后返回成功给Producer;Producer执行本地事务,成功后向Broker发送commit命令(若本地事务执行失败则发送回滚);Broker收到commit请求后,将消息状态修改为success,将消息推送到真实topic;消费者拉取消息进行消费。代码如下:publicclassProducerTransactionListenerImplimplementsTransactionListener{@OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){/***这里执行本地事务,执行成功返回LocalTransactionState.COMMIT_MESSAGE,返回*LocalTransactionState。ROLLBACK_MESSAGE如果执行失败UNKNOW,*Broker会回来查询,所以需要记录事务执行状态*/returnLocalTransactionState.COMMIT_MESSAGE;}@OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmsg){/***这里查询交易执行状态,返回LocalTransactionState.COMMIT_MESSAGE或*LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询返回LocalTransactionState.UNKNOW,*Broker会重新查询,可以记录查询次数,超过次数后返回ROLLBACK_MESSAGE*/returnLocalTransactionState.UNKNOW;}}维度8:消息索引我们知道RocketMQ的核心有3个数据文件:CommitLog、ConsumeQueue和Index。Index文件是一个索引文件,其结构如下:查找消息时,首先根据消息key的hashcode计算出Hash槽的位置,然后读取Hash槽的值计算位置Index条目的位置,并从Index条目的位置读取消息CommitLog文件中的偏移量以查找消息。Producer发送消息时,可以指定一个key,代码如下:MessagesendMessage=newMessage("topic1","tag1",message.getBytes());sendMessage.setKeys("weiyiid");可以通过RocketMQ命令或管理控制台提供,检查消息是否发送成功。维度九:极端情况如果对消息丢失零容忍,就要考虑极端情况,比如整个RocketMQ集群挂掉,此时producer端会发送消息失败。可以考虑将producer端降级,将要发送的消息保存到本地数据库或磁盘中,待RocketMQ恢复后将本地消息推送出去。3小结在一些特殊的业务场景,比如支付,银行记账等,需要保证消息不丢失,但同时也要注意,消息不丢失的解决方案会大大降低RocketMQ的吞吐量,需要综合考虑。

最新推荐
猜你喜欢