rocketmq是阿里开源的一个强大的消息队列。很多公司都在用,经历过多次双十一的洗礼。它支持多种功能。我不知道你对这项技术了解多少。消息队列现在应该是公司必备的技能之一,不管是RabbitMQ还是rocketmq,还是支持大量的kafka。今天我们就来说说rocketmq是如何保证消息不丢失的?不知道你有没有遇到过这个问题,或者你听到这个问题的第一反应是什么,应该怎么办,如何避免信息丢失。让我们来看看。首先,我们知道一条rocketmq消息从生产到最终消费一共需要经历三个阶段,或者说会经过三个地方,分别是producer的sender,broker的持久化机制,以及消费者的消费者。从producer生产者的角度来看:消息生产出来后,传递给broker。如果消息没有正确存储在broker中,则认为从broker的角度来看:消息默认保存在broker的内存中,异步保存到磁盘中。如果发生停机或磁盘崩溃,消息将丢失。从消费者角度看:消息持久化后,消费者拉取后,消费失败,没有反馈给broker。这被计为消息丢失。可能是消费过程异常或者网络抖动导致消息丢失。从生产者的角度来看:消费和生产后,交付给经纪人。如果消息没有正确保存在broker中,算作消息丢失从生产者的角度来看,消息是生产出来的,通过网络发送给broker。事实上,只需要做一件事就是确认消息已经成功发送给代理。生产者只需要收到发送消息返回的确认响应即可。是的,表示消息发送成功代码示例:DefaultMQProducermqProducer=newDefaultMQProducer("test");//设置nameSpace地址mqProducer.setNamesrvAddr("namesrvAddr");mqProducer.start();Messagemsg=newMessage("topic"/*Topic*/,"Captain".getBytes(RemotingHelper.DEFAULT_CHARSET)/*Messagebody*/);//发送消息给Brokertry{SendResultsendResult=mqProducer.send(msg);}catch(Exceptione){e.printStackTrace();}当然发送消息有同步和异步两种。消息发送成功后,会返回以下四种不同的响应状态SEND_OK消息发送成功,但不代表消息不会丢失。它还取决于代理刷新磁盘的方式。下面会在broker端说需要启动SYNC_MASTER或者SYNC_FLUSH。(即同步)FLUSH_DISK_TIMEOUT如果Broker设置了MessageStoreConfig的FlushDiskType=SYNC_FLUSH(默认为ASYNC_FLUSH),Broker在MessageStoreConfig的syncFlushTimeout(默认为5秒)内没有完成刷盘,这个状态表示此时磁盘刷新超时。FLUSH_SLAVE_TIMEOUT如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),并且slaveBroker在MessageStoreConfig的syncFlushTimeout(默认5秒)内还没有完成与master的通信。服务器的同步返回这个结果。主从同步时间默认也是5秒,需要完成主从同步。下面说到经纪商的时候也会提到这个。想一想,万一master挂了,或者磁盘挂了,这样行吗?也不能100%保证消息不会丢失。SLAVE_NOT_AVAILABLE如果Broker的角色是SYNC_MASTER(默认是ASYNC_MASTER),但是没有配置slaveBroker,返回这个状态。在消息永不丢失的情况下,不允许存在这种状态。这个状态也是需要处理的。没有可靠的slave,就意味着没有可靠的备份数据,所以这种情况也需要考虑。另外,上面还提到了一种异步消息发送方式,一般用于链接比较长的情况,对于这种情况下时间敏感的业务,需要特别注意的是,我们需要设置完成回调消息发送,更好的保证消息不会丢失。事务性消息的投递方式不保证消息100%投递成功到达Broker,但是如果消息发送Ack失败,消息会被存入CommitLog,但是对于Consumer队列是不可见的,您可以在日志中查看此异常消息。严格来说broker并没有完全丢失:消息默认保存在broker的内存中,异步保存到磁盘。如果发生宕机,磁盘崩溃会导致消息丢失。顺序消费不是特别常见的场景,但是也是必不可少的,因为在一些业务场景中,顺序很重要,保证消息消费的顺序也很关键。消息到达broker后,默认是先保存在broker的内存中,然后立即将response返回给producer,然后broker自己周期性的将消息批量异步保存到硬盘中。如果没有保存到硬盘,直接返回,broker直接崩溃崩溃,消息无处可寻。这样的好处是提高交互效率,减少IO次数。问题是消息会丢失。如果我们要保证消息不丢失,就需要保证消息成功保存到broker后能够返回。只需要修改消息保存机制为同步刷盘的方式,即只有消息成功保存到broker的磁盘后,才会返回Response##默认为ASYNC_FLUSHflushDiskType=SYNC_FLUSH如果broker失败在指定的同步时间内(默认5秒)完成刷盘,会返回FLUSH_DISK_TIMEOUT给生产者。这个上面也有介绍。系统中一般使用FLUSH_DISK_TIMEOUT来保证可用性。Broker通常采用一主多从的部署方式。属于集群部署。为了保证消息不丢失,需要将消息复制到从节点。其实默认情况下,消息写入broker后,会返回successbut!如果master突然死机或磁盘崩溃,消息将完全丢失,没有备份。所以需要把master和slave的异步复制改为同步复制##master节点配置flushDiskType=SYNC_FLUSHbrokerRole=SYNC_MASTER##slave节点配置brokerRole=slaveflushDiskType=SYNC_FLUSH表示只有slave刷盘成功后,它将成功返回给生产者。当然这里要说的话,master和slave也有可能同时宕机,磁盘同时崩溃,最后还是不尽如人意。100%保证消息不丢失的问题,其实就像TCP的三个交互3次交互之后,客户端和服务端的通信就一定成功了吗?答案不一定。我们只能在有限的资源下尽量满足系统的稳定性。可以成功消费,不反馈给broker。这被认为是丢失的消息。可能是消费过程异常或者网络抖动导致消息丢失。消费者从broker那里拉取消息,然后消费对应的业务。如果消费成功,则返回消费成功状态。给broker,如果broker没有收到确认消息,consumer会在下次再次拉取消息//注册回调实现类来处理从broker拉回来的消息
