当前位置: 首页 > 科技观察

实时监控同步数据库变化,这个框架真是神器

时间:2023-03-21 00:51:42 科技观察

我们数据库中的数据一直在变化。有时我们希望监控数据库数据的变化,并根据变化做出一些反应,比如更新相应变化数据的缓存和增量,同步到其他数据源,instrument和audit数据等等。而这种技术叫做变更数据捕获(ChangeDataCapture)。对于这个技术,大家可能知道Canal这个国内比较知名的框架,非常好用!但是Canal有一个局限性,它只能用于Mysql的变更数据抓取。今天给大家介绍另一个更强大的分布式CDC框架Debezium。提到Debezium的框架,相信大多数普通开发者都比较陌生,但是说到它所属的公司,想必大家就耳熟能详了。红帽无疑是开源世界中最成功的红帽公司。Debezium是一个用于捕获数据变化的流处理框架,开源且免费。Debezium近乎实时地监控数据库行级的数据变化并做出反应。而且只有提交的更改是可见的,所以不用担心事务问题或更改被回滚。Debezium为所有的数据库变更事件提供了一个统一的模型,所以不用担心每个数据库系统的复杂性。Debezium提供对MongoDB、MySQL、PostgreSQL、SQLServer、Oracle、DB2和其他数据库的支持。另外,借助KafkaConnector,可以开发基于事件流的变更捕获平台,容错性高,可扩展性强。DebeziumKafka架构如图所示。DebeziumKafkaconnectorforMySQL和PostgresSQL被部署来捕获这两种类型数据库上的变化事件,然后这些变化通过下游的KafkaConnector(例如Elasticsearch、数据仓库、分析系统)或缓存传输到其他系统或数据库。另一种玩法是将Debezium构建到应用程序中,以制作类似于消息总线的设施,并将数据更改事件传递给订阅的下游系统。Debezium内置的服务器架构Debezium在数据完整性和可用性方面也做了很多工作。Debezium使用带有副本备份的持久日志来记录数据库数据更改的历史。因此,您的应用程序可以随时停止和重新启动,而不会丢失停止运行时发生的事件,确保所有事件都能得到处理。正确、彻底地处理。稍后我将演示一个SpringBoot集成的Debezium数据捕获系统。SpringBoot集成Debezium理论的介绍并不能让大家直观感受Debezium的能力,所以接下来我将使用嵌入式Debezium引擎来进行演示。流程图如上图所示。当我们更改MySQL数据库中的某一行数据时,可以通过Debezium实时监控binlog日志的变化,触发对更改事件的捕获,进而获取更改事件模型并做出响应(消费).接下来我们来搭建环境。MySQL打开binlog日志。这里为了方便使用MySQLDocker容器,对应的脚本为:#运行mysql容器dockerrun--namemysql-service-vd:/mysql/data:/var/lib/mysql-p3306:3306-eTZ=Asia/Shanghai-eMYSQL_ROOT_PASSWORD=123456-dmysql:5.7--character-set-server=utf8mb4--collat??ion-server=utf8mb4_unicode_ci--default-time_zone="+8:00"#设置binlog位置dockerexecmysql-servicebash-c"echo'log-bin=/var/lib/mysql/mysql-bin'>>/etc/mysql/mysql.conf.d/mysqld.cnf"#配置mysql的server-iddockerexecmysql-servicebash-c"echo'server-id=123454'>>/etc/mysql/mysql.conf.d/mysqld.cnf"上面的脚本运行一个MySQL容器,用户名为root,密码为123456,并将数据挂载到本地路径d:/mysql/data,并在同时打开binlog日志,设置server-id为123454,后面的配置会用到。请注意,如果不使用root用户,需要保证该用户有四个权限:SELECT、RELOAD、SHOWDATABASES、REPLICATIONSLAVE、REPLICATIONCLIENT。SpringBoot集成嵌入式DebeziumDebezium依赖SpringBoot为应用添加如下依赖:}io.debeziumdebezium-embedded${debezium.version}}dependency>io.debeziumdebezium-connector-mysql${debezium.version}最新版本号为1.5.2。最终的。声明配置然后声明需要的配置:/***Debeziumconfiguration.**@returnconfiguration*/@Beanio.debezium.config.ConfigurationdebeziumConfig(){returnio.debezium.config.Configuration.create()//ConnectorJava类name.with("connector.class",MySqlConnector.class.getName())//偏移持久化,用于容错默认值.with("offset.storage","org.apache.kafka.connect.storage.FileOffsetBackingStore")//offset持久化文件路径default/tmp/offsets.dat如果路径配置不正确可能会导致存储offsets失败导致重复消费变化//如果connector重启,会使用上次的记录的偏移量,以了解它应该在源信息中的何处继续读取。.with("offset.storage.file.filename","C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")//捕获offsets的循环.with("offset.flush.interval.ms","6000")//连接器的唯一名称.with("name","mysql-connector")//数据库的主机名.with("database.hostname","localhost")//port.with("database.port","3306")//username.with("database.user","root")//password.with("database.password","123456")//includeddatabaselist.with("database.include.list","etl")//是否包含数据库表结构级别的变化,建议使用默认值true.with("include.schema.changes","false")//mysql.cnf配置的server-id.with("database.server.id","123454")//MySQL服务器或cluster的逻辑名.with("database.server.name","customer-mysql-db-server")//历史变化记录.with("database.history","io.debezium.relational.history.FileDatabaseHistory")//历史变化记录存放位置.with("database.历史ory.file.filename","C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat").build();}配置分为两部分:一部分是配置属性DebeziumEngine的实现,参见DebeziumEngine配置[1]部分是MysqlConnector的配置属性,参见MysqlConnector配置[2]。DebeziumEngine应用的实例化需要为正在运行的MysqlConnector启动一个Debezium引擎,这个引擎将运行在异步线程的形式,它包裹了整个MysqlConnector连接器的生命周期。声明一个引擎需要以下步骤:声明接收数据变化捕获信息的格式,提供JSON、Avro、Protobuf、Connect、CloudEvents等格式,etc.加载上面定义的配置.声明一个消费数据变化事件的函数方法.声明的伪代码:DebeziumEngine>debeziumEngine=DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();handlePayload的方法是:privatevoidhandlePayload(List>recordChangeEvents,DebeziumEngine.RecordCommitter>recordCommitter){recordChangeEvents.forEach(r->{SourceRecorduecRecord=r.recordValt();StructsourceR();if(sourceRecordChangeValue!=null){//判断操作类型过滤掉读只处理增删改查这个其实可以在配置中设置Envelope.Operationoperation=Envelope.Operation.forCode((String)sourceRecordChangeValue.get(OPERATION));if(operation!=Envelope.Operation.READ){Stringrecord=operation==Envelope.Operation.DELETE?BEFORE:AFTER;//获取增删改查对应的结构体数据Structstruct=(Struct)sourceRecordChangeValue.get(record);//将变化的行封装为MapMappayload=struct.schema().fields().stream().map(Field::name).filter(fieldName->struct.get(fieldName)!=null).map(fieldName->Pair.of(fieldName,struct.get(fieldName))).collect(toMap(Pair::getKey,Pair::getValue));//这里简单打印System.out.println("payload="+payload);}}});}引擎的启动和关闭正好适合SpringBean生命周期:@DatapublicclassDebeziumServerBootstrapimplementsInitializingBean,SmartLifecycle{privatefinalExecutorexecutor=Executors.newSingleThreadExecutor();privateDebeziumEnginedebeziumEngine;@Overridepublicvoidstart(){executor.execute(debeziumEngine);}@SneakyThrowbezidstopium@Override()}OverridepublicbooleanisRunning(){returnfalse;}@OverridepublicvoidafterPropertiesSet()throwsException{Assert.notNull(debeziumEngine,"debeziumEnginemustnotbenull");}}启动SpringBoot项目,可以通过各种方式对数据库进行增删改查数据。观察结果会类似如下打印输出:payload={user_id=1123213,username=felord.cn,age=11,gender=0,enabled=1}表示Debezium已经监听到了数据库的变化。大家可以想想这个技术有什么用的场景。好了,今天的分享就到这里,感谢大家的支持,我是:码农大哥。原创不易,敬请关注,点赞,转发,再看。参考资料[1]DebeziumEngine配置:https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties[2]MysqlConnector配置:https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties本文转载自微信公众号“码农小胖哥”,可通过以下二维码关注。转载本文请联系码农小胖公众号。