当前位置: 首页 > 网络应用技术

一篇文章带您彻底掌握RocketMQ消息机制如何处理分布式交易

时间:2023-03-09 10:21:04 网络应用技术

  我们经常将其转移到Yu'ebao,这对于日常生活是正常的事情,但是在考虑该系统挂起该怎么办之后,如果系统挂起,我们将不会增加相应的数量。不一致的情况。

  以上方案可以在各种类型的系统中找到。例如,在e -Commerce系统中,当有用户下订单时,除了在订单表中插入记录外,还必须通过1.确定对应于产品表的产品数量?系统,当用户点击某个广告时,除了在点击中添加记录外,您还必须转到商户帐户表格以查找商家并扣除广告费。您怎么能保证它?交叉服务,我相信每个人都可以遇到类似的场景。

  从本质上讲,您可以抽象:当一个表数据更新时,如何确保必须成功更新另一个表的数据。

  如果它是一个独立的系统(数据库实例也在同一系统上),我们可以轻松地通过本地事务来解决它:

  以支撑底转移Yu'eebao为例(例如10,000元来转移),假设

  从10,000元到Yu'ebao的行动从支付宝到Yu'ebao分为两个步骤:

  如何确保支付架Yu'ebao的付款平衡?

  有人说这很简单,可以通过交易来解决。

  这确实可以解决。如果使用Spring,则可以获得上述交易功能。

  如果系统大小很小,则数据表位于数据库实例上。上面提到的本地交易方法可以很好地运行,但是如果系统大小很大,例如Apleay Account表和Yu'ebao帐户表显然不会在同一数据库Instancethey的同一数据库实例上分布在同一数据库实例上。不同的物理节点。目前,当地事务失去了武术。

  作者的Spring Boot Advanced和Mybatis Advanced Original列已被整理成一本书。请注意公共帐户[代码APE技术列]以回复关键字并获取。

  让我们看一下另外两个主流解决方案:

  两相提交(2PC)通常用于实现分布式交易。它通常分为两个角色:协调员TC和几个事务执行者。此处交易的执行人员是一个特定的数据库。协调员可以在带有事务执行器的机器上。

  让我们根据上图来查看主要过程:

  1)我们的应用程序(客户端)启动了对TC(事务)的起始请求;

  2)TC首先将准备消息写入本地日志,然后将准备消息启动给所有SI.Take Apripay转移到Yu'ebao,例如,从TC到A的准备消息是为了通过扣除来告知Alipay数据库10,000元,以及向B的准备消息,以通过1W通知Yu'ebao数据库的相应帐户。为什么在执行任务之前需要编写本地日志,主要是在失败后恢复。本地日志在现实生活中具有凭证的效果。如果没有本地日志(凭证),则如果存在问题,很容易死亡。

  3)收到准备消息后,SI执行了特定的机器事务,但不会承诺。如果您成功返回是,它将不会以相同的方式成功返回。返回的消息应在返回作为代金券之前写入日志中。

  4)TC收集所有执行器返回的消息。如果所有执行器都返回是,然后将提交消息发送给所有执行器。

  注意:TC或SI写下了日志中发送或接收的消息,主要是为了在故障后恢复。如果已收到该提交,请提交,如果是的,如果是肯定的话,请TC确定下一步。如果没有,则可能在准备阶段中倒塌,因此需要是回滚。

  如今,基于两个阶段实现分布式交易并不难。如果Java使用Java,则可以使用开源软件Atomikos快速实现。

  但是,任何使用上述两个阶段的学生都可以发现表现太差,并且不适合高并发系统。为什么?

  正是由于分布式交易中的严重绩效问题,大多数高并发服务都可以避免,并且通常通过其他渠道解决数据一致性问题。

  如果您仔细观察,许多生活场景都给了我们一个提示。

  例如,在北京,在北京非常有名的Yao Ji订购了肝脏并付钱,他们不会直接给您您订购的油炸肝脏,而是给您一张小票,然后让您握住一张小型Ticketgo到达运输区以排队。为什么他们将支付和捡货的两个动作分开?有很多原因。重要原因之一是增强其接收能力(更高的并发)。

  仍然回到我们的问题,只要有这个小票,您最终就可以得到油炸肝脏。同情转移服务也是如此。当扣除支付账户10,000时,我们只需要生成凭证(消息)。要可靠,我们最终可以增加此代金券(消息)以增加Yu'ebao帐户10,000,也就是说,我们可以依靠此证书(我们可以依靠此证书(消息)要完成最终的一致性。此外,请注意公共号码APE技术列,回复关键字9527以获取春季云阿里巴巴实际战斗视频。

  因此,我们如何可靠地保存凭证(消息):有两种方法:

  1)业务和消息耦合支撑件的方式,完成扣除时,在记录消息数据时,此消息数据和业务数据存储在同一数据库实例中(消息记录称为消息)。

  上述交易可以确保只要扣除支付帐户,该消息肯定会保存。

  成功提交上述交易时,我们将通过真实的时间消息服务通知Yu'ebao。在成功处理Yu'ebao的成功处理后,响应成功。收到答复后,支付宝删除消息数据。

  2)将业务和消息传达给上述保存消息的方法的方法使消息数据和业务数据紧密地组合在一起。从架构中看不够优雅,很容易引起其他问题。为了解决耦合,可以使用以下方式。

  a)在提交扣除事务之前,支付宝将消息发送给真实的时间消息服务请求。真实的 - 时间消息服务仅记录消息数据,并且并未真正发送。

  b)当成功提交支撑件扣除额时,将确认为真实的消息服务。仅在获得确认指令后,实际 - 时间消息服务确实发送了消息;

  c)当将支撑件扣除提交到故障时,将取消交付给真实的时间消息服务。获取取消指令后,将不会发送消息;

  d)对于那些不令人满意的消息或取消,消息状态需要确认系统将转到ABIPAY系统以检查消息的状态并更新。您何时需要此步骤?例如:假设在步骤2中成功提交了支撑式扣除后,系统就会挂起。目前,消息状态尚未更新为“确认发送”,这导致消息发送。

  优点:消息数据是独立存储的,以减少业务系统和消息系统之间的耦合;缺点:发送一条消息需要两个请求;业务处理服务需要将消息状态实现回接口。

  作者的Spring Boot Advanced和Mybatis Advanced Original列已被整理成一本书。请注意公共帐户[代码APE技术列]以回复关键字并获取。

  另一个非常严重的问题是该新闻反复传达。以我们的支撑ay转移到Yu'ebao为例,如果反复提交了两次相同的消息,那么我们的Yu'ebao帐户将增加20,000而不是10,000(上面提到。。

  为什么反复提交相同的消息?在正常情况下,支撑件应删除消息消息,但是如果此时,在重新启动之后挂起的Abipay挂起,则消息MSG仍在那里,它将继续。发送消息msg。

  本质

  为了促进每个人理解,让我们给出另一个银行转移的示例(类似于上一个示例):

  例如,鲍勃将100元转移给史密斯。

  在单独的环境中,执行情况可能是以下:

  当用户在一定程度上生长时,鲍勃和史密斯的帐户和平衡信息不再在同一服务器上,那么上述过程就会如下:

  目前,您会发现它也是转会业务。在集群环境中,生长需要两倍,这显然是不可接受的。如何避免这个问题?

  将大事务拆除为多个小型交易异步执行。这样,可以将交叉机动交易的实施效率优化到与单个机器一致的一致。转移可以分解为两项小型交易:以下两项小型交易:以下两项小型交易:

  在该图中,实施本地事务(BOB帐户扣除)和发送异步消息应确保同时获得成功或失败,即扣除是成功的。发送消息必须成功。如果扣除失败,则无法再次发送消息。问题是:我们是否首先扣除或发送消息?

  首先查看消息首先发送的情况,粗略的示意图如下:

  问题在于,如果消息成功发送,但是扣除额失败,消费者将消耗新闻,然后将资金添加到Smith帐户中。

  如果没有新闻,则扣除额。粗糙的示意图如下:

  现有问题与上述问题相似:如果扣除额成功并且消息失败,则将出现鲍勃扣除,但史密斯帐户不会增加资金。

  也许每个人都会有很多方法来解决这个问题,例如:将消息直接放入鲍勃扣除的交易中。如果发送失败,则会抛出异常,并且交易正在回滚。处理过程还符合“正义发生”的原则。

  RocketMQ支持交易消息。让我们看一下如何实现RocketMQ?

  当RocketMQ在第一阶段发送准备好的消息时,将获得消息的地址。在第二阶段,将执行本地事情。在第三阶段,该消息通过第一阶段获得的地址访问,并修改了消息的状态。

  您可能会再次找到问题。如果确认消息失败,该怎么办?RocketMQ会定期扫描消息群集中的消息。如果发现准备的消息,它将从消息发送端(生产者)中确认。鲍勃的钱减少还是不减少?如果减少,您是回滚还是继续发送确认消息?

  RocketMQ将根据发送端设定的策略来决定是回滚还是发送确认消息。这可以同时保证消息和本地事务的成功或失败。

  然后,让我们看一下RocketMQ源代码,如何处理事务消息。

  客户端发送交易消息的一部分(请检查完整的代码:com.alibaba.Rocketmq.example.example.transactionproduducer com.alibaba.rocketmq.examsaction.transa

  然后检查SendMessageIntransaction方法的源代码,总共3个阶段:发送准备好的消息,执行本地事务并发送确认消息。

  如果EndTraransaction方法失败并且数据未发送到经纪人,则交易消息的状态更新失败,并且代理将针对每个存储交易状态具有一组扫描格式。如果已经提交或回滚,它已被提交或回滚。交易继续执行,最后致电EndTransactionOneway让经纪人更新消息的最终状态。

  回到转让的示例,如果鲍勃帐户的余额减少并且消息已成功发送,史密斯方面开始消耗新闻。目前,消费故障和消费超时将存在两个问题。保存审查直到消费新闻成功为止,并且在整个过程中可能存在重复消息的问题,并且可以根据先前的思考来解决。

  消费者事务新闻基本上可以解决消费者超时的问题,但是如果消费失败了?ALI提供的解决方案是:手动解决方案。您可以考虑根据交易过程,出于某种原因,史密斯失败了,因此您需要为了恢复整个过程。如果消息系统想要实现此回滚过程,则系统的复杂性将大大提高,并且很容易发生错误。据估计,错误的概率将远大于消费失败的可能性。这就是为什么RocketMQ暂时没有解决此问题的原因。在设计消息系统时,我们需要衡量是否值得花费如此大的价格来解决这种问题,以很小的概率。

  我们需要注意以删除3.2.6版中交易消息的实现,因此此版本不支持事务消息。换句话说,将不会检查新闻失败。

  让我们看一个简单的例子:

  消息生产者:

  业务实施类TransactionListenerimpl:

  新闻消费者:

  让我们首先开始消费方,然后开始生产方面:

  在运行之前,让我们看一下Web控制台的新闻:

  主题transaction_producer中没有前所未有的消息,代码逻辑开始执行下面的代码逻辑。

  生产方面:

  您可以看到它通过日志发送了四个消息,并且交易逻辑被称为4次,其中只有一个消息反馈结果为commit_message。

  再次查看控制台:

  可以看出,其主题Transaction_producer只有一条消息要消耗,这与委员会的结论一致。

  消费方面:

  开始消费方,在控制台上只看到一条消息。

  生产者总共生产的四条消息,原因如下:

  这就是为什么我们发出四个消息的原因,但最终,我们只消耗一条消息来确认业务实施类TransActionListenerimpl方法CheclocalTransaction未被调用。

  在上述转移交易的逻辑上,有两个问题:

  1)如果不使用RocketMQ的公司版本,则扣除货币可能会成功,但是由于生产方而导致扣除货币的消息失败了,这导致交易消息在成功提交资金后向RocketMQ确认信息可以提交,这将导致消息不提交给消费者,从而导致鲍勃的钱被扣除,但史密斯的钱没有增加。

  2)已经处理了制造商的整个逻辑,并在数据库中成功提交了货币的扣除。扣除金钱的消息已在RockEtMQ中成功确认,但是当消费者中的消费者消息在消费消息中时,它会发生故障。或由此消息造成的逻辑错误,导致史密斯的钱未正确添加。

  尽管以上两个问题的可能性很低,但只要发生有可能发生的可能性,它肯定会发生在某个时间点,但是失败时的早晨或晚上的问题。,系统日志每天运行以执行对帐操作,以检查当天的总付款和收入是否平衡,每个交易结果是否符合借贷余额等。因此,为了避免上述两个问题,它发生了,我的处理方法是介绍金融系统的业务逻辑来处理它。

  它的业务处理逻辑如下:

  在发送交易交易后,将交易转换消息发送到对帐的主题,报纸消息是非交易的新闻,并成功保存到RocketMQ。随后的交易和解系统将消耗队列中的和解信息,该信息是队列中的和解信息,该信息将在队列中,该信息将其浪费,该信息将在队列中消费。将通过交易生产者和消费者党检查交易。验证逻辑如下:

  交易对帐系统首先与交易的消费者进行检查。如果消费者的消费成功,可以解释的是,整个交易结果符合最终的一致性,因为该消息是生产者成功处理后确认的交易。在交易确认的交易消息后,它是确定制造商已经完成了推论的逻辑。

  只有当交易消息失败或消息方不使用新闻时,交易消息才需要再次与生产者确认。如果生产者成功执行了扣除操作,则需要退缩扣除交易;在成功推论后,这意味着双方都没有消耗。您不必进行任何操作。

  作者:代码猿技术专栏