当前位置: 首页 > 科技观察

FlinkCDCMongoDBConnector实现原理与实践_0

时间:2023-03-11 23:11:08 科技观察

摘要:本文根据XTransfer高级Java开发工程师、FlinkCDCMaintainer孙家宝在FlinkCDCMeetup上的演讲整理而成。主要内容包括:MongoDBChangeStream技术介绍MongoDBCDCConnector业务实践MongoDBCDCConnector生产调优MongoDBCDCConnector并行化快照改进后续规划01MongoDBChangeStream技术介绍MongoDB是一个面向文档的非关系型数据库,支持半结构化数据存储;也是分布式数据库,提供副本集和分片集两种集群部署模式,具有高可用性和水平扩展能力,更适合大规模数据存储。此外,MongoDB4.0版本还提供了对多文档事务的支持,对于一些比较复杂的业务场景更加友好。MongoDB采用弱结构化存储模型,支持灵活的数据结构和丰富的数据类型,适用于Json文档、标签、快照、地理位置、内容存储等业务场景。其天然的分布式架构提供了开箱即用的分片机制和自动再平衡能力,适用于大规模数据存储。此外,MongoDB还提供了分布式网格文件存储功能,即GridFS,适合存储图片、音频、视频等大文件。MongoDB提供了两种集群模式部署方式:副本集和分片集。副本集:一种高可用的部署方式,从节点通过复制主节点的操作日志来复制数据。当主节点发生故障时,从节点和仲裁节点将重新发起投票选举新的主节点,实现故障转移。此外,从节点还可以分担查询请求,减轻主节点的查询压力。Shardset:一种水平扩展的部署方式,将数据均匀分散在不同的Shard上。每个分片都可以部署为一个副本集。分片中的主节点承载读写请求,从节点会复制主节点的操作日志。根据指定的分片索引和分片策略,将数据分成多个16MB的数据块,将这些数据块交给不同的分片进行存储。Shard和数据块的对应关系会记录在ConfigServers中。MongoDB的Oplog类似于MySQL的Binlog,记录了MongoDB中数据的所有操作日志。Oplog是一个有容量的集合。如果超过预设的容量范围,则之前的信息将被丢弃。与MySQL的Binlog不同的是,Oplog并不记录变化前后的完整信息。遍历Oplog确实可以捕捉到MongoDB的数据变化,但是转换成Flink支持的Changelog还是有一定的局限性。首先,订阅Oplog很困难。每个副本集维护自己的Oplog。对于分片集群,每个分片可能是一个独立的副本集。需要遍历每个shard的Oplog,按照操作时间排序。另外,Oplog不包含变更文档前后的完整状态,因此既不能转化为Flink标准的Changelog,也不能转化为UpsertChangelog。这也是我们在实现MongoDBCDCConnector时没有采用直接订阅Oplog方案的主要原因。最终,我们选择使用MongoDBChangeStreams方案来实现MongoDBCDCConnector。ChangeStreams是MongoDB3.6版本提供的新特性。提供了更简单的变更数据捕获接口,屏蔽了直接遍历Oplog的复杂性。ChangeStreams还提供了变更文档完整状态的抽取功能,可以很方便的转换成FlinkUpsert类型的Changelog。它还提供了比较完善的故障恢复能力,每个变更记录数据都会包含一个resumetoken来记录当前的变更流位置。发生故障后,可以使用resumetoken从当前消费点恢复。此外,ChangeStreams支持对变更事件进行过滤和定制。例如,数据库和集合名称的常规过滤器可以下推到MongoDB来完成,这可以显着减少网络开销。它还提供对集合库和整个集群级别的变更订阅,并可以支持相应的权限控制。使用MongoDBChangeStreams功能实现的CDC连接器如上图所示。首先通过ChangeStreams订阅MongoDB的变化。例如,有四种类型的更改:插入、更新、删除和替换。先将其转化为Flink支持的upsertChangelog,然后在其之上定义动态表,使用FlinkSQL进行处理。目前MongoDBCDCConnector支持Exactly-Once语义,支持全加增量订阅,支持从checkpoints和savepoints恢复,支持Snapshot数据的过滤,支持Database和Collection等元数据的提取,支持库的集合正则过滤功能.02MongoDBCDCConnector业务实践XTransfer成立于2017年,专注于B2B跨境支付业务,为从事跨境电商出口的中小微企业提供外贸收款和风控服务。跨境B类业务结算场景涉及的业务环节很长。从询价到最终成交,涉及物流条款、支付条款等,每个环节都需要做好风险控制,以符合跨境资金交易的要求。监管要求。以上种种因素都对XTransfer数据处理的安全性和准确性提出了更高的要求。在此基础上,XTransfer基于Flink构建了自己的大数据平台,可以有效保证跨境B2B全链路上的数据得到有效的采集、处理和计算,满足高安全、低延迟、高精准度。要求。变更数据捕获CDC是数据集成的关键部分。在使用FlinkCDC之前,一般使用Debezium、Canal等传统CDC工具将数据库的changelog提取出来转发给Kafka,下游读取Kafka中的changelog进行消费。该架构存在以下痛点:部署组件多,运维成本高;下游的数据消费逻辑需要根据写入端进行适配,有一定的开发成本;数据订阅配置复杂,不能像FlinkCDC语句那样只通过SQL来使用,定义了完整的数据同步逻辑;难以完全满足全量+增量采集,可能需要引入DataX等全量采集组件;更偏向于变化数据的收集,对数据的处理和过滤能力相对较弱;很难满足结构化数据源拓宽的不同场景。目前我们大数据平台主要采用FlinkCDC抓取变更数据,具有以下优势:1.实时数据集成不需要额外部署Debezium、Canal、Datax等组件,大大降低运维成本;支持丰富的数据源,可以复用Flink已有的connector进行数据采集和写入,可以覆盖大部分业务场景;降低开发难度,仅通过FlinkSQL即可定义完整的数据集成工作流;数据处理能力强,依托Flink平台强大的计算能力,可以实现异构数据源的流式ETL甚至join、groupby。2、构建实时数仓大大简化了实时数仓的部署。FlinkCDC实时采集数据库变化,写入Kafka、Iceberg、Hudi、TiDB等数据库,再利用Flink进行深度数据挖掘和数据处理。.Flink的计算引擎可以支持流批一体的计算模式,不再需要维护多套计算引擎,可以大大降低数据开发的成本。3.实时风控以往实时风控一般是通过将业务事件发送给Kafka来实现的。使用FlinkCDC后,可以直接从业务库中抓取风控事件,再通过FlinkCDC进行复杂的事件处理。可以通过FlinkML和Alink运行模型,丰富机器学习能力。最后将这些实时风控的处置结果落入Kafka,下发风控命令。03MongoDBCDCConnector生产调优使用MongoDBCDCConnector有如下要求:由于使用ChangeStreams特性实现MongoDBCDCConnector,要求MongoDB最低可用版本为3.6,4.0.8及以上版本为受到推崇的。必须使用集群部署方式。由于订阅MongoDB的ChangeStreams需要节点之间相互复制数据,单个MongoDB无法相互复制数据,也就没有Oplog。只有副本集或分片集才有数据复制机制。需要使用WireTiger存储引擎,使用pv1复制协议。需要ChangeStream和查找用户权限。使用MongoDBCDCConnector时,注意设置Oplog的容量和过期时间。MongoDBoplog是一个特殊的容量集合。容量达到最大值后,历史数据将被丢弃。然而,ChangeStreams是通过恢复令牌恢复的。oplog容量过小可能导致resumetoken对应的oplog记录不存在,即resumetoken过期,ChangeStreams无法恢复。可以通过replSetResizeOplog设置oplog容量和最短保留时间,MongoDB4.4及以后版本也支持设置最短时间。一般来说,建议在生产环境中保留oplog不少于7天。对于变化缓慢的表,建议在配置中开启心跳事件。change事件和heartbeat事件可以同时向前推送resumetoken。对于变化缓慢的表,可以通过心跳事件刷新resumetoken,避免其过期。可以通过heartbeat.interval.ms设置心跳间隔。由于MongoDB的ChangeStreams只能转化为Flink的Upsertchangelog,与UpsertKafka类似,因此会增加一个算子ChangelogNormalize来完成-U原像值,这会带来额外的状态开销。所以生产环境推荐使用RocksDBStateBackend。当默认连接的参数不能满足使用需求时,可以通过设置connection.options配置项来传递MongoDB支持的连接参数。比如连接MongoDB的用户创建的数据库不在admin中,可以设置参数指定需要使用哪个数据库对当前用户进行认证,也可以设置连接池的最大连接数参数。MongoDB的连接字符串默认支持这些参数。多库多表正则匹配是MongoDBCDCConnector2.0版本后提供的新特性。需要注意的是,如果数据库名使用正则参数,需要有readAnyDatabase角色。因为MongoDB的ChangeStreams只能在整个集群、数据库、集合的粒度上启用。如果需要过滤整个数据库,只能在对数据库进行正则匹配时,在整个集群上开启ChangeStreams,然后通过Pipeline过滤数据库的变更。通过在Ddatabase和Collection这两个参数中写正则表达式可以订阅多个数据库和表。04MongoDBCDCConnectorParallelSnapshotImprovement为了加快Snapshot的速度,可以使用Flip-27引入的源码进行并行改造。首先使用一个splitenumerator将一个完整的Snapshot任务按照一定的切分策略拆分成若干个子任务,然后分配给多个splitreader并行执行Snapshots,从而提高整体任务的运行速度。但是在MongoDB中,大多数情况下,组件是ObjectID,其中前四个字节是UNIX描述,中间五个字节是一个随机值,最后三个字节是一个自增。同一描述中插入的文档不是严格递增的,中间的随机值可能会影响局部严格递增,但总体来说递增的趋势还是可以满足的。因此,与MySQL的增量组件不同,MongoDB不适合使用offset+limit的拆分策略对其集合进行简单的拆分,需要针对ObjectID有针对性的拆分策略。最终,我们采用了以下三种MongoDB切分策略:Samplesamplingbucketing:原理是使用$sample命令对集合进行随机抽样,估计平均文档大小和每个chunk的大小需要的桶数.需要对应集合的查询权限。优点是速度快,适用于数据量大但不分片的集合。缺点是由于采用抽样估计方式,分桶的结果不能绝对统一。SplitVector索引拆分:SplitVector是MongoDB内部计算chunk拆分点的命令,通过访问指定的索引来计算每个chunk的边界。需要SplitVector权限,具有速度快,chunk结果统一的优点;缺点是对于数据量大且已经分片的collection,最好直接读取configlibrary中已经分片的chunk的metadata。Chunks元数据读取:由于MongoDB将shardedcollection的实际分片结果存储在config数据库中,因此可以直接从config中读取shardedcollection的实际分片结果。需要对配置库的读取权限,并且仅适用于分片集合。优点是速度快,不需要重新计算chunk分割点,chunk结果统一,默认为64MB;缺点是不能满足所有场景,只限于碎片化场景。上图是样本采样分桶的例子。左边是一个完整的集合。设置完整集合的样本个数,然后收缩整个样本,根据采样样本进行分桶。最后的结果就是我们想要的chunksboundary。sample命令是MongoDB采样的内置命令。在样本值小于5%的情况下,采用伪随机算法进行抽样;在样本值大于5%的情况下,先使用随机排序,然后选择前N个文档。其均匀性和耗时主要取决于随机算法和样本数。它是均匀性和分割速度之间的折衷策略。适用于要求分割速度快但又能容忍分割结果不均匀的场景。.在实际测试中,样品采样的均匀性有很好的表现。上图是SplitVector索引拆分的一个例子。左边是原始集合,要访问的索引由SplitVector命令指定,也就是ID索引。可以设置每个chunk的大小,单位MB,然后使用SplitVector命令访问索引,通过索引计算每个chunk的边界。速度快,分块结果非常均匀,适用于大部分场景。上图是读取config.chuncks的例子,即直接读取MongoDB划分好的chunks的元数据。每个分片、它的机器以及每个分片的边界都存储在配置服务器中。对于分片集合,可以直接分块读取它的边界信息,不需要重复计算这些分裂点,也可以保证每个分块的读取都可以在单机上完成,速度极快,而且在大规模在分片收集场景下有很好的表现。05后续规划FlinkCDC的后续规划主要分为以下五个方面:第一,协助完善FlinkCDC的增量Snapshot框架;第二,使用MongoDBCDC对接FlinkCDC的增量Snapshot框架,支持并行Snapshot改进;第三,MongoDBCDC支持FlinkRawType。对于一些更灵活的存储结构,提供了RawType转换,用户可以自定义解析为UDF形式;第四,MongoDBCDC支持从指定位置收集变化的数据;第五,优化了MongoDBCDC的稳定性。问与答Q1:MongoDBCDC是否有高延迟?您是否需要牺牲性能来减少延迟?MongoDBCDC的延迟并不高。在全量收集过程中通过changelognormalize可能会对CDC的增量收集造成一些背压。但是,可以通过对MongoDB进行并行改造,增加资源的方式来避免这种情况。Q2:默认连接什么时候不满足要求?可以在任何数据库、任何分库中创建MongoDB用户。如果不在admin数据库中创建用户,则需要在认证时明确指定在哪个数据库中对用户进行认证,也可以设置最大连接大小等参数。Q3:MongoDB目前的DBlog是否支持无锁并发读?DBlog的无锁并发具有增量快照的能力,但由于MongoDB难以获取当前changelog的位置,因此无法立即实现增量快照,但很快就会支持无锁并发Snapshots。