当前位置: 首页 > 后端技术 > Java

使用rabbitmq时,如何保证消息的可靠传递[2]

时间:2023-04-01 14:11:07 Java

本期重点介绍如何发送消息,首先看依赖:org.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-starter-jdbcorg.mybatis.spring.bootmybatis-spring-boot-starter1.1.1com.alibabadruid1.1.10mysqlmysql-connector-java关键依赖是spring-boot-starter-amqp,发送依赖使用org.springframework.amqp。rabbit发送消息有两个关键步骤:获取模板对象和调用模板对象的发送方法。我们先来看第一步。可以通过直接依赖注入直接获取模板对象:importorg.springframework.amqp.rabbit.core.RabbitTemplate;@AutowiredRabbitTemplaterabbitTemplate;第二步,调用对象的方法:publicvoidconvertAndSend(Stringexchange,StringroutingKey,Objectobject,@NullableCorrelationDatacorrelationData)throwsAmqpException{this.send(exchange,routingKey,this.convertMessageIfNecessary)(corjectData;}其中,objectobjec是发送的消息,Stringexchange是rabbitmq的exchange,如果rabbitmq上不存在这个exchange,则发送失败,比如rabbitqm上只有exchange-1,你传入的参数是exchange-2,发送会失败;StringroutingKey为Routingkey,即交换机必须绑定一个routingkey,才能接收发送到该routingkey的消息,CorrelationDatacorrelationData为发送后触发回调函数的返回参数,一般包括消息正文、发送时间和消息类型。进阶1:模板容器可以使用RabbitTemplate构造方法获取模板对象。这个构造方法有一个重载方法。如果使用不传参的默认方法,则默认方法initDefaultStrategies();内部会用到,但是构造方法处的注解告诉我们ConnectionFactory这个参数还是需要设置的。与setter注入一起使用的便捷构造函数。不要忘记设置连接工厂。这个参数的作用是一个基于接口的ConnectionFactory,用于创建{@linkcom.rabbitmq.client.ConnectionConnections}。在这里,我们注入ConnectionFactory并设置importorg.springframework.amqp.rabbit.connection.ConnectionFactory;@AutowiredprivateConnectionFactoryconnectionFactory;RabbitTemplatenewTemplate=newRabbitTemplate(connectionFactory);使用默认的构造函数方法,默认设置了简单的消息转换器。protectedvoidinitDefaultStrategies(){setMessageConverter(newSimpleMessageConverter());}这个消息转换器的作用是将消息对象序列化,即将java对象转换为传输对象(字节数组);或者反序列化,即将从rabbitmq获取的信息(字节数组)转换为java对象。这里需要我们自己的转换器,就是将转换后的java对象确定为我们自定义的Message对象。实现序列化工厂接口包com.zk.rabbit.common.serializer.impl;importcom.zk.rabbit.api.Message;//自定义消息对象importcom.zk.rabbit.common.serializer.Serializer;importcom.zk.rabbit.common.serializer.SerializerFactory;publicclassSerializerFactoryImplimplementsSerializerFactory{publicstaticfinalSerializerFactoryINSTANCE=newSerializerFactoryImpl();@OverridepublicSerializercreate(){//自定义SerializerImpl序列化器returnSerializerImpl.createParametricType(Message.class);}}自定义SerializerImpl序列化程序以定义特定的序列化步骤。这里我们使用fastJson进行处理导入com.fasterxml.jackson.databind.ObjectMapper;导入com.fasterxml.jackson.databind.SerializationFeature;导入com.zk.rabbit.common.serializer.Serializer;导入org.slf4j.Logger;导入org.slf4j.LoggerFactory;importjava.io.IOException;importjava.lang.reflect.Type;publicclassSerializerImplimplementsSerializer{privatestaticfinalLoggerlog=LoggerFactory.getLogger(SerializerImpl.class);privatestaticfinalObjectMappermapper=newObjectMapper();静态{mapper.disable(SerializationFeature.INDENT_OUTPUT);mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,true);mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true);mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS,真);mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS,true);mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES,true);mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,true);mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES,true);}私有最终JavaType类型;公开icSerializerImpl(JavaTypetype){this.type=type;}publicSerializerImpl(Typetype){this.type=mapper.getTypeFactory().constructType(type);}publicstaticSerializercreateParametricType(Classclz){returnnewSerializerImpl(mapper.getTypeFactory().constructType(clz));}@Overridepublicbyte[]serializeRaw(Objectcontent){try{returnmapper.writeValueAsBytes(content);}catch(JsonProcessingExceptione){log.error("顺序列表化出错",e);}返回空值;}@OverridepublicStringserialize(Objectcontent){try{returnmapper.writeValueAsString(content);}catch(JsonProcessingExceptione){log.error("顺序列表化出错",e);}返回空值;}@OverridepublicTdeserialize(Stringcontent){try{returnmapper.readValue(content,type);}catch(IOExceptione){log.error("反序列表化出错",e);}返回空值;}@OverridepublicTdeserialize(byte[]content){try{returnmapper.readValue(content,type);}catch(IOExceptione){log.error("反序列化错误",e);}返回空值;}}设置确认回调方法,即继承RabbitTemplate.ConfirmCallback接口,实现确认方法@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanb,Strings){/*注意这里的correlationData是新传入的发送消息时CorrelationData(String.format("%s#%s#%s",message.getMessageId(),System.currentTimeMillis(),message.getMessageType()));*/Stringid=correlationData.getId();Liststrings=splitter.splitToList(id);StringmessageId=strings.get(0);longtimeStamp=Long.parseLong(strings.get(1));intmessageType=Integer.parseInt(strings.get(2));如果(b){如果(MessageType.RELIANT==messageType){//这里是下发成功后修改数据库中数据的状态rabbitProducerService.success(messageId);}log.error("#RabbitTemplateContainer.confirm#发送成功,messageId:{},ACK:{},timeStamp:{}",messageId,b,timeStamp);}else{log.error("#RabbitTemplateContainer.confirm#发送失败,messageId:{},ACK:{},timeStamp:{}",messageId,b,timeStamp);}}定义一个Map对象,exchange是key,template是value,是一个模板容器。每次发送消息时,都会从这个容器中获取模板对象privatestaticfinalMap容器=Maps.newConcurrentMap();publicRabbitTemplategetTemplate(Messagemessage)throwsMessageRuntimeException{Preconditions.checkNotNull(message);Stringtopic=message.getTopic();RabbitTemplatetemplate=container.get(topic);如果(模板!=null){返回模板;}log.info("#RabbitTemplateContainer.getTemplate#topic:{}不存在,创建一个",topic);RabbitTemplatenewTemplate=newRabbitTemplate(connectionFactory);newTemplate.setExchange(主题);newTemplate.setRoutingKey(message.getRoutingKey());RabbitMessageConverter转换器=newRabbitMessageConverter(newGenericMessageConverter(serializer));newTemplate.setMessageConverter(转换器);如果(message.getMessageType()!=MessageType.RAPID){newTemplate.setConfirmCallback(this);}Container.put(主题,新模板);返回新模板;}

最新推荐
猜你喜欢