当前位置: 首页 > 科技观察

三张图带你彻底了解RocketMQ事务消息

时间:2023-03-12 02:28:30 科技观察

大家好,我是君哥,事务消息是一种分布式事务的解决方案。RocketMQ有成熟的事务消息模型。今天就来说说RocketMQ的事务性消息实现机制。如果有电商场景,用户下单后,账户服务从用户账户中扣除金额,然后通知库存服务为用户发货。这两个服务需要在一个分布式事务内完成。此时账户服务充当生产者,库存服务充当消费者。看下面的消息流:账户服务充当生产者,向代理发送半条消息。半消息发送成功后,执行本地事务。如果执行成功,则向Broker发送提交请求,否则发送回滚请求。如果Broker收到回滚请求,则删除保存的一半消息。如果Broker收到commit请求,会保存扣库存消息(这里的处理是将消息从half队列投递到real队列),然后删除保存的half消息。如果Broker没有收到请求,会向Producer发送请求查询本地事务状态,然后根据Producer返回的本地状态做commit/rollback相关处理。1、半消息在上面的电商案例中,RocketMQ解决分布式事务的第一步就是从账户服务发送半消息。首先看官网一个发送事务消息的示例:publicstaticvoidmain(String[]args)throwsMQClientException,InterruptedException{TransactionListenertransactionListener=newTransactionListenerImpl();TransactionMQProducer生产者=newTransactionMQProducer("please_rename_unique_group_name");ExecutorServiceexecutorService=newThreadPoolExecutor(2,5,100,TimeUnit.SECONDS,newArrayBlockingQueue(2000),newThreadFactory(){@OverridepublicThreadnewThread(Runnabler){Threadthread=newThread(r);线程.setName("client-transaction-msg-check-thread");返回线程;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);生产者.start();String[]tags=newString[]{"TagA","TagB","TagC","TagD","TagE"};for(inti=0;i<10;i++){try{Messagemsg=newMessage("TopicTest1234",tags[i%tags.length],"KEY"+i,("HelloRocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResultsendResult=producer.sendMessageInTransaction(msg,null);System.out.printf("%s%n",sendResult);线程.睡眠(10);}catch(MQClientException|UnsupportedEncodingExceptione){e.打印堆栈跟踪();}}for(inti=0;i<100000;i++){Thread.sleep(1000);}producer.shutdown();}上面代码中Producer有一个TransactionListener属性,开发者通过自己定义这个接口实现这个接口有两种方式:提交本地事务executeLocalTransaction。检查本地事务状态checkLocalTransaction。下面的代码是发送事务消息的方法:transactionsListener=getCheckListenernAccessListener=getCheckListener()逻辑;putProperty(msg,MessageConst.PROPERTY_TRANSACTION_PREPARED,"true");MessageAccessor.putProperty(msg,MessageConst.PROPERTY_PRODUCER_GROUP,this.defaultMQProducer.getProducerGroup());尝试{sendResult=this.send(msg);}catch(Exceptione){thrownewMQClientException("发送消息异常",e);}LocalTransactionStatelocalTransactionState=LocalTransactionState.UNKNOW;//省略发送结果处理try{this.endTransaction(msg,sendResult,localTransactionState,localException);}catch(Exceptione){}TransactionSendResulttransactionSendResult=newTransactionSendResult();//省略封装属性returntransactionSendResult;}从这段代码可以看出,在在发送消息之前,为消息封装了一个属性PROPERTY_TRANSACTION_PREPARED。通过这个属性,可以找到Broker端处理Broker。保存半条消息时,将消息主题更改为RMQ_SYS_TRANS_HALF_TOPIC,然后将消息投递到queueId等于0的队列中,投递成功后返回PutMessageStatus.PUT_OK给Producer。代码如下:publicCompletableFutureasyncPutHalfMessage(MessageExtBrokerInnermessageInner){returnstore.asyncPutMessage(parseHalfMessageInner(messageInner));}privateMessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInnermsgInner){MessageAccessor.putProperty(msgInner,Message_ConstL.PROPERTY);MessageAccessor.putProperty(msgInner,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),MessageSysFlag.TRANSACTION_NOT_TYPE));msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));returnmsgInner;}2.执行本地事务上一节提到,Producer在发送事务消息时,会发送一个transactionListener,发送一半消息成功后,会通过transactionListener的executeLocalTransactionBranch提交本地事务,代码为如下://ClassDefaultMQProducerImplppublicTransactionSendResultsendMessageInTransaction(finalMessagemsg,finalLocalTransactionExecuterlocalTransactionExecuter,finalObjectarg)throwsMQClientException{//省略部分代码SendResultsendResult=null;//省略部分代码try{sendResult=this.send(msg);}catch(Exceptione){thrownewMQClientException("sendmessageException",e);}LocalTransactionStatelocalTransactionState=LocalTransactionState.UNKNOW;抛出的localException=null;switch(sendResult.getSendStatus()){caseSEND_OK:{try{//省略一些代码if(null!=localTransactionExecuter){//这个分支已经过时了localTransactionState=localTransactionExecuter.executeLocalTransactionBranch(msg,arg);}elseif(transactionListener!=null){//执行本地事务localTransactionState=transactionListener.executeLocalTransaction(msg,arg);}//省略一些代码}catch(Throwablee){}}break;//省略其他情况}try{this.endTransaction(msg,sendResult,localTransactionState,本地异常);}catch(Exceptione){log.warn("localtransactionexecute"+localTransactionState+",butendbrokertransactionfailed",e);}//省略部分代码returntransactionSendResult;}从上面的代码可以看出,本地事务执行完成后,会调用一个endTransaction方法,就是向Broker发送commit/rollback,或者sendUNKNOW,封装到requestHeader的commitOrRollback属性中。此请求的请求代码为END_TRANSACTION。3.Commit/rollback处理根据请求码END_TRANSACTION可以找到Broker端对事务消息的处理。代码如下://EndTransactionProcessor类publicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{//省略部分分发OperationResultresult=newOperationResult();if(MessageSysFlag.TRANSACTION_COMMIT_TYPE==requestHeader.getCommitOrRollback()){//查询出一半消息result=this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if(result.getResponseCode()==ResponseCode.SUCCESS){//检查groupId和消息偏移量是否合法RemotingCommandres=checkPrepareMessage(result.getPrepareMessage(),requestHeader);如果(res.getCode()==ResponseCode.SUCCESS){MessageExtBrokerInnermsgInner=endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());//删除PROPERTY_TRANSACTION_PREPARED属性MessageAccessor.clearProperty(msgInner,MessageConst.PROPERTY_TRANSACTION_PREPARED);RemotingCommandsendResult=sendFinalMessage(msgInner);if(sendResult.getCode()==ResponseCode.SUCCESS){//删除一半消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}返回发送结果;}返回资源;}}elseif(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE==requestHeader.getCommitOrRollback()){result=this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if(result.getResponseCode()==ResponseCode.SUCCESS){RemotingCommandres=checkPrepareMessage(result.getPrepareMessage(),requestHeader);if(res.getCode()==ResponseCode.SUCCESS){this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}返回资源;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());returnresponse;}这段代码逻辑很清晰,先找出halfmessage,然后检查找到的message(groupId和messageoffset是否合法),如果是commit,去掉事务messagepreparationstage属性,重新post消息到原队列,然后删除一半如果消息回滚,则直接删除一半消息。注意:对于UNKNOW的类型,这里直接返回null,上面的代码就不贴了。4、检查事务状态Broker初始化时,会初始化一个TransactionalMessageServiceImpl线程。该线程会定时检查过期消息,通过向Producer发送检查消息获取交易状态。代码如下://TransactionalMessageCheckServiceprotectedvoidonWaitEnd(){//超时时间,默认6slongtimeout=brokerController.getBrokerConfig().getTransactionTimeOut();//最大检查次数,默认15intcheckMax=brokerController.getBrokerConfig().getTransactionCheckMax();longbegin=System.currentTimeMillis();log.info("开始查看准备消息,开始时间:{}",begin);this.brokerController.getTransactionalMessageService().check(timeout,checkMax,this.brokerController.getTransactionalMessageCheckListener());log.info("Endtocheckpreparemessage,consumedtime:{}",System.currentTimeMillis()-begin);}这里需要注意两个参数:事务消息超时时间,超时后会发送给Producer发送检查消息检查本地事务状态,默认6s。最大检查次数,Broker向Producer发送检查消息后,检查次数加1。超过最大检查次数后,半个消息将被丢弃。默认最大检查次数为15;注意:这里的丢弃是将消息写入一个新队列,Topic为TRANS_CHECK_MAX_TIME_TOPIC,queueId为0。文件保存时间,默认72小时。检查事务消息的过程如下:Producer收到检查消息后,最后调用TransactionListener中定义的checkLocalTransaction方法查询本地事务的执行状态,然后发送给Broker。需要注意的是,当检查消息发送给Broker时,会在请求头中为fromTransactionCheck属性赋值为true,以标记为检查消息。Broker收到checkresponse消息后,处理逻辑与第3节相同,唯一不同的是check消息和非check消息打印的日志不同。5、小结从上面代码的分析可以看出,RocketMQ的事务消息实现机制非常简单。在使用事务消息时,自己定义TransactionListener,实现执行本地事务executeLocalTransaction和检查本地事务状态checkLocalTransaction这两个方法,然后使用TransactionMQProducer发送。最后附上Producer端的UML类图: