环境:springboot2.3.9RELEASE+RocketMQ4.8.0依赖org.springframework.bootspring-boot-starter-weborg.apache.rocketmqrocketmq-spring-boot-starter2.2.0配置文件server:port:8080---rocketmq:nameServer:localhost:9876producer:group:demo-mq普通消息发送@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsend(Stringmessage){rocketMQTemplate.convertAndSend("test-topic:tag2",MessageBuilder.withPayload(message).build());}接受@RocketMQMessageListener(topic="test-topic",consumerGroup="consumer01-group",selectorExpression="tag1||tag2")@ComponentpublicclassConsumerListenerimplementsRocketMQListener{@OverridepublicvoidonMessage(Stringmessage){System.out.println("Receivedmessage:"+message);}}顺序消息发送@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsendOrder(Stringtopic,Stringmessage,Stringtags,intid){rocketMQTemplate.asyncSendOrderly(topic+":"+tags,MessageBuilder.withPayload(message).build(),"order-"+id,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){System.err.println("msg-id:"+sendResult.getMsgId()+":"+message+"\tqueueId:"+sendResult.getMessageQueue().getQueueId());}@OverridepublicvoidonException(Throwablee){e.printStackTrace();}});}这里是根据hashkey向不同队列发送消息@RocketMQMessageListener(topic="order-topic",consumerGroup="consumer02-group",selectorExpression="tag3||tag4",consumeMode=ConsumeMode.ORDERLY)@ComponentpublicclassConsumerOrderListenerimplementsRocketMQListener{@OverridepublicvoidonMessage(Stringmessage){System.out.println(Thread.currentThread().getName()+"收到订单消息:"+message);}}consumeMode=ConsumeMode.ORDERLY,表示消息模式为顺序模式,一个队列,一个线程consumeMode=ConsumeMode.CONCURRENTLY执行结果如下:cluster/broadcastmessagemodesender@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsend(Stringtopic,Stringmessage,Stringtags){rocketMQTemplate.send(topic+":"+tags,MessageBuilder.withPayload(message).build());}集群消息模式消费者@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6||tag7",messageModel=MessageModel.CLUSTERING)@ComponentpublicclassConsumerBroadListenerimplementsRocketMQListener{@OverridepublicvoidonMessage(Stringmessage){System.out.println("ConsumerBroadListener1收到一条消息:"+message);}}messageModel=MessageModel.CLUSTERING测试启动两个服务分别端口为8080,80818080服务8081服务集群消息模式,每个服务分别接收部分消息,实现负载均衡广播消息模式消费者@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6||tag7",messageModel=MessageModel.BROADCASTING)@ComponentpublicclassConsumerBroadListenerimplementsRocketMQListener{@OverridepublicvoidonMessage(Stringmessage){System.out.println("ConsumerBroadListener1收到消息:"+message);}}messageModel=MessageModel.BROADCASTING测试启动两个服务端口为8080,80818080服务8081服务集群中消息模式,每个服务接受相同的消息事务消息。RocketMQ事务的三种状态TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费这条消息。TransactionStatus.RollbackTransaction:回滚事务,即消息将被删除,不允许被消费。TransactionStatus.Unknown:中间状态,表示需要检查消息队列以确定状态。RocketMQ对事务消息的实现主要分为两个阶段:正常的事务发送和提交,以及事务信息补偿过程。暂时不能消费的消息)2、服务端响应消息写入结果,一半消息发送成功。3.开始执行本地事务。4.根据本地事务的执行情况,执行Commit或Rollback操作事务信息补偿流程。1、如果MQServer长时间没有收到本地事务的执行状态,会向producer发起确认checkback操作请求。2.生产者收到确认回核请求后,会检查本地交易的执行状态。3、根据校验结果进行Commit或Rollback操作补偿该阶段主要用于解决生产者在发送Commit或Rollback操作时超时或失败的情况。发件人@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsendTx(Stringtopic,Longid,Stringtags){rocketMQTemplate.sendMessageInTransaction(topic+":"+tags,MessageBuilder.withPayload(newUsers(id,UUID.randomUUID().toString().replaceAll("-","))).setHeader("BID",UUID.randomUUID().toString().replaceAll("-","")).build(),UUID.randomUUID().toString().replaceAll("-",""))));}生产者对应的监听器@RocketMQTransactionListenerpublicclassProducerTxListenerimplementsRocketMQLocalTransactionListener{@ResourceprivateBusinessServicebs;@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){//在这里执行本地事务操作,比如保存数据。try{//创建一个日志记录表,将这个唯一ID存入数据库,在下面的检查方法中,可以根据这个id查询是否有数据Stringid=(String)msg.getHeaders().get("BID");Usersusers=newJsonMapper().readValue((byte[])msg.getPayload(),Users.class);System.out.println("消息内容:"+users+"\t参与数据:"+arg+"\t本次交易的唯一编号:"+id);bs.save(users,newUsersLog(users.getId(),id));}catch(Exceptione){e.printStackTrace();returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState。COMMIT;}@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){//这里检查本地事务是否执行成功Stringid=(String)msg.getHeaders().get("BID");System.out.println("ExecutionqueryIDis:"+id+"数据是否存在");UsersLogusersLog=bs.queryUsersLog(id);if(usersLog==null){returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;}}Consumer@RocketMQMessageListener(topic="tx-topic",consumerGroup="consumer05-group",selectorExpression="tag10")@ComponentpublicclassConsumerTxListenerimplementsRocketMQListener{@OverridepublicvoidonMessage(Usersusers){System.out.println("TX收到消息:"+users);}}Service@Transactionalpublicbooleansave(Usersusers,UsersLogusersLog){usersRepository.save(users);usersLogRepository.save(usersLog);if(users.getId()==1){thrownewRuntimeException("数据错误");}returntrue;}publicUsersLogqueryUsersLog(Stringbid){returnusersLogRepository.findByBid(bid);}Controller@GetMapping("/tx/{id}")publicObjectsendTx(@PathVariable("id")Longid){ps.sendTx("tx-topic",id,"tag10");return"sendtransactionsuccess";}测试调用接口后,控制台输出:从打印log可以看到,consumer在保存完成后才收到删除数据的消息,然后testID为1,会报错。数据库中没有数据。..是不是很复杂,2个阶段来处理。完全的!!!