1.活动中心场景介绍在电商系统上线初期,经常会开展一些“新”的活动,比如活动部为新用户注册提供积分、优惠券等。基于分布式和微服务的设计理念,通常的架构设计(子系统交互)如下图所示:核心系统介绍如下:账户中心提供用户登录、用户注册等服务。当有新用户注册时,MQ服务器向消息中的USER_REGISTER主题发送消息,主流程结束,与发送积分和优惠券的流程解耦。Coupon(优惠券系统)提供发放优惠券、使用优惠券等与优惠券相关的基本服务。积分中心提供与积分相关的服务,如积分赠送、积分消费、积分查询等基础服务。积分发送服务(消费者)订阅MQ,根据规则决定是否给予积分,必要时调用积分相关的基础接口完成积分发放。发送优惠券(消费者)订阅MQ,根据规则决定是否发放优惠券,必要时调用优惠券系统相关的基础接口完成优惠券的发放。上面的架构设计很优雅,但也不是无懈可击。读者肯定会想到,如果新用户注册成功,但是消息发送给MQ失败,或者消息发送给MQ成功,但是MQ发送后,系统出现异常,用户注册失败。怎么做?上面的问题其实就是一个典型的分布式事务问题:即如何保证用户注册(数据库操作)和MQ消息发送这两个分布式操作的一致性。RocketMQ事务消息登场。2、事务消息实现的原理可以用一句话概括:RocketMQ事务消息要解决的问题是消息发送和业务的一致性。它的解决思路是:两阶段提交和事务状态审查。具体实现过程如下图所示:其核心设计理念:应用程序启动一个数据库事务,进行数据库操作,在事务中发送PREPARE消息。PREPARE消息发送成功后,通知应用记录本地事务状态,然后提交本地事务。RocketMQ收到PREPARE类型的消息时,首先备份消息的原主题和原消息消费队列,然后将消息存储到主题为RMQ_SYS_TRANS_HALF_TOPIC的消息队列中,所以PREPARE的消息不会被消费客户端。Broker消息服务器启动一个定时任务处理RMQ_SYS_TRANS_HALF_TOPIC中的消息,每隔指定的时间会向消息发送方发送一个事务状态查询请求,询问消息发送方客户端本地事务是否成功,然后决定是否commit或者根据checkback状态回滚,即在PREPARE状态下commit或rollback操作。如果发送方明确知道交易成功,就可以返回COMMIT,服务器就会提交消息。具体操作是恢复原消息的主题和队列,重新发送给Broker,消费者感知后进行消费。如果发送方无法清楚地知道交易状态,则返回UNOWN。这时,服务器会等待一定的时间,然后再询问发送方。默认为15次。如果很明显交易失败,发送方可以返回ROLLBACK。在实践中,消息发送方在获取不到事务状态时不应随意返回ROLLBACK,而应返回UNOWN,以便服务器周期性重试查询。说明如下:服务端向Broker发送PREPARE消息后,发起查询事务时,本地事务可以不提交。为了避免无效的事务审核机制,RocketMQ通常在收到PREPARE消息后至少6s后才发起第一次事务审核,可以通过transactionTimeOut进行配置。因此,客户端在实现事务回溯时不能证明事务状态时不应该返回ROLLBACK,而是返回UNOWN。3、商讯实战是空谈,不练假动作。下面以新用户注册送券的场景,详细介绍如何使用商讯。项目模块职责如下:交易消息的核心代码组装在transaction-service中,其核心类图如下:核心点如下:UserServiceImplDubbo接口业务实现类,类似MVC控制层,这里做了一些参数校验,但是并没有执行具体的业务逻辑,只是向MQ发送了一个事务消息。UserRegTransactionListener事务监听器在executeLocalTransaction方法中执行业务逻辑,数据库本地事务添加到该方法中。温馨提示:之所以在UserServicveImpl中不执行本地事务,是因为executeLocalTransaction中抛出的异常会被RocketMQ框架捕获,而UserServiceImpl无法感知该异常,即无法实现其事务的一致性。接下来展示它的核心代码,所有源码已经上传到github仓库。仓库地址:https://github.com/dingwpmz/rocketmq-learning3.1UserServiceImpl核心实现UserServiceImpl的核心要点如下:首先,对参数和业务逻辑进行验证。如果不满足业务条件,一些向MQ发送无效消息不会导致业务异常,但是发送事务消息会消耗性能。建议为消息设置一个Key。Key的值可以是业务处理流水号(可以唯一代表业务操作)或者核心业务字段(比如订单号)业务入口类可以通过发送事务消息的状态来判断业务是否失败.3.2UserRegTransactionListener交易监听器的核心实现需要实现执行本地交易和交易审核两个接口。3.2.1实现executeLocalTransaction,首先需要实现executeLocalTransaction方法来执行本地事务。代码如下图所示:几个关键点解释如下:在该方法中添加一个数据库事务标签。为了执行业务逻辑,示例Demo只是将用户数据存储在数据库中。如果业务执行失败,可以明确告知需要回滚,上层调用者也可以根据ROLLBACK_MESSAGE进行相应的处理。如果业务成功,不建议直接返回COMMIT,而是返回UNKNOW,因为这个方法虽然在方法的最后一行,但是可能会出现断电等异常情况,数据库不成功。3.2.2checkLocalTransaction的实现其次,需要实现事务状态checkback,用于RocketMQ服务端感知事务是否成功。实现原理如下图所示:实现要点如下:如果明确本地事务成功,返回COMMIT_MESSAGEas不应该明确本地事务成功,不能返回ROLLBACK_MESSAGE,但返回UNKNOW,等待服务器查回下一个事务(不会立即触发)。服务器默认检查15次。如果UNKNOW被获取15次,它将回滚消息。3.3代码获取以上只是对交易消息的核心代码进行解读,着重说明每一步的重点。笔者基于SpringBoot,结合场景尝试学习RocketMQ的使用技巧,代码上传至github仓库。https://github.com/dingwpmz/rocketmq-learning《RocketMQ技术内幕》作者,RocketMQ社区优秀布道者,专注于将主流JAVA中间件分享成系统,构建完整的互联网架构体系,目前覆盖Java并发而微服务未来会继续专注于监控、在线诊断等领域。本文转载自微信公众号“中间件兴趣圈”,可通过以下二维码关注。转载本文请联系中间件兴趣圈公众号。