当前位置: 首页 > 科技观察

字节跳动数据湖技术选型思考

时间:2023-03-15 09:20:03 科技观察

字节跳动数据整合现状2018年,我们基于Flink构建了异构数据源间的批量同步通道,主要用于在线数据库导入离线数据库。Bins,以及不同数据源之间的批量传输。2020年我们构建了基于Flink的MQ-Hive实时数据集成通道,主要用于将消息队列中的数据实时写入Hive和HDFS,在计算引擎上实现流批统一。到2021年,构建了基于Flink的实时数据湖集成通道,完成了湖仓一体的数据集成体系建设。字节跳动数据集成系统目前支持数十种不同的数据传输管道,涵盖MysqlOracle、MangoDB等在线数据库;消息队列,如KafkaRocketMQ;大数据生态系统的各种组件,例如HDFS、HIVE和ClickHouse。在字节跳动内部,数据集成体系服务于几乎所有业务线,包括大家熟悉的抖音、今日头条等应用。整个系统主要分为批集成、流集成和增量集成三种模式。批集成模式基于FlinkBatch模式,以批的形式在不同系统中传输数据,目前支持20多种不同的数据源类型。流式集成模式主要将数据从MQ导入Hive和HDFS,任务的稳定性和实时性得到了用户的广泛认可。增量模式即CDC模式,用于支持通过数据库变更日志Binlog将数据变更同步到外部组件的数据库。该模式目前支持5个数据源。数据源虽然不多,但是任务量非常大,其中包含了很多核心环节,比如各个业务条线的计费、结算等,对数据的准确性要求非常高。CDC链路中的整体链路比较长。首先,第一次导入是批量导入。我们通过FlinkBatch方式直接连接Mysql库拉取全量数据写入Hive,增量Binlog数据通过流式任务导入HDFS。由于Hive不支持update操作,所以我们还是采用基于Spark的批处理环节,将前一天的Hive表和新增的Binlog通过T-1增量合并产生当天的Hive表。随着业务的快速发展,这个环节暴露出的问题也越来越多。首先,这个基于Spark的离线链接是非常消耗资源的。每次产生新数据时,都涉及全量数据shuffle和全量数据转储,消耗大量存储和计算资源。同时,随着字节跳动业务的快速发展,对近实时分析的需求也越来越多。最后,整个链接过程太长,涉及Spark和Flink两种计算引擎,三种不同的任务类型。用户成本和学习成本都比较高,带来大量的运维成本。为了解决这些问题,我们希望对增量模式进行一次完整的架构升级,将增量模式融入到流式集成中,从而摆脱对Spark的依赖,实现计算引擎层面的统一。改造完成后,基于Flink的数据集成引擎可以同时支持批、流、增量三种模式,几乎覆盖所有的数据集成场景。同时,在增量模式下,提供与流式通道相当的数据延迟,为用户提供近乎实时的分析能力。在实现这些目标的同时,还可以进一步降低计算成本,提高效率。经过一番摸索,我们注意到了新兴的数据湖技术。关于数据湖技术选型的思考我们关注的重点是Apache软件基金会旗下的两个开源数据湖框架Iceberg和Hudi。Iceberg和Hudi数据湖框架都很优秀。但是这两个项目是为了解决不同的问题而创建的,因此对功能的关注点也不同。Iceberg:将核心抽象连接到新计算引擎的成本相对较低,并提供高级查询优化功能和完整的模式更改。Hudi:更专注于高效的Upsert和近实时更新,提供MergeOnRead文件格式,以及增量查询功能,方便构建增量ETL管道。经过一番比较,这两个框架各有优缺点,距离我们想象中的数据湖最终形态还有一定的距离。因此,我们的核心问题集中在以下两个问题:哪个框架更能支撑我们CDC数据处理的核心诉求?哪个框架可以更快速地补充另一个框架的功能,从而成长为一个通用的、成熟的数据湖框架?经过多次内部讨论,我们认为:Hudi在处理CDC数据方面更加成熟,社区迭代非常快,尤其是过去一年,完成了很多重要的功能,与Flink的集成也更加成熟,并且最后我们选择了Hudi作为我们的数据湖基地。01-索引系统我们选择Hudi,最重要的是Hudi的索引系统。这张图片是有索引和没有索引的比较。在写入CDC数据的过程中,为了让新加入的Update数据作用于底层表,我们需要知道这条数据是否出现过,出现过什么地方,以便将数据写入到正确的地方。合并时,我们可以只合并单个文件,而不必管理全局数据。如果没有索引,则合并操作只能通过合并全局数据来完成,导致全局shuffle。在图中的例子中,不加索引的合并成本是加索引的两倍,如果底表的数据量增加,性能差距会呈指数级增长。因此,在字节跳动的业务数据层面下,索引带来的性能收益是非常巨大的。Hudi提供了多种索引来适应不同的场景。每个指标都有不同的优点和缺点。索引的选择需要根据具体的数据分布来选择,从而达到写入和查询的最优解。以下是不同场景的两个示例。日志去重场景在日志去重场景中,数据通常有一个create_time的时间戳,底层表的分布也是按照这个时间戳进行分区的。最近几小时或几天的数据会更频繁地更新,但较旧的数据不会有太大变化。冷热分区场景比较适合Bloom索引、带TTL的State索引、hash索引。CDC场景的第二个例子是数据库导出的例子,即CDC场景。这种场景下,更新的数据会无规律地随机分布,底表的数据量会比较大,而新增的数据量通常会比底表小。在这种场景下,我们可以使用哈希索引、状态索引和Hbase索引来实现高效的全局索引。这两个例子说明,在不同的场景下,索引的选择也会决定整个表的读写性能。Hudi提供了多种开箱即用的索引,已经覆盖了绝大部分场景,用户成本非常低。02-MergeOnReadtableformat除了索引系统,Hudi的MergeOnReadtableformat也是我们看重的核心功能之一。这种表格式使得实时写入和近实时查询成为可能。在大数据系统的构建中,写入引擎和查询引擎之间存在天然的冲突:写入引擎更倾向于写入小文件,以行存储的数据格式写入,尽量避免数据过多在写作过程中。计算负担,最好自己来写一个。查询引擎更倾向于读取大文件并以列存储文件格式存储数据,例如parquet和orc。数据严格按照一定的规则分布,比如按照公共字段排序,这样查询的时候就可以使用,跳过扫描无用的数据,减少计算开销。为了在这种自然冲突下找到最好的权衡,Hudi支持MergeOnRead的文件格式。MOR格式包含两个文件:一个是基于行的Avro格式的日志文件,另一个是基于列的格式的基础文件,包括Parquet或ORC。日志文件通常很小并且包含新添加的更新数据。基础文件很大,包含所有历史数据。写入引擎可以将更新的数据以低延迟写入日志文件。查询引擎在读取时将日志文件与基础文件合并,从而读取到最新的视图;compaction任务周期性触发basefile和logfile的merge,避免logfile不断扩容。在这种机制下,MergeOnRead文件格式实现了实时写入和近实时查询。03-增量计算指标体系和MergeOnRead格式为实时数据湖打下了非常坚实的基础。增量计算是Hudi在此基础上的另一个突出特点:增量计算赋予了Hudi类似消息队列的能力。用户可以通过类似于offset的时间戳在Hudi的时间线上拉取一段时间的新数据。在一些数据延迟容忍度在分钟级别的场景,基于Hudi,可以统一Lambda架构,同时服务于实时场景和离线场景,在存储上实现流批一体化。结语在选择了基于Hudi的数据湖框架后,我们根据字节跳动的内部场景打造了定制化的落地方案。我们的目标是通过Hudi支持所有与Update的数据链接。