一、CDC简介首先,什么是CDC?CDC的全称是ChangedataCapture,即变更数据捕获。是数据库领域非常常见的技术。主要用于捕获数据库中的一些变化,然后将变化的数据发送到下游。它的应用范围很广。它可以做一些数据同步,数据分发和数据收集,也可以做ETL。今天主要分享的是通过CDC将ETLDB数据到数据湖。对于CDC,业界主要有两种:一种是基于查询的,客户端会通过SQL查询源数据库表的变化数据,然后对外发送。二是基于日志,这也是业界广泛使用的一种方法。一般通过binlog的方式,将变化的记录写入binlog。binlog解析后写入消息系统,或者直接基于FlinkCDC进行处理。两者之间是有区别的。query-based相对简单,有侵入性,而log-based是非侵入性的,对数据源没有影响,但是binlog的分析比较复杂。基于查询和基于日志,有四种实现技术,包括基于时间戳、基于触发器、基于快照和基于日志。这是实现CDC的技术。下面是几种方法的比较。通过这张表的对比,可以发现基于日志的综合是最好的,但是分析起来比较复杂。不过业界开源的binlog解析器很多,比较通用和流行的有Debezium、Canal、Maxwell。基于这些binlog解析器,可以构建ETL管道。下面我们来看一个业界比较流行的CDC仓储架构。整个数据存储分为实时流和离线流。实时流解析binlog,通过Canal解析binlog,然后写入到Kafka,然后每小时将Kafka数据同步到Hive;另一个是离线流,需要同步从Hive源层的表中拉取全量。如果只有之前实时流的数据不完整,必须通过离线流的SQLSelect一次导入全量数据。对于每个ODS表,存量数据和增量数据量做一个Merge。这里可以看出ODS层的实时性不够,有几小时或者几天的延迟。对于ODS层,这个延迟可以通过引入ApacheHudi来分分钟实现。2、CDC数据入湖方法以CDC数据入湖为准。这个架构非常简单。各种上游数据源,如DB变更数据、事件流、各种外部数据源,都可以通过变更流写入表中,然后进行外部查询和分析。整个架构非常简单。虽然架构简单,但它仍然面临着许多挑战。以ApacheHudi数据湖为例。数据湖通过文件存储各种数据。对于CDC数据处理,需要对湖中的某些文件进行可靠的、事务性的修改,这样下游查询就看不到部分结果,另外CDC数据需要高效的更新和删除,这就需要快速定位更改的文件。另外,对于每一个小批量的数据写入,希望能够自动处理小文件,避免复杂的小文件处理,以及面向查询的布局优化,可以通过一些技术手段,比如Clustering来改善文件布局以在外部提供更好的查询性能。ApacheHudi如何应对这些挑战?首先它支持事务性的写,包??括读写之间的MVCC机制保证写不影响读,还可以控制事务和并发保证。对于并发写入,使用了OCC乐观锁机制。对于更新删除,一些内置索引和自定义保证更新,删除更高效。另外,为了查询优化,Hudi会在内部自动管理小文件,文件会自动增长到用户指定的文件大小,比如128M,这也是Hudi的一个核心特性。此外,Hudi还提供了Clustering的功能来优化文件布局。下图是一个典型的CDC链接到湖。以上链接是大多数公司采用的链接。之前的CDC数据先通过CDC工具导入到Kafka或者Pulsar,再通过Flink或者Spark流式消费写入Hudi。第二种架构是通过FlinkCDC直接连接MySQL上游数据源,直接写入下游Hudi表。其实这两个环节各有优缺点。第一环节是统一的数据总线,具有良好的扩展性和容错性。对于第二个链路,扩展性和容错性稍差,但由于组件较少,维护成本也相应较低。这是阿里云数据库OLAP团队的CDC湖链接。因为我们是Spark团队,所以我们使用SparkStreaming链接入湖。整个入湖链路也分为两部分:首先是全量同步作业,会通过Spark拉取全量数据。这里如果有从库,可以直接连接从库进行一次全量同步,避免对主库造成影响,然后写信给Hudi。然后会启动一个增量作业,增量作业会通过Spark消费阿里云DTS中的binlog数据,将binlog实时同步到Hudi表中。全量和增量作业的编排利用Lakehouse的自动作业编排能力来协调全量和增量作业,并使用Hudi的Upsert语义来保证全量和增量作业连接时全量增量数据的最终一致性。数据太多和太少的问题。我们团队也针对Lakehouse的CDC入站链路做了一些优化。第一个是原库的Schema变更处理,我们对接的客户对一些列进行增删改的场景。Spark在写入Hudi之前,会检查schema,看schema是否合法。如果合法,则可以正常写入。不合法则写入失败,删除该字段会导致schema校验无效,导致job失败。失败,所以稳定性得不到保证。因此,我们将捕获模式验证的异常。如果发现字段减少,我们会自动补全之前的字段,然后重试,确保链接稳定。第二,有些客户表没有主键或者主键不合理,比如使用更新时间字段作为主键,或者设置了一个会变化的分区字段。此时写入Hudi的数据将与源数据库表中的数据不匹配。因此,我们在产品层面做了一些优化,让用户合理设置主键和分区映射,保证同步到Hudi的数据和源数据库完全对齐。另一个常见的需求是用户在上游库中添加一张表。如果采用表级同步,则新添加的表在整个链路中是感知不到的,无法同步到Hudi。在Lakehouse中,我们可以对整个数据库进行同步,所以当数据库中新增一张表时,会自动感知新表,并自动将新增表的数据同步到Hudi,让原有的数据库可以自动感知添加的表。还有一个就是优化CDC写入的性能,比如拉取一批数据包括Insert、Update、Delete等事件。你是不是一直用Hudi的Upsert方式来写?这个控件比较简单,Upsert有去重数据的能力,但是带来的问题是查找索引的效率低下。对于Insert方法,不需要找索引,效率比较高。所以对于每一批数据,都会判断是否是Insert事件。如果是Insert事件,则直接以Insert方式写入,避免查找文件是否更新的开销。数据显示性能可以提升30%到50%。当然这里也需要考虑DTS异常。重新消费数据时,恢复时不能直接使用Insert方法,否则可能会出现数据重复。针对这个问题,我们引入了表级Watermark,保证即使在DTS异常的情况下,也不会出现数据重复的问题。3.Hudi核心设计接下来介绍Hudi的定位。根据社区最新愿景,Hudi被定义为一个流式数据湖平台,支持海量数据更新,内置表格式并支持事务存储,以及一系列列表服务Clean、Archive、Compaction、Clustering等.,以及开箱即用的数据服务,以及自带的运维工具和指标监控,提供了非常好的运维能力。这是Hudi官网的图片。可以看到,Hudi在整个生态中都是用来做湖泊蓄水的。底层可以对接各种云厂商的HDFS和对象存储,只要兼容Hadoop协议即可。上游是变更事件流入湖,可以支持各种数据引擎,如presto、Spark、云产品;此外,Hudi的增量拉取能力可以用于与Spark、Hive和Flink构建派生表。整个Hudi架构非常完整,定位是增量处理栈。典型的流式处理是面向行的,逐行处理数据,处理效率很高。但是没有办法对面向行的数据做大规模的分析和扫描优化,批处理可能需要每天全量处理一次,效率比较低。Hudi引入了增量处理的概念。处理后的数据是在某个时间点之后。它类似于流处理,但比批处理效率更高。而且面向数据湖中的列存数据,扫描优化非常高效。并回顾胡地的发展历程。2015年社区主席发表增量处理文章,2016年在Uber投产,为所有关键数据库服务提供支持;2017年,Uber支撑了100PB的数据湖,2018年凭借云计算的普及,吸引了国内外用户;Uber在2019年将其捐赠给Apache进行孵化;2020年大约一年成为Top项目,采用率增长10倍以上;2021年来自Uber的最新信息显示,Hudi支持构建了500PB的数据湖,并且对Hudi做了很多增强,比如SparkSQLDML和Flink的集成。近期,字节跳动推荐部分享的基于Hudi的数据湖练习表已经超过400PB,总存储量超过1EB,日增幅在PB级别。经过几年的发展,国内外多家公司都采用了胡迪。比如公有云的华为云、阿里云、腾讯云、AWS都集成了Hudi。阿里云也基于Hudi构建了Lakehouse。字节跳动的整个数仓体系也迁移到了基于Hudi的湖中,之后会有相应的文章分享他们基于Flink+Hudi的数据湖提升PB数据量的日常实践。同时,百度、快手等各大互联网公司都在使用。同时,我们了解到银行金融业还有工商银行、农业银行、百度金融、百信银行等。游戏领域包括三七互娱、米哈游、4399,可见胡地在各行各业的应用范围非常广泛。Hudi的定位是一套完整的数据湖平台。顶层可以为用户编写各种SQL。Hudi提供各种功能作为平台。下层是基于SQL和编程API,下一层是Hudi的核心,包括索引、并发控制、表服务。后续社区会基于LakeCache构建缓存。文件格式是使用的openParquet、ORC、HFile存储格式。整个数据湖可以建立在各种云上。后面会介绍Hudi的按键设计,对我们理解Hudi很有帮助。首先是文件格式。它的底层是基于Fileslice的设计,翻译过来就是文件切片,包括基本文件和增量日志文件。基础文件是Parquet或ORC文件,增量文件是日志文件。一些块在Hudi中编码用于写入日志文件。可以将一批更新编码成数据块并写入文件。基本文件是可插拔的,可以基于Parquet。最新的9.0版本已经支持ORC。同样基于HFile,HFile可以作为元数据表使用。Log文件中保存了一系列各式各样的数据块,有点类似于数据库的重做日志,通过重做日志可以查到各个数据版本。基础文件和日志文件被压缩合并,形成一个新的基础文件。Hudi提供了同步和异步两种方式,为用户提供了非常灵活的选择。例如,他们可以选择同步Compaction。如果对延迟不敏感,就没有必要再启动一个额外的异步作业来做Compaction,或者有些用户想保证写入链接的延迟可以异步compact,不影响主链接。Hudi基于FileGrouponFileSlice的概念。FileGroup会包含不同的FileSlices,而FileSlices构成不同的版本。Hudi提供了保留元数据数量的机制,保证元数据的大小可控。对于数据的更新和写入,尽量使用append。比如你之前写了一个Log文件,更新的时候会继续尝试往Log文件里写。它对HDFS等支持附加语义的存储非常友好,但很多云对象存储不支持。支持Append语义,即数据写入后不可更改,只能新写入Log文件。对于每个文件组,即不同的文件组是相互隔离的。可以针对不同的文件组实现不同的逻辑。用户可以自定义算法实现,非常灵活。基于HudiFileGroup的设计可以带来很多好处。比如basefile是100M,basefile后面更新了50M的数据,就是4个FileGroups。compaction合并的成本是600M,50M只需要合并100M,4个150M的成本是600M,这是FileGroup设计的。还有四个100M的文件,也更新了。比如每次合并25M和400M,开销是1200M。可以看出采用了FileGroup的设计,merge开销减少了一半。还有一个表格格式。表格格式中的内容就是文件在Hudi中的存储方式。先定义表的根路径,然后写一些分区,和Hive的文件分区组织方式一样。还有表的模式定义和表的模式更改。一种方式是将元数据记录在文件中,或者使用外部KV来存储元数据。两者各有优缺点。Hudi基于Avro格式来表示Schema,所以Schema的Evolution能力完全等同于AvroSchema的Evolution能力,即可以增加字段,向上兼容变化。比如int与long兼容,long与int不兼容。目前社区已经有了支持FullSchemaEvolution的计划,即可以增加一个字段,删除一个字段,重命名,也就是改变一个字段。还有一个就是Hudi的索引设计。每条数据写入Hudi时,都会维护数据主键到文件组ID的映射,以便更新或删除时可以更快定位到变化的文件。右图中有一个订单表,可以根据日期写入不同的分区。下面是user表,不需要分区,因为它的数据量没有那么大,变化也不是那么频繁,所以可以用不分区的表。对于分区表和频繁变化的表,在使用Flink写入时,使用FlinkState构建的全局索引效率更高。整个索引是可插拔的,包括Bloomfilter和HBase高性能索引。在byte场景下,Bloomfilterfilter根本无法满足PB日常的索引查找,所以他们使用HBase高性能索引,用户可以根据自己的业务形态灵活选择不同的索引实现。在索引类型不同的情况下,可以以较低的成本支持延迟更新和随机更新的场景。另一种设计是并发控制。并发控制是0.8之后才引入的。Hudi提供了一种乐观锁机制来处理并发写入问题。提交时,它会检查两个更改是否冲突。如果有冲突,则写入失败。对于没有内部锁的Compaction或Clustering等表服务,Hudi有一套协调机制来避免锁竞争问题。比如做compaction,可以先在timeline上标记一个点,然后就可以和writelink完全解耦,异步做compaction。比如左边是数据摄取链路,每半小时摄取一次数据。右边是异步删除作业,同样会改表,而且很有可能和写修改冲突,导致这个链接一直失效,平台无缘无故消耗CPU。Resources,现在社区对于这种情况也有改进方案,希望能尽快检测到并发写入的冲突,早日终止,减少资源浪费。另一种设计是元数据表。因为Hudi最初是基于HDFS构建设计的,没有过多考虑云存储场景,导致云端的FileList非常慢。因此,在0.8版本中,社区引入了MetadataTable。MetadataTable本身也是一个Hudi表。内置在一个Hudi中,可以复用Hudi表等各种表服务。MetadataTable表文件会存储分区下所有的文件名和文件大小,每列的统计信息用于查询优化,社区现在做的是基于MetaTable表建立全局索引,而每条记录对应的每一个文件ID都记录在Meta表中,以减少处理Upsert时查询待更新文件的开销,这也是云迁移所必需的。4.Hudi的未来规划未来的规划,比如基于Pulsar和Hudi构建Lakehouse,是StreamNativeCEO提出的一个提议,他想基于Hudi构建Pulsar的分层存储。在Hudi社区,我们也做了一些工作。我们想将Hudi的内置工具包DeltaStreamar集成到PulsarSource中。既然有了PR,我们希望两个社区能更紧密的联系起来。一些学生正在研究Pular分层存储内核的StreamNative部分。最近几天发布了0.9.0,其中包含重要的优化和改进。首先,集成了SparkSQL,大大降低了数据分析师使用Hudi的门槛。Flink集成Hudi的方案早在Hudi0.7.0版本就有了。经过数次迭代,集成Hudi的Flink已经非常成熟,已经在字节跳动等大公司生产使用。Blink团队做的一个CDCFormat集成直接将Update和Deltete事件保存到Hudi中。还有库存数据的一次性迁移,增加批量导入能力,减少序列化和反序列化的开销。另外,现在有用户觉得Hudi存储了一些元数据字段,比如_hoodie_commit_time等元数据信息。这些信息是从数据信息中提取出来的,存在一定的存储开销。现在支持虚拟键,元数据字段将不再存储数据。是的,它带来的局限性是不能使用增量ETL,无法获取到Hudi某个时间点之后的变化数据。另外,很多小伙伴也希望Hudi支持ORC格式。Hudi最新版本支持ORC格式。同时这部分格式是可插拔的,以后可以灵活接入更多的格式。MetadataTable的写入和查询优化也已经完成。通过SparkSQL查询时,避开Filelist,直接通过MetadataTable获取整个文件列表信息。从更进一步的角度来看,社区未来的计划包括升级到DataSourceV2以进行Spark集成。现在Hudi是基于V1的,不能使用V2的性能优化。还有Catalog集成,可以通过Catalog管理表,通过SparkCatalog集成创建、删除、更新和管理表元数据。Flink模块的Blink团队有全日制学生负责,流式数据中的Watremark稍后会推送到Hudi表中。另一种是与KafkaConnectSink集成,然后通过Java客户端直接将Kafka数据写入Hudi。内核端的优化包括基于元数据表的全局记录级索引。还有字节跳动合作伙伴做的写作支持Bucket。这样做的好处是在做数据更新的时候,可以通过主键找到对应的Bucket。只需要读取Bucket对应的parquet文件的Bloomfilter,减少查找。更新开销。还有一个更智能的聚类策略。我们内部也做了这部分工作。SmarterClustering可以根据以前的负载情况动态启用集群优化。它还包括基于MetadataTable和FullSchemaEvolution和跨表事务构建二级索引。现在Hudi社区发展比较快,代码重构量很大,但都是为了更好的社区发展。Flink集成Hudi模块从0.7.0到0.9.0版本基本完全重构。如果您有兴趣,可以参与到社区中,共同建设一个更好的数据湖平台。
