SpringBoot集成RabbitMQgithub地址:https://github.com/erlieStar/rabbitmq-examplesSpring有三种基于XML的配置方式基于注解的JavaConfig当然现在很少用XML来配置了,简单介绍一下使用JavaConfig和注解配置方式RabbitMQ集成了SpringBoot,我们只需要添加对应的starterapplication.yaml中注解配置如下spring:rabbitmq:host:myhostport:5672username:guestpassword:guestvirtual-host:/log:exchange:log.exchangeinfo:queue:info.log.queuebinding-key:info.log.keyerror:queue:error.log.queuebinding-key:error.log.keyall:queue:all.log.queuebinding-key:'*.log.key'消费代码如下@Slf4j@ComponentpublicclassLogReceiverListener{/***接收信息级别*/@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${log.info.queue}",durable="true"),exchange=@Exchange(value="${log.exchange}",type=ExchangeTypes.TOPIC),key="${log.info.binding-key}"))publicvoidinfoLog(Messagemessage){Stringmsg=newString(message.getBody());log.info("infoLogQueue收到的消息是:{}",msg);}/***接收所有日志*/@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${log.all.queue}",durable="true"),exchange=@Exchange(value="${log.exchange}",类型=ExchangeTypes.TOPIC),key="${log.all.binding-key}"))publicvoidallLog(Messagemessage){Stringmsg=newString(message.getBody());log.info("收到的消息byallLogQueueis:{}",msg);}}生产者如下@RunWith(SpringRunner.class)@SpringBootTestpublicclassMsgProducerTest{@AutowiredprivateAmqpTemplateamqpTemplate;@Value("${log.exchange}")privateStringexchange;@Value("${log.info.binding-key}")privateStringroutingKey;@SneakyThrows@TestpublicvoidsendMsg(){for(inti=0;i<5;i++){Stringmessage="thisisinfomessage"+i;amqpTemplate.convertAndSend(exchange,routingKey,message);}System.in.read();}}SpringBoot的消息ack方法和nativeapi的消息ack有点不同。土生土长的有两种方法可以确认pi消息。自动确认(autoAck=true)手动确认(autoAck=false)消费者消费消息时,可以指定autoAck参数replyconfirmationmessagebeforeremovedthemessagefrommemory(ordisk)autoAck=true:RabbitMQ会自动将发送的消息设置为确认,然后从内存(或磁盘)中删除,不管消费者是否实际消费这些消息,手册确认方法如下。有2个参数basicAck(longdeliveryTag,booleanmultiple)deliveryTag:用于标识通道中投递的消息。RabbitMQ在推送消息给Consumer的时候,会附带一个deliveryTag,这样Consumer在消息确认的时候可以告诉RabbitMQ哪条消息已经确认了。RabbitMQ保证在每个通道中,每条消息的deliveryTag将从1开始递增。multiple=true:消息id<=deliveryTag的消息将被确认。myltiple=false:消息id=deliveryTag的消息将被确认。消息尚未得到证实。会发生什么?如果队列中的消息发送给消费者,消费者没有确认消息,则消息会一直留在队列中,直到被确认,不会被删除。如果发送给消费者A的消息还没有被确认,只有当消费者A和rabbitmq的连接中断后,rabbitmq才会考虑将消费者A未确认的消息重新投递给另一个消费者。SpringBoot中messageack的方法一共有三种,定义在AcknowledgeMode枚举类中说明NONE没有ack,相当于原生API中的autoAck=trueMANUAL。用户需要手动发送ack或nack。AUTO方法正常结束,springboot框架返回ack,出现异常springboot框架返回nackspringboot默认的消息ack方法是AUTO。实际场景中,我们一般都是手动ack。application.yaml的配置改为如下spring:rabbitmq:host:myhostport:5672username:guestpassword:guestvirtual-host:/listener:simple:acknowledge-mode:manual#manualack,默认auto和对应的consumercode改为@Slf4j@ComponentpublicclassLogListenerManual{/***Receiveinfolevellogs*/@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${log.info.queue}",durable="true"),exchange=@Exchange(value="${log.exchange}",type=ExchangeTypes.TOPIC),key="${log.info.binding-key}"))publicvoidinfoLog(Messagemessage,Channelchannel)throwsException{Stringmsg=newString(message.getBody());log.info("infoLogQueue收到的消息是:{}",msg);try{//这里写各种业务逻辑channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}我们上面使用的注解有以下功能。注解作为RabbitListener消费消息,可以在类上定义。方法方面,定义在类上时,需要配合RabbitHandler使用QueueBinding定义绑定关系Queue定义队列Exchange定义开关RabbitHandler在类上定义RabbitListener时,需要使用RabbitHandler指定处理方式基于JavaConfig既然使用注解这么方便,为什么还需要JavaConfig?JavaConfig方便自定义各种属性比如同时配置多个虚拟主机等具体代码看GitHubRabbitMQ是如何保证消息可靠传递的一条消息往往会经历以下几个阶段在这里插入图片描述这样才能保证可靠传递的消息,你只需要在生产阶段保证这3个阶段的可靠传递。这个阶段的可靠投放主要靠ConfirmListener(发布者确认)和ReturnListener(失败通知)。前面说到一条消息在RabbitMQ中的流程是producer->rabbitmqbrokercluster->exchange->queue->consumerConfirmListener可以拿到消息是否从producer发送到brokerReturnListener可以拿到没有路由的消息从交换到队列。我使用SpringBootStarter的API来演示效果application.yamlspring:rabbitmq:host:myhostport:5672username:guestpassword:guestvirtual-host:/listener:simple:acknowledge-mode:manual#Manualack,默认是autolog:exchange:log.exchangeinfo:queue:info.log.queuebinding-key:info.log.key发布者确认回调@ComponentpublicclassConfirmCallbackimplementsRabbitTemplate.ConfirmCallback{@AutowiredprivateMessageSendermessageSender;@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){StringmsgId=correlationData.getId();Stringmsg=messageSender.dequeueUnAckMsg(msgId).if(ack){“系统消息已成功发送至{%s}mq",msg));}else{//可以添加一些重试逻辑System.out.println(String.format("Message{%s}failedtosendmq",msg));}}}失败通知回调@ComponentpublicclassReturnCallbackimplementsRabbitTemplate.ReturnCallback{@OverridepublicvoidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey){Stringmsg=newString(message.getBody());System.out.println(String.format("消息{%s}无法正确路由,routingKey为{%s}",msg,routingKey));}}@ConfigurationpublicclassRabbitMqConfig{@BeanpublicConnectionFactoryconnectionFactory(@Value("${spring.rabbitmq.host}")Stringhost,@Value("${spring.rabbitmq.端口}")端口,@价值(”${spring.rabbitmq.username}")Stringusername,@Value("${spring.rabbitmq.password}")Stringpassword,@Value("${spring.rabbitmq.virtual-host}")Stringvhost){CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(主机);connectionFactory.setPort(端口);connectionFactory.setUsername(用户名);connectionFactory.setPassword(密码);connectionFactory.setVirtualHost(虚拟主机);connectionFactory.setPublisherConfirms(真);connectionFactory.setPublisherReturns(真);returnconnectionFactory;}@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory,ReturnCallbackreturnCallback,ConfirmCallbackconfirmCallback){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);rabbitTemplate.setReturnCallback(returnCallback);rabbitTemplate.setConfirmCallback(confirmCallback);//要想使returnCallback生效,必须设置为truerabbitTemplate.setMandatory(true);returnrabbitTemplate;}}这里我对RabbitTemplate进行了封装,主要是在发送的时候加上了messageid,保存了messageid留言对应关系,因为RabbitTemplate.ConfirmCallback只能获取消息id,不能获取消息内容,所以需要我们自己保存这个映射关系在一些对可靠性要求比较高的系统中,可以把这个映射关系存在数据库中,发送删除映射关系成功,发送@ComponentpublicclassMessageSender{@AutowiredprivateRabbitTemplaterabbitTemplate;publicfinalMap
