@[toc]微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件。从这个角度来说,消息中间件在互联网应用开发中确实是重要起来了。今天以RabbitMQ为例,宋哥就来和大家聊一聊消息中途发送消息的可靠性。注意,在后面的内容中,我主要和大家讨论如何保证消息生产者发送消息成功,不涉及消息消费的问题。1、RabbitMQ的消息发送机制众所周知,RabbitMQ中的消息发送引入了Exchange(开关)的概念。消息先发送到交换机,然后交换机根据既定的路由规则,将消息路由到不同的队列(queue)),然后被不同的消费者消费。大致流程是这样的,所以为了保证消息发送的可靠性,我们主要从两个方面进行确认:消息是否成功到达和Queue消息是否成功到达。如果我们能够确认这两个步骤,那么我们就可以认为消息发送成功了。如果这两个步骤中的任何一个出现问题,则消息没有成功传递。这个时候,我们可能要通过重试来重新发送消息。多次重试后,如果消息仍然无法到达,可能需要人为干预。经过上面的分析,我们可以确认,要保证消息发送成功,我们只需要做三件事:确认消息到达Exchange。确认到达队列。启动定时任务,定时投递发送失败的消息。2、RabbitMQ的努力以上提出的三个步骤,第三步需要自己去实现,RabbitMQ对于前两个步骤都有现成的解决方案。如何保证消息成功到达RabbitMQ?RabbitMQ提供了两种解决方案:启用事务机制和发送方确认机制。这是两种不同的解决方案。它们不能同时启用,但只能选择其中之一。如果同时启用两者,会报如下错误:Let'sdoitseparatelylook。以下所有案例均在SpringBoot中进行,相关源码可在文末下载。2.1开启事务机制在SpringBoot中开启RabbitMQ事务机制的方式如下:首先需要提供一个事务管理器,如下:@BeanRabbitTransactionManagertransactionManager(ConnectionFactoryconnectionFactory){returnnewRabbitTransactionManager(connectionFactory);}下一步,在消息生产者上做两件事:添加事务注解和设置通信通道为事务模式:@ServicepublicclassMsgService{@AutowiredRabbitTemplaterabbitTemplate;@Transactionalpublicvoidsend(){rabbitTemplate.setChannelTransacted(true);rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"你好rabbitmq!".getBytes());诠释我=1/0;}}这里注意两点:在发送消息的方法中添加@Transactional注解,标记交易。调用setChannelTransacted方法将其设置为true以启用事务模式。没关系。在上面的例子中,我们在最后有一个1/0,它必须在运行时抛出异常。我们可以尝试运行这个方法,发现消息并没有发送成功。当我们开启事务模式后,RabbitMQ生产者发送消息会多四个步骤:客户端发送请求,设置通道为事务模式。服务器响应,同意将通道设置为事务模式。客户端发送消息。客户端提交事务。服务器给出确认交易提交的响应。以上步骤,除了第三步已经有了,其他步骤无缘无故多出来。所以你可以看到事务模式其实是有点低效的,这不是一个最优方案。我们可以想一想,有哪些项目会用到消息中间件呢?一般来说都是一些高并发的项目,此时并发性能显得尤为重要。因此,RabbitMQ也提供了发送者确认机制(publisherconfirm)来确保消息发送成功。这样一来,性能比事务模式要高很多。让我们来看看。2.2发送方确认机制2.2.1单条消息处理首先我们去掉刚才关于事务的代码,然后在application.properties中配置并启用消息发送方确认机制,如下:spring.rabbitmq.publisher-confirm-type=corelectspring.rabbitmq.publisher-returns=true第一行是配置消息到达交换器时的确认回调,第二行是配置消息到达队列时的回调。第一行属性的配置有三个值:none:表示禁用发布确认方式,默认。correlated:消息发布到交易所成功后触发的回调方法。simple:类似于correlated,支持调用waitForConfirms()和waitForConfirmsOrDie()方法。接下来我们要开启两个监听,具体配置如下:@ConfigurationpublicclassRabbitConfigimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{publicstaticfinalStringJAVABOY_EXCHANGE_NAME="javaboy_exchange_name";publicstaticfinalStringJAVABOY_QUEUE_NAME="javaboy_queue_name";privatestaticfinalLoggerlogger=LoggerFactory.getLogger(RabbitConfig.class);@AutowiredRabbitTemplate兔子模板;@BeanQueuequeue(){returnnewQueue(JAVABOY_QUEUE_NAME);}@BeanDirectExchangedirectExchange(){返回新的DirectExchange(JAVABOY_EXCHANGE_NAME);}@BeanBindingbinding(){returnBindingBuilder.bind(queue()).to(directExchange()).with(JAVABOY_QUEUE_NAME);}@PostConstructpublicvoidinitRabbitTemplate(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(这个);}@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){if(ack){logger.info("{}:消息已成功到达交换",correlationData.getId());}else{logger.error("{}:消息发送失败",correlationData.getId());}}@OverridepublicvoidreturnedMessage(ReturnedMessagereturned){logger.error("{}:消息没有成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());}}关于这个配置类,我说的有以下几点:定义配置类,实现RabbitTemplate。当消息路由到队列失败时,会调用定义initRabbitTemplate方法并添加@PostConstruct注解,在该方法中配置rabbitTemplate的两个Callbacks。就是这样。接下来我们测试消息发送。首先,我们尝试将消息发送到一个不存在的交换器,像这样:)));请注意,第一个参数是字符串,而不是变量。该开关不存在。这时候控制台会报如下错误:接下来我们给一个真正的switch,但是给了一个不存在的queue,像这样:".getBytes(),newCorrelationData(UUID.randomUUID().toString()));注意此时第二个参数是字符串,不是变量。可以看出,虽然消息成功到达了exchange,但是并没有成功路由到queue(因为queue不存在)。这就是消息的发送,我们来看看消息的批量发送。2.2.2消息批处理如果是消息批处理,那么发送成功的回调监听也是一样的,这里不再赘述。这是发布者确认模式。与事务相比,这种模式下的消息吞吐量会大大提高。3、重试失败重试失败有两种情况,一种是根本没有找到MQ导致的重试失败,一种是找到了MQ,但是消息发送失败。我们分别看一下这两次重试。3.1自带重试机制上面提到的事务机制和发送方确认机制都是发送方确认消息发送成功的方式。如果一开始发送端连接不上MQ,那么SpringBoot中有对应的重试机制,但是这个重试机制与MQ本身无关。这是使用Spring中的重试机制完成的。具体配置如下:spring.rabbitmq.template.retry.enabled=truespring.rabbitmq.template.retry.initial-interval=1000msspring.rabbitmq.template.retry.max-attempts=10spring.rabbitmq.template.retry。max-interval=10000msspring.rabbitmq.template.retry.multiplier=2配置含义从上到下分别是:启用重试机制。重试开始间隔。最大重试次数。重试之间的最长时间。间隔时间乘数。(这里配置间隔时间倍数为2,则第一次间隔时间为1秒,第二次重试间隔时间为2秒,第三次为4秒,以此类推。)配置完成后,启动Spring再次启动项目,然后关闭MQ,此时尝试发送消息,会发送失败,导致自动重试。3.2服务重试服务重试主要针对报文没有到达交换机的情况。如果消息没有成功到达交易所,根据我们在第二节的解释,此时会触发消息发送失败回调。在这个回调中,我们可以大做文章!总体思路是这样的:首先创建一个表来记录发送给中间件的消息,如下:每发送一条消息,就在数据库中增加一条记录。这里的字段很容易理解,多了三个:status:表示消息的状态,有三个值,0、1、2分别表示消息正在发送,消息已经发送成功,消息发送失败。tryTime:表示消息的第一次重试时间(消息发送后,在tryTime时还没有发送成功,此时可以开始重试)。count:消息重试次数。其他字段很容易理解,就不一一细说了。消息发送时,我们在表中保存一条消息发送记录,并设置status为0,tryTime为1分钟后。在confirm回调方法中,如果收到消息发送成功的回调,则设置消息状态为1(发送消息时为消息设置msgId,消息发送时通过msgId唯一锁定消息发送成功))。另外,启动一个定时任务。定时任务每隔10s会去数据库中取消息,具体是取状态为0且经过tryTime时间记录的消息。提取这些消息后,首先判断重试次数是否超过3次,如果超过3次,修改消息状态为2,表示消息发送失败,不再重试。对于重试次数不超过3次的记录,重新发送消息并将其计数值加1。大体思路如上。宋兄这里就不给出代码了。松哥vhr中的邮件发送就是这样处理的。完整代码可以参考vhr项目(https://github.com/lenve/vhr)。当然,这种想法有两个缺点:一趟数据库可能会拖慢MQ的Qos,但有时候我们并不需要MQ有很高的Qos,所以这个应用要看具体情况。按照上面的思路,同一条消息可能会重复发送,但这不是问题。我们只需要解决消息消费时的幂等性问题即可。当然大家也要注意消息是否一定要100%发送成功,看具体情况。4.小结好了,以上就是关于消息生产者的一些常见问题以及相应的解决方案。下一篇宋哥将和大家一起探讨如何保证消息消费成功,解决幂等性问题。本文涉及的相关源码可以在这里下载:https://github.com/lenve/java...。
