介绍Debezium是一个开源、低延迟的数据流平台,用于捕获数据变化(CDC)。使用Debezium监控数据库,应用程序可以接收来自数据库的每一行更改事件,只有提交的更改是可见的,因此无需担心事务或更改的回滚。Debezium为所有变更事件提供单一模型,无需担心每个数据库管理系统的复杂性。同时,Debezium将历史数据变化记录在一个持久化的日志中,因此应用程序可以随时停止和重新启动,并且可以在启动后重新接收停止操作期间遗漏的所有事件。监控数据库并在数据更改时收到通知是一件复杂的事情。关系数据库的触发器可能适用,但它们仅限于某些数据库,并且通常仅限于在同一数据库内更新状态并且不与外部进程通信交互。有的数据库提供了监控的API或者框架,但是没有统一的标准,所以每个数据库的方法都不一样,需要大量的专业代码来实现;同时,在监控到数据变更后,如何保证这些变更事件有序进行,减少对数据库的影响,是非常具有挑战性的。Debezium提供模块来执行此操作。有些模块是通用的,可以与多个数据库管理系统一起使用,但在功能和性能方面存在一些限制。其他模块是为特定的数据库管理系统定制的,功能更强大,充分利用了系统的特定功能。项目地址:https://github.com/debezium/debezium基础信息基础设施Debezium使用Kafka和KafkaConnect来实现自身的持久化、可靠性和容错性。KafkaConnect服务中部署的每个连接器都监控一个上游数据库服务器,捕获所有数据库变化,然后将它们记录到一个或多个Kafka主题(通常是一个数据库表对应一个Kafka主题)。Kafka保证所有这些数据变化事件都是多副本的,并且一般是有序的(Kafka只能保证一个主题的单个分区内的顺序),这样更多的客户端可以独立接收到相同的数据变化,对系统造成的影响降到最低(如果N个应用直接监控数据库,数据库压力为N,使用debezium将数据库变化事件上报给Kafka,所有应用接收Kafka,可以将数据库压力降低到1)。另外,客户端可以随时停止接收,然后重启,从上次停止接收的地方继续接收。对于不需要或不想要这种级别的容错、性能、可扩展性和可靠性的应用程序,他们可以使用嵌入式Debezium连接器引擎直接在应用程序内部运行连接器。这种应用程序仍然需要接收数据库更改事件,但更希望连接器直接传递它而不是持久化到Kafka。常见使用场景缓存失效当源被更改或删除时,缓存的内容会立即使缓存中的条目失效。如果缓存运行在单独的进程中(例如Redis、Memcache、Infinispan或其他),那么简单的缓存失效逻辑可以放在单独的进程或服务中,从而简化主应用程序的逻辑。在某些情况下,缓存失效逻辑可能会更复杂一些,让它使用来自更改事件的更新数据更新缓存中受影响的条目。简化单体应用程序许多应用程序更新数据库,然后在数据库中的更改提交后做一些额外的工作:更新搜索索引、更新缓存、发送通知、运行业务逻辑等等。这种情况通常称为双重写入,因为应用程序不会在单个事务中写入多个系统。这样一来,不仅应用逻辑复杂,难以维护,而且双写容易丢失数据,或者导致不同系统间状态不一致,有的系统更新成功,有的系统更新失败。使用CDC,在源数据库中的数据更改被提交后,这些额外的任务可以在单独的线程或进程(服务)中完成。这种实现方式容错性更强,不会丢失事件,易于扩展,更容易支持升级。共享数据库当多个应用程序共享同一个数据库时,一个应用程序提交的更改通常需要被另一个应用程序感知。一种实现方式是使用消息总线,另一种实现方式是Debezium:每个应用程序都可以直接监听数据库的变化并响应变化。数据集成数据通常存储在多个地方,尤其是当数据以不同的形式用于不同的目的时。保持多个系统同步具有挑战性,但可以通过使用Debezium结合简单的事件处理逻辑来实现简单的ETL类型解决方案。命令查询责任分离在命令查询责任分离(CQRS)架构模式中,一个数据模型用于更新数据,一个或多个数据模型用于读取数据。由于数据更改记录在更新端,因此处理这些更改以更新各种读取表示。因此CQRS应用程序通常更复杂,尤其是它们需要保证可靠性和总订单处理。Debezium和CDC使这种方法更加可行:写入操作通常被记录下来,但Debezium捕获数据更改并将它们持久化在总订单流中,然后由需要异步更新只读视图的服务接收。安装和使用Debezium需要三个独立的服务:ZooKeeper、Kafka和Debezium连接器服务。官方推荐使用Docker安装,数据库以MySQL为例。启动ZooKeeper$dockerrun-it--rm--namezookeeper-p2181:2181-p2888:2888-p3888:3888quay.io/debezium/zookeeper:1.9如果使用Podman,运行以下命令:$sudopodmanpodcreate--name=dbz--publish"9092,3306,8083"$sudopodmanrun-it--rm--namezookeeper--poddbzquay.io/debezium/zookeeper:1.9启动后可以看到以下输出:以独立模式启动ZooKeeperJMX默认启用使用配置:/zookeeper/conf/zoo.cfg2017-09-2107:15:55,417-INFO[main:QuorumPeerConfig@134]-从以下位置读取配置:/zookeeper/conf/zoo.cfg2017-09-2107:15:55,419-INFO[main:DatadirCleanupManager@78]-autopurge.snapRetainCount设置为32017-09-2107:15:55,419-INFO[main:DatadirCleanupManager@79]-autopurge.purgeInterval设置为1...端口0.0.0.0/0.0.0.0:2181启动Kafka$dockerrun-it--rm--namekafka-p9092:9092--linkzookeeper:zookeeperquay.io/debezium/kafka:1.9如果使用Podman,请运行以下命令:$sudopodmanrun-it--rm--namekafka--poddbzquay.io/debezium/kafka:1.9启动后可以看到如下输出:...2017-09-2107:16:59,085-INFO[main-EventThread:ZkClient@713]-zookeeper状态已更改(SyncConnected)2017-09-2107:16:59,218-INFO[main:Logging$class@70]-ClusterID=LPtcBFxzRvOzDSXhc6AamA...2017-09-2107:16:59,649-INFO[main:Logging$class@70]-[KafkaServer1],启动了MySQL容器运行一个预先配置了库存数据库的MySQL数据库服务器:$dockerrun-it--rm--namemysql-p3306:3306-eMYSQL_ROOT_PASSWORD=debezium-eMYSQL_USER=mysqluser-eMYSQL_PASSWORD=mysqlpwquay.io/debezium/example-mysql:1.9如果使用Podman,运行以下命令:$sudopodmanrun-it--rm--namemysql--poddbz-eMYSQL_ROOT_PASSWORD=debezium-eMYSQL_USER=mysqluser-eMYSQL_PASSWORD=mysqlpwquay.io/debezium/example-mysql:1.9启动后可以看到如下输出:...[System][MY-010931][Server]/usr/sbin/mysqld:准备连接。版本:'8.0.27'套接字:'/var/run/mysqld/mysqld.sock'端口:3306MySQLCommunityServer-GPL.[System][MY-011323][Server]X插件准备连接。绑定地址:'::'端口:33060,套接字:/var/run/mysqld/mysqlx.sock启动Kafka连接器该服务公开了一个RESTAPI来管理DebeziumMySQL连接器:$dockerrun-it--rm--nameconnect-p8083:8083-eGROUP_ID=1-eCONFIG_STORAGE_TOPIC=my_connect_configs-eOFFSET_STORAGE_TOPIC=my_connect_offsets-eSTATUS_STORAGE_TOPIC=my_connect_statuses--linkkafka:kafka--linkmysql:mysqlquay.io/debezium/connect:1.9如果使用Podman,运行以下命令:$runsuit-podman--rm--nameconnect--poddbz-eGROUP_ID=1-eCONFIG_STORAGE_TOPIC=my_connect_configs-eOFFSET_STORAGE_TOPIC=my_connect_offsets-eSTATUS_STORAGE_TOPIC=my_connect_statusesquay.io/debezium/connect:1.9启动后看到如下输出:...2020-02-0615:48:33,939信息||Kafka版本:3.0.0[org.apache.kafka.common.utils.AppInfoParser]...2020-02-0615:48:34,485信息||[WorkerclientId=connect-1,groupId=1]使用配置偏移量-1启动连接器和任务[org.apache.kafka.connect.runtime.distributed.DistributedHerder]2020-02-0615:48:34,485信息||[WorkerclientId=connectt-1,groupId=1]连接器和任务启动完成[org.apache.kafka.connect.runtime.distributed.DistributedHerder]注册MySQL连接器通过注册DebeziumMySQL连接器,连接器将开始监控MySQL数据库的binlogserver,在数据库的binlog中记录所有的事务(比如对单行的修改)。当数据库中的一行发生更改时,Debezium会生成一个更改事件。配置如下:{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"schema-changes.inventory"}}使用curl命令进入注册表:$curl-i-XPOST-H"Accept:application/json"-H"Content-Type:application/json"localhost:8083/connectors/-d'{“名称”:“库存连接器”,“配置”:{“connector.class”:“io.debezium.connector.mysql.MySqlConnector”,“tasks.max”:“1”,“database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","数据库.server.name":"dbserver1","database.ininclude.list":"inventory","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory"}}'更新数据库并查看更改事件使用watch-topic查看dbserver1.inventory.customers主题:$dockerrun-it--rm--namewatcher--linkzookeeper:zookeeper--linkkafka:kafkaquay.io/debezium/kafka:1.9watch-topic-a-kdbserver1.inventory.customers在MySQL客户端上进行数据更改:mysql>UPDATEcustomersSETfirst_name='AnneMarie'WHEREid=1004;QueryOK,1rowaffected(0.05sec)Rowsmatched:1Changed:1Warnings:0检查并修改修改后的值:mysql>SELECT*FROMcustomers;+------+------------+------------+--------------------+|id|first_name|last_name|email|+-----+-----------+------------+------------------------+|1001|莎莉|托马斯|莎莉.thomas@acme.com||1002|George|Bailey|gbailey@foobar.com||1003|Edward|Walker|ed@walker.com||1004|AnneMarie|Kretchmar|annek@noanswer.组织|+------+------------+------------+--------------------+4rowsinset(0.00sec)switchtowatch-topicterminaltowatchevents:通过比较前后结构,可以确定受影响的行中实际发生了什么变化提交{“模式”:{...},“有效载荷”:{“之前”:{“id”:1004,“first_name”:“Anne”,“last_name”:“Kretchmar”,“email”:“annek@noanswer.org"},"after":{"id":1004,"first_name":"AnneMarie","last_name":"Kretchmar","email":"annek@noanswer.org"},"来源”:{“名称”:“1.9.5.Final”,“名称”:“dbserver1”,“server_id”:223344,“ts_sec”:1486501486,“gtid”:null,“文件”:“mysql-bin.000003","pos":364,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"u","ts_ms":1486501486308}}
