当前位置: 首页 > 网络应用技术

Debezium Engine使用进入

时间:2023-03-06 14:46:18 网络应用技术

  Debezium连接器通常将其部署到Kafka连接服务并配置一个或多个连接器以监视上游数据库。这些连接器将捕获上游数据库的所有更改以生成数据更改事件。这些数据更改事件写入KAFKA,可以在其中通过许多不同的应用程序独立使用它们。KafkaConnect提供出色的容错和可扩展性,因为它作为分布式运行服务并确保所有已注册和配置的连接器都在运行。例如,即使kafka连接端点在群集中失败,其余的kafka Connect端点也将运行在端点之前终止的端点上运行的任何连接器,从而最大程度地减少停止时间并消除管理活动。

  并非每个应用程序都需要该级别的容错性和可靠性。他们可能不想依靠外部Kafka代理群集和Kafka Connect Services。为了让连接器将它们直接发送到应用程序,而不是在Kafka中耐用。

  该模块定义了一个小的API,该API允许应用程序轻松配置并运行Debezium Engine的Debezium Connector。

  要使用Debezium Engine模块,请将模块添加到应用程序的依赖项中。在模块中实现了API,还应将其添加到依赖项中。对于Maven,这需要将以下内容添加到应用程序的应用程序中:

  其中包括您正在使用的Debezium版本,或其值包含Debezium版本字符串的Maven属性。

  同样,将依赖关系添加到应用程序使用的每个Debezium连接器中。

  或用于MongoDB连接器:

  本文档的其余部分描述了您应用程序中嵌入的MySQL连接器。外观连接器的配置,主题和事件,其他连接器的使用相似。

  您的应用程序需要为您要运行的每个连接器实例设置嵌入式引擎。该类用作任何Debezium连接器的易于使用的拳击手,并完全管理连接器的生命周期。您将其构建器API用于创建示例以提供以下内容:

  这是配置代码并运行嵌入式MySQL连接器的示例:

  从几行开始,让我们仔细研究一下此代码:

  这将创建一个新对象来设置引擎要求的多个字段。第一个是引擎的名称,该名称将在连接器及其内部状态生成的源记录中使用。该字段定义了类的名称Kafka连接抽象类;在此示例中,我们指定了Debezium类。

  当Kafka Connecton连接器运行时,它将从源数据库中读取信息并定期记录“偏移”。这些“偏移”定义了它已经处理了多少信息。如果连接器重新启动,它将使用最终记录的偏移来了解其应继续从源信息中读取的位置。由于连接器不知道并且不知道关心偏移量的存储方式,引擎需要提供一种存储和还原这些偏移的方法。我们配置的下几个字段指定我们的引擎应使用存储在本地文件系统上的偏移的文件(此文件可以任意命名并在任何地方存储)。此外,尽管连接器使用每个源的每个源来记录偏移量,但引擎会定期刷新偏移到储备存储(在我们的示例中,一分钟一次)。这些字段可以自定义根据您的申请的需求。

  接下来的几行定义了特定于连接器的字段。在我们的示例中,连接器:

  在这里,我们设置了运行MySQL数据库服务器的主机名和端口号,并定义用于连接MySQL数据库的用户名和密码。请注意,对于MySQL,用户名和密码应与已授予的MySQL数据库用户相对应以下MySQL权限:

  读取数据库的一致快照需要前三个权限。最后两个权限允许数据库读取BINLOG,通常用于由MySQL复制的服务器。

  此配置还包括。由于MySQL的Binlog是MySQL复制机制的一部分,以便读取Binlog,因此必须将实例添加到MySQL Server组中,这意味着在我们的代码中,这意味着整数1和2。但是有点随机的值,我们只会使用我们的应用程序。

  配置还指定了MySQL Server的逻辑名称。连接器在其记录的每个源的主题字段中包含此逻辑名称,使您的应用程序能够识别这些记录的源。您使用“产品”的服务名称。,可能是因为数据库包含产品信息。当然,您可以将其命名为应用程序。

  此类运行时,它会读取MySQL Server的Binlog,包括服务器数据库和模式更改的所有数据更改。由于记录更改时,数据的所有更改都是基于表格的所有更改,因此连接器需要跟踪所有架构更改,以便它可以正确解码更改事件。连接记录模式信息是为重新启动连接器并继续从上一记录的偏移而继续阅读的。它完全知道偏移量的数据库模式的状态。如何记录数据库模式的历史记录在我们配置的最后两个字段中定义本地文件系统存储在任何地方)。

  最后,使用此方法来构建配置。(顺便说一句,我们可以使用其中一种方法来读取属性文件的配置,而不是通过编程构建配置。)

  现在我们已经有了配置,我们可以创建引擎。在这里,再次是相关的代码行:

  所有更改都将传递给给定的处理方法。此方法必须匹配功能接口的签名,该函数接口的签名必须与指定格式的类型匹配。请注意,请注意,应用程序的处理功能不应抛出任何例外;如果是这样,则引擎将记录任何抛出该方法的异常情况,并将继续操作下一个源记录,但是您的应用程序将没有应用程序。处理引起异常的特定来源的机会,这意味着您的应用程序可能会与该应用程序不一致数据库。

  目前,我们有一个已配置和准备运行的现有对象,但没有执行任何操作。设计为异步执行:

  您的应用程序可以通过调用其方法安全,优雅地阻止引擎:

  或者因为引擎支持接口,所以它在离开块时会自动调用。

  引擎连接器将停止从源系统读取信息,将所有剩余的更改转发到您的处理功能,并将最新办公室刷新到偏移存储。为了使引擎在退出之前停止,您可以使用和方法执行此操作:

  另外,您可以在创建时注册为回调,以在引擎终止时获取通知。

  回想一下,当JVM关闭时,它只会等待监护人线程。因此,如果您的应用程序退出,请务必等待引擎完成,或在Guardian线程上运行引擎。

  您的应用程序应始终正确停止引擎以确保正常和完全关闭,并且每个源记录都会准确发送到应用程序。当中断时,发动机可能不会完全终止,并且当您的应用程序重新启动时,可能会看到一些源记录在关闭之前已处理过。

  您可以接受多个不同的参数,这将影响消费者接收消息格式。允许的值是:

  在内部,引擎使用适当的Kafka Connecter来实现委托转换。您可以使用引擎属性来执行转换器的参数化以修改其行为。

  输出格式的一个示例是

  数据类型是键/值对。

  在将消息传递到处理程序之前,您可以通过Kafka Connect的简单消息转换(SMT)管道运行它们。EAFESSMT可以通过,修改或过滤消息。该链条使用适当的链条配置。此属性包含的逗号分隔列表要应用的逻辑名称。该属性定义了每个转换的实现类的名称,以及传递给转换的配置选项。

  配置的一个示例是

  在某些情况下,例如尝试在批处理或异步API中编写记录,上述功能接口可能具有挑战性。在这些情况下,使用此接口可能会更容易。

  该接口具有一个单个功能,并具有以下签名:

  如Javadoc中所述,每个记录都会调用对象,并且在每个批次完成后都调用对象。接口是安全的,并且允许灵活的处理记录。

  您可以选择覆盖处理记录的偏移。这是通过首先调用和构建新对象,使用更新偏移,然后使用更新的调用来完成的。

  要使用API,您必须将接口的实现传递给API,如下所示:

  如果使用JSON格式(其他格式的等效格式),则该代码如下:

  除非有默认值可用,否则需要以下配置属性(对于兼容的文本格式,更换Java类的包名称)。

  连接器实例的唯一名称。

  连接器的Java类的名称,例如MySQL Connector.Respons,用于移动连接的Java类的名称。它必须实现interface.t indumence.s indumencement.it apriplacement.set。偏移量的Kafka主题的名称是存储。设置在需要时进行。在偏移存储主题时创建使用的分区数。设置需要时。在偏移存储主题时创建复制因子。。时间间隔。存储。默认值为5秒。应将转换器类用于序列化和deepertine位移的关键数据。默认JSON转换器。应用于序列化和衍生化偏移数据的转换器类。默认JSON转换器。有些连接器还需要一组附加属性来配置数据库的历史记录:

  如果数据库历史记录没有正确的配置,则连接器将拒绝启动。默认配置需要KAFKA群集可用。对于其他部署,可以使用基于数据库的数据库数据库来实现。使用文本的格式,Java类的软件包名称被取代)

  负责Java类的数据库历史名称。

  它必须实现接口。数据库历史记录记录的文档的路径。

  kafka存储数据库历史的主题。

  将要连接的KAFKA群集服务器的初始列表。集群提供存储数据库历史的主题。

  执行引擎后,连接器将在每个源记录中积极记录位移,并且发动机会定期将这些偏移刷到持久存储中。当应用程序和引擎正常关闭或正常折叠时,或者当它们被正常重新启动,重新启动,重新启动,重新启动,发动机及其连接器将从上一记录的偏移范围恢复源信息。

  那么,当您的应用程序失败时,当您的应用程序失败时会发生什么?最终结果是,该应用程序可能会在重新启动后倒闭之前已处理过一些源记录。金额取决于引擎,这些引擎会刷新频率的偏移量其存储(通过属性)和在一批特定连接器中返回的源记录的数量。最好的情况是每次刷新偏移量(例如,设置为0),但即使如此,嵌入式引擎仍然会仅在接收到连接器的每个源记录后才刷新偏移量。

  例如,MySQL连接器使用规格是批处理处理中出现的最大源记录。即使将其设置为0,当应用程序崩溃后重新启动应用程序时,最多可能会看到n个重复,其中n为n是批处理的大小。如果设置属性更高,则应用程序可能会看到最大重复项,其中n是批次的最大大小,m是单个偏移刷新间隔期间可能积累的批次数。(显然,嵌入式连接器可以配置为不使用批处理处理并始终刷新偏移,这将导致应用程序永远不会收到任何重复的源记录。但是,这将大大增加开销并减少连接器吞吐量的吞吐量。数量。)

  最重要的是,当使用嵌入式连接器时,该应用程序将在正常操作期间仅接收每一个源记录(包括正常关闭后的正常关闭),但是确实需要在倒塌或闭合不正确后立即容忍它重组后立即关闭。重新培养事件。如果应用程序需要更严格的准确行为,则应使用一个完整的Debezium平台,该平台可以提供准确的保证(即使崩溃和重新启动后)。

  单击以查看原始文本:Debezium Engine

  原始:https://juejin.cn/post/7096404178519621640