使用rabbitmq如何保证消息不丢失?其实这个问题适用于任何mq。虽然不同的mq在具体操作上存在差异,但一般需要从三个方面来考虑:producer发送、broker存储、consumer接收都保证了消息的可靠性。我们来看看rabbitmq是如何做到这三点的。1.生产者消息可靠性生产者保证消息可靠性有两种方式,事务和确认机制。1、事务控制rabbitmq允许通过事务来发送消息。关键代码如下://==设置通道为事务模式channel.txSelecttry{//在这里发送消息}catch(Exceptione){//==事务回滚channel.txRollback}//==提交事务通道.txCommit如果有问题,可以回滚事务,然后再做重发等操作。问题是事务是同步的,后面的操作会等待事务执行完阻塞在这里,影响性能。2.确认机制singleconfirm//==通道开启消息确认channel.confirmSelect();//单发channel.basicPublish("",QUEUE_NAME,null,message.getBytes());if(channel.waitForConfirms()){//消息发送成功}else{//消息发送失败}每条数据通过channel.waitForConfirms()发送获取发送结果batchconfirm//==channelopeningmessageconfirmationchannel.confirmSelect();//batchsendingfor(inti=0;i<10;i++){channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}//直到所有信息发布完毕,只要有未确认的IOExceptionchannel.waitForConfirmsOrDie();一个batch通过channel.waitForConfirmsOrDie()获取发送结果。如果一个批次发送失败,则需要重新发送整个批次;消费者需要处理重复的消息,一般在业务层面是幂等的。前两种异步确认是同步确认,异步确认可以最大化效率。channel.addConfirmListener(newConfirmListener(){//==消息失败处理@OverridepublicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{//deliveryTag;唯一的消息标签//multiple:是否批量处理System.err.println("--------noack!------------");}//==消息成功处理@OverridepublicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{System.err.println("--------ack!------------");}});不管成功还是失败,监听器中的一个方法都会被回调。2、broker消息的可靠持久化首先要开启持久化,持久化分为队列持久化和消息持久化。队列持久化:创建队列时将channel.queueDeclare()的第二个参数改为true。消息持久化:使用channel.basicPublish()发送消息时,将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化。同时启用上面两种持久化机制,这样当消息到达broker时,rabbitmq会进行磁盘操作。但是rabbitmq消息放到磁盘上也有一个小的时间间隔,可能刚写入系统缓存服务器就挂掉了。所以必须开启rabbitmq的镜像集群,数据会在写入master的同时同步到slave服务。3、消费者消息的可靠消息确认模式:AcknowledgeMode.NONE:自动确认(默认)。AcknowledgeMode.AUTO:根据情况确认。AcknowledgeMode.MANUAL:手动确认。当manualackconsumer成功收到消息后,默认会自动向broker发送一个ack,表示消息已经收到。我们需要改为手动确认模式。一张图总结了上面的讨论,消息的可靠性需要牺牲一定的性能,从而降低mq的吞吐量,所以需要在可靠性和吞吐量之间找到一个平衡点。想一想:现在的业务场景对消息的可靠性有这么高的要求,同时还要保证实时性吗?如果实时性要求不高,是否可以通过生产消费两端的校验来满足可靠性要求?附录P6-P7知识集锦
