当前位置: 首页 > 科技观察

你一直在想的RabbitMQ个人实践就在这里,就在这里

时间:2023-03-13 14:28:33 科技观察

前言MQ(MessageQueue)是一个消息队列,它有很多优点:解耦、异步、削峰等,本文会讲一些概念RabbitMQ的使用。RabbitMq案例Springboot集成了RabbitMQ简单案例Exchange的基本概念:消息开关,指定消息按照规则路由到哪个队列。Queue:消息队列的载体,每条消息都会被放入一个或多个队列中。Binding:绑定,其作用是根据路由规则绑定exchange和queue。RoutingKey:路由关键字,exchange根据这个关键字下发消息。生产者:消息生产者是传递消息的程序。消费者:消息消费者是接收消息的程序。向RabbitMQ发布消息需要两个步骤:producer→exchangeexchange根据exchange的类型和routingkey确定将消息投递到哪个队列。绑定到Exchange(这里会设置路由键)生产者发布消息消费者订阅消息交换器(Exchange)交换器可以绑定队列,绑定的时候可以为队列指定路由(Routingkey)和参数(Arguments)所有的消息都是通过交换机发送到队列而不是直接发送到队列交换机类型:direct根据确定的路由(routingkey)将消息转发到队列(一条消息可以发送到多个队列,只要路由相同即可)fanoutrouting无效,只要交换机绑定的queue能收到消息topic允许routing使用*和#进行模糊匹配*表示一个词表示任意数量(零个或多个)的词例如:如果队列的路由是com.#然后向交换机发送消息,路由填写com.ccc队列,可以接收消息头,忽略路由,通过参数(Arguments)确定转发队列消息过期时间TTL。有两种设置TTL的方法。创建队列时设置整个队列的TTL或每条消息的TTL在发送消息时分别设置,消息生存时间取两者中的最小值。创建队列时,设置的是消息的存活时间,不是队列的存活时间,所以不要搞混了。@BeanpublicQueuequeue(){Mapargs=newHashMap<>();args.put("x-message-ttl",5000);//设置队列中的消息过期5秒returnnewQueue("queueName",true,false,false,args);}SetpublicvoidmakeOrder(Stringuserid,Stringproductid,intnum){StringexchangeName="ttl_exchange";StringroutingKey="ttlmessage";//给Message设置过期时间MessagePostProcessormessagePostProcessor=newMessagePostProcessor(){publicMessagepostProcessMessage(Messagemessage){//设置消息5秒后过期message.getMessageProperties().setExpiration("5000");回复信息;}}rabbitNamemplate.convertAndSend(,routingKey,"message",messagePostProcessor);}死信队列死信队列也是一个普通的队列,但是当死信队列绑定的队列满足相应条件时,满足条件将转移到死信队列。进入死信队列的条件:消息被拒绝,消息过期(超时)队列达到最大长度,死信队列的配置:按照正常步骤定义一个队列(switch,queue,binding)和添加x到需要绑定死信队列的队列-dead-letter-exchange(死信队列开关)和x-dead-letter-routing-key(死信队列路由)参数@BeanpublicQueuequeue(){Mapargs=newHashMap<>();args.put("x-dead-letter-exchange","死信队列交换名");args.put("x-dead-letter-routing-key","死信队列路由");returnnewQueue("queueName",true,false,false,args);}如何保证MQ消息的正确传递和消费的可靠性生产和推送步骤:发送消息前,数据库保存MQ消息发送日志MQ消息发送后、使用回调更新Log状态的实现:上面我们说了,向RabbitMQ发布消息需要两步:producer→exchangeexchange根据exchange类型和routingkey确定将消息投递到哪个队列。因此,发布消息的确认也分为两部分。下面是确认步骤:修改MQ响应机制(yml)spring:rabbitmq:username:rmqpassword:123456virtual-host:/host:localhostport:5672#发送消息确认,producer->exchangepublisher-confirm-type:corelated#发送消息确认,exchange->queuepublisher-returns:true添加mq回调方法/***PostConstruct注解很多人认为是Spring提供的。其实就是Java自己的注解。*Java中对该注解的说明:@PostConstruct该注解用于修饰一个非静态的void()方法。*@PostConstruct修饰的方法会在服务端加载Servlet时运行,服务端只会执行一次。*PostConstruct在构造函数之后和init()方法之前执行。*Constructor(构造方法)->@Autowired(依赖注入)->@PostConstruct(注解方法)*/@PostConstructprivatevoidregCallBack(){//Producer->交换成功或失败都会触发这个回调rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){//发送消息时传入这个idStringid=correlationData.getId();//如果ack为true表示mq成功接收到消息if(!ack){//响应失败,修改日志状态System.out.println("exchangefailedtorespond,dofailurehandling!");}else{//响应成功,修改日志状态System.out.println("exchangesuccessfullyprocessed");}}});//只有当exchange->queue失败时才会触发此回调.println("exchange->queuefailedtosend");}});}修改MQ发送消息的方法,增加logid的传输StringcorrelationId="Thisisthelogid";rabbitTemplate.convertAndSend(exchange,routeKey,消息,新的MessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Messagemessage)throwsAmqpException{//消费者需要correlationId来做这个处理message.getMessageProperties().setCorrelationId(correlationId);回复信息;}},新的CorrelationData(correlationId));//如果消费者不需要获取correlationId,使用下面的rabbitTemplate.convertAndSend(exchange,routeKey,msg,newCorrelationData(correlationId));可靠性消耗步骤:开启手动响应监听器,添加手动响应逻辑实现:开启手动响应spring:rabbitmq:username:rmqpassword:123456virtual-host:/host:localhostport:5672listener:simple:acknowledge-mode:manual#将自动响应ack模式改为手动响应acknowledge-mode共有三种:nome:没有ack,rabbitmq默认消费者正确处理所有请求munual:手动确认auto:自动确认消息(默认类型)如果消费者抛出异常,则消息将返回到队列中。监听器添加手动响应逻辑@RabbitListener(queues={"queuename"})publicvoidmessageConsumer(StringorderMsg,Channelchannel,@HeadersMapheaders)throwsException{//需要producer处理,consumer获取correlationIdStringcorrelationId=messages.getMessageProperties().getCorrelationId();System.out.println("消息是:"+orderMsg);longtag=Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString());try{//消费成功,确认channel.basicAck(tag,false);}catch(IOExceptione){//消费失败,重新发送//requeue表示是否重新发送,如果为false,直接丢弃消息,如果有死信,则进入死信队列channel.basicNack(tag,false,真的);}}总结本文介绍了RabbitMQ的一些概念和简单使用。还有很多没有解释清楚,比如publisher-confirm-type和acknowledge-mode几种类型的区别等等。主要原因是在官方文档中找不到相关的详细说明,文档查看能力还有待提高。..