在之前的博文中,我们介绍了ApacheKafka?的once语义。这篇文章介绍了各种消息传递语义,展示了幂等生产者、事务和Kafka流的一次性处理语义。现在,我们将继续上一节并深入探讨ApacheKafka中的事务。本文档的目的是让读者熟悉在ApacheKafka中有效使用事务API所需的主要概念。我们将讨论设计事务API的主要用例、Kafka的事务语义、Java客户端事务API的详细信息、实现的有趣方面以及使用API时的重要注意事项。这篇博文不是关于使用事务的细节的教程,我们也不会深入研究设计细节。相反,在适当的情况下,我们将为希望深入研究的读者链接到JavaDocs或设计文档。我们希望读者熟悉基本的Kafka概念,例如主题、分区、日志偏移量,以及代理和客户端在基于Kafka的应用程序中的角色。熟悉Java中的Kafka客户端也会有所帮助。为什么交易?我们在Kafka中设计事务主要是为了表现出“读取-处理-写入”模式的应用程序,其中读取和写入来自异步数据流,例如Kafka主题。此类应用程序通常称为流处理应用程序。第一代流处理应用程序可以容忍不准确的处理。例如,使用网页印象流并为每个网页生成聚合视图计数的应用程序可以容忍计数中的一些错误。然而,随着这些应用程序越来越受欢迎,对具有更强语义的流处理应用程序的需求也在增长。例如,一些金融机构使用流处理应用程序来处理用户帐户的借方和贷方。在这些情况下,处理中的错误是不能容忍的:我们需要一次准确地处理所有消息,无一例外。更正式地说,如果流处理应用程序使用消息a并生成消息B,使得B=F(a),那么仅一次处理意味着当且仅当B成功生成时才使用a,反之亦然。使用配置了至少一次交付语义的普通Kafka生产者和消费者,流处理应用程序可能会通过以下方式丢失一次处理语义:Producer.send()可能会由于内部重试而导致重复写入消息B。这是由幂等生产者解决的,不是本文其余部分的重点。我们可能会重新处理输入消息A,导致将重复的B消息写入输出,这违反了恰好一次处理语义。如果流处理应用程序在写入B之后但在将A标记为已使用之前崩溃,则可能会发生重新处理。所以当它恢复时,它再次消耗A并再次写入B,导致重复。最后,在分布式环境中,应用程序崩溃,或者更糟!-暂时失去与系统其余部分的连接。通常,新实例会自动启动以替换那些被认为丢失的实例。通过这个过程,我们可能有多个实例处理同一个输入主题并写入同一个输出主题,导致重复输出并违反一次处理语义。我们称之为“僵尸实例”问题。我们在Kafka中设计了事务API来解决第二个和第三个问题。事务通过使这些周期原子化并通过促进严格隔离,在读取和写入周期中启用恰好一次处理。事务语义原子多分区写入事务允许对多个Kafka主题和分区进行原子写入。事务中包含的所有消息都将被成功写入,或者不被写入。例如,处理过程中的错误可能会导致事务中止,在这种情况下,来自事务的任何消息都不会被消费者读取。现在让我们看看它是如何实现原子读写循环的。首先,让我们考虑一下原子读写周期的含义。简而言之,这意味着如果应用程序使用偏移量为X的主题分区tp0的消息,并将消息B写入主题分区tp1以对该消息进行一些处理,B=F(a),则read-process-仅当消息被认为已成功消费并一起发布时,写周期才是A和B原子的,或者根本没有。现在,只有当消息A的偏移量X被标记为已使用时,消息A才被认为是从主题分区tp0中使用的。将偏移量标记为已使用称为已提交偏移量。在Kafka中,我们通过写入内部Kafka主题偏移量主题来记录偏移量提交。只有当消息的偏移量提交到偏移量主题时,消息才被视为已消耗。因此,从偏移量提交只是对Kafka主题的另一次写入,并且由于仅当消息的偏移量消耗一次提交时才考虑消息,因此原子写入还支持跨多个主题和分区的原子读-进程-写循环:committedOffsettingX的补偿主题写入到tp1的消息B将成为单个事务的一部分,因此是原子的。Zombiefencing我们通过要求为每个交易生产者分配一个名为transaction.id的唯一标识符来解决僵尸实例的问题。这用于在进程重新启动时标识相同的生产者实例。API要求事务生产者的第一个动作应该是显式注册其事务。使用Kafka集群的ID。当它这样做时,Kafka经纪人检查给定交易的开放交易。识别并完成它们。它还会增加与transaction.id关联的纪元。epoch是存储在每个transaction.id中的内部元数据。一旦纪元发生碰撞,任何具有相同交易的生产者。ID和旧时间被视为僵尸并被隔离。来自这些生产者的未来交易写入将被拒绝。阅读交易消息现在,让我们将注意力转向阅读作为交易的一部分编写的消息时所提供的保证。Kafka消费者只会在事务提交时向应用程序提交事务消息。换句话说,消费者不会将事务性消息作为开放事务的一部分进行传递,也不会将消息作为已中止事务的一部分进行传递。值得注意的是,上述保证并没有达到原子读。特别是,当使用Kafka消费者从主题中消费消息时,应用程序将不知道这些消息是否作为事务的一部分写入,因此它们将不知道事务何时开始或结束。此外,不能保证给定的消费者订阅作为交易一部分的所有分区,并且它还没有发现这种方法,这使得很难保证作为交易一部分的所有信息最终对消费者可用.简而言之:Kafka保证消费者最终只会传递非事务性消息或提交的事务性消息。它将保留来自打开事务的消息并过滤掉来自中止事务的消息。Java中的事务API事务功能主要是服务器端和协议级别的功能,任何支持它的客户端库都可以使用它。一个用Java编写的读写应用程序,使用Kafka的事务API,应该如下所示:第1-5行通过指定事务设置生产者。配置id并将其注册到initTransactionsAPI。在inittransactions()返回后,由具有相同事务的生产者的另一个实例启动的任何事务。id将被关闭和隔离。第7-10行指定KafkaConsumer应仅读取非事务性消息,或从其输入主题提交事务性消息。流处理应用程序通常在多个读取和写入阶段处理其数据,每个阶段都使用前一阶段的输出作为其输入。通过指定read_committed模式,我们可以在所有阶段只执行一次处理。第14-21行演示了读写循环的核心:我们消费一些记录,启动事务,处理使用的记录,将处理过的记录写入输出主题,将使用的偏移量发送到偏移量主题,最后提交事务.从上面提到的保证中,我们知道偏移量和输出记录将作为一个原子单元提交。事务如何工作在本节中,我们将简要概述上述事务API引入的新组件和新数据流。要对该主题进行更详尽的讨论,您可以阅读原始设计文档,或观看介绍事务的Kafka峰会演讲。以下内容的目标是在调试使用事务的应用程序时或在尝试调整事务以获得更好的性能时提供一个心智模型。事务协调器和事务日志Kafka0.11.0中事务API引入的组件是上图右侧的事务协调器和事务日志。事务协调器是在每个Kafka代理中运行的模块。事务日志是一个内部的kafka主题。每个协调器都拥有事务日志中的一些分区子集。它的代理是它作为领导者的分区。每笔交易。ID通过简单的哈希函数映射到事务日志的特定分区。这意味着只有一个协调员拥有给定的transaction.id。通过这种方式,我们利用Kafka坚如磐石的复制协议和领导者选举过程来确保事务协调器始终可用,并持久存储所有事务状态。值得注意的是,事务日志只存储事务的最新状态,而不是事务中的实际消息。消息仅存储在实际主题分区中。事务可以处于不同的状态,例如“进行中”、“准备提交”和“完成”。正是这种状态和关联的元数据存储在事务日志中。数据流在高层次上,数据流可以分为四种不同的类型。A:当生产者和事务协调器交互执行一个事务时,生产者向事务协调器发送如下请求:initTransactionsAPI注册一个事务。ID和协调员。此时,协调器将使用该事务来关闭任何未决事务。在身份和碰撞的时代,僵尸将被围起来。每个生产者会话只发生一次。当生产者在事务中第一次向分区发送数据时,该分区首先向协调器注册。当应用程序调用commitTransaction或abortTransaction时,将向协调器发送请求以开始两阶段提交协议。B:协调器与事务日志的交互随着事务的进行,生产者发送上述请求更新协调器上事务的状态。事务协调器将它拥有的每个事务的状态保存在内存中,并将该状态写入事务日志(以三种方式复制,因此它是持久的)。事务协调器是唯一从事务日志中读取和写入的组件。如果给定的broker发生故障,将选举一个新的协调器作为已死broker所拥有的事务日志分区的领导者,它将从传入的分区读取消息,以便为这些分区中的事务重建其内存状态。C:生产者向目标主题分区写入数据生产者在事务中向协调器注册新分区后,正常向实际分区发送数据。是同一个制作人。发送流,但进行一些额外的验证以确保生产者不受保护。D:主题分区交互的协调器在生产者发起提交(或中止)后,协调器启动两阶段提交协议。在第一阶段,协调器将其内部状态更新为“prepare_commit”,并在事务日志中更新此状态。一旦完成,事务保证在任何情况下都会提交。然后协调器开始第2阶段,将事务提交标记写入作为事务一部分的主题分区。这些事务标记不会暴露给应用程序,但会被消费者以read_committed模式使用,以过滤掉中止事务中的消息,并且不会返回属于打开事务的一部分的消息(即,在日志中但未与事务标记相关联的消息。一旦标记被写入,事务协调器将事务标记为“完成”,生产者可以开始下一个事务。实践中处理事务现在我们已经了解了事务的语义及其工作方式,然后我们将注意力转向实际操作编写利用事务的应用程序的方面。如何选择transaction.id事务。id在防止僵尸中起着重要作用。但是在不同的生产者会话之间保持标识符一致是的,正确隔离僵尸有点棘手。关键正确隔离“僵尸”是为了确保对于给定的transaction.id,输入主题和分区始终相同跨越读写周期。如果这不是真的,那么一些消息可能会通过事务提供的栅栏泄漏。例如,在分布式流处理应用程序中,假设主题分区tp0最初是由事务处理的。T0编号。如果在稍后的某个时间,它可以映射到另一个具有transactional.idT1的生产者,在T0和T1之间没有栅栏。因此,来自tp0的消息可以被重新处理,这违反了一次处理保证。实际上,输入分区必须存储在外部存储中ids的Mapping之间,或者是它的一些静态编码。KafkaStreams选择了后一种方式来解决这个问题。交易是如何执行的,以及如何调整它们交易生产者的表现让我们把注意力转移到交易是如何执行的。首先,事务只会导致适度的写入放大。增加的写入是由于:对于每个事务,我们有额外的RPC向协调器注册分区。这些是批处理的,因此我们的RPC比交易分区大。当事务完成时,必须向参与事务的每个分区写入事务标记。此外,事务协调器在单个RPC中对绑定到同一代理的所有标记进行批处理,因此我们在那里节省了RPC开销。但是我们无法避免在事务中对每个分区进行一次额外的写入。最后,我们将状态更改写入事务日志。这包括添加到事务中的每批分区的写入、“prepare_commit”状态和“complete_commit”状态。我们可以看到开销与作为事务的一部分写入的消息数无关。因此,提高吞吐量的关键是在每个事务中包含更多的消息。事实上,对于以最大吞吐量生成1KB记录的生产者而言,每100毫秒提交一条消息只会导致吞吐量降低3%。更小的消息或更短的事务提交间隔将导致更严重的降级。增加事务持续时间的主要成本是增加端到端延迟。请记住,阅读交易消息的消费者不会将消息作为公开交易的一部分进行传递。因此,提交之间的时间越长,应用程序等待的时间就越长,从而增加了端到端延迟。事务消费者的性能事务消费者比生产者简单得多,因为它需要做的就是:过滤属于中止事务的消息。不返回作为打开事务的一部分的事务消息。因此,在read_committed模式下读取事务性消息时,事务性消费者的吞吐量不会下降。这样做的主要原因是我们在读取事务性消息时保持零拷贝读取。此外,消费者不需要任何缓冲来等待交易完成。相反,经纪人不允许它进行预补偿,其中包括未平仓交易。因此,消费者非常轻便和高效。有兴趣的读者可以阅读本文档中的消费者设计细节。进一步阅读我们刚刚触及了ApacheKafka中事务的皮毛。幸运的是,几乎所有设计细节都在线记录。相关文档如下:KafkaKIP原版:它提供了数据流的细节和公共接口的概览,尤其是交易附带的配置选项。原始设计文档:不适合胆小的人,这是权威的地方-在源代码之外!-了解每个事务RPC是如何处理的,事务日志是如何维护的,事务数据是如何清除的,等等。KafkaProducerjavadocs:这是学习如何使用新API的好地方。页面开头的示例和发送方法的文档是很好的起点。结论在这篇文章中,我们了解了ApacheKafka中事务API的主要设计目标,理解了事务API的语义,并对API的实际工作方式有了更深入的了解。如果我们考虑一个读-处理-写循环,这篇文章主要讨论读写路径,处理本身就是一个黑盒子。事实上,在处理阶段可以做很多事情,这使得仅使用事务API无法保证一次性处理。例如,如果处理对其他存储系统有副作用,这里提供的API不足以保证只处理一次。KafkaStreams框架使用此处描述的事务性API向上移动价值链,并为各种流处理应用程序提供一次处理,甚至是那些在处理期间更新一些额外状态存储的应用程序。未来的一篇博文将讨论KafkaStreams如何提供exactly-once处理语义,以及如何编写利用它的应用程序。最后,对于那些渴望了解上述API实现细节的人,我们将在另一篇后续博文中介绍一些更有趣的解决方案。
