本次分享主要针对新版ApacheHudi0.11.0的新特性进行深度解读。主要介绍四个方面:多级索引SparkSQL新功能Flink集成改进OthersFunctionandImprovement01多级索引首先给大家分享的是多级索引,接下来从三个方面来介绍.第一个是我们为什么要引入多级索引multimodel索引,第二个是多级索引的设计和实践,最后我们会介绍如何使用多级索引来大幅提升读写性能.1、为什么要引入多模态索引(Multi-ModalIndex)在介绍多级索引之前,我们先来看看什么是索引?索引是数据库系统中常用的查询加速技术。通过建立索引,可以利用生成的元数据快速定位所需数据的位置,可以减少甚至避免从文件系统中扫描或读取不必要的数据,减少IO开销,大大提高查询效率。我们可以类比图书馆和教科书中的索引,它们使用预先生成的元数据来快速找到你想要查找的信息。事实上,ApacheHudi的湖仓一体化架构已经提供了独有的索引支持。在这里我们可以看到一个例子。其中,我们使用的一个索引可以快速定位到我们需要更新或删除的记录所在的文件组。如下图所示:如果没有索引,对于所有更新和删除的记录,我们都需要和所有的文件合并,代价会很大。如果使用索引,读写开销会大大降低,可以提高查询该位置的效率。在Hudi中,我们默认启用了基于布隆过滤器的索引。这些布隆过滤器存储在数据文件的页脚中。作为一个单独的存在,可以在索引的过程中使用。那么为什么我们在Hudi中仍然引用多级索引呢?其实索引的主要目的就是为了提高刚刚提到的数据查询的速度,所以需要存储元数据。对于TB级和EB级表中的元数据,其量级会非常大。通常的做法是将它们存储在单独的数据块或数据文件中。在实测过程中,我们遇到了读写瓶颈。同时,在维护元数据的过程中,需要与数据表实时同步。这种管理是很复杂的,因为要保证交易,如果要引用Hudi里面的新索引,开发周期会很长。2.多级索引的设计与实现为了解决以上问题,我们在湖仓一体化存储架构中引入了多级索引。这是第一次在类似架构中引入统一平台、多样化和高性能的指标。我们的目标是支持所有的计算和查询引擎来提高读写性能,即使未来出现新的引擎,它们也会兼容。接下来介绍多级索引设计需要的一些需求。首先是我们需要确保可扩展的元数据。我们希望元数据是Serverless的,不需要任何计算,不需要内存支持,可以独立存在于一体化数据仓库和数据湖中。同时希望它能够独立于计算引擎和查询引擎,具有可扩展性,能够高性能地支持不同的索引类型。二是我们希望多级索引中的元数据和数据表保持实时同步,保证每次更新都是事务性的。三是保证查询速度。保证多级索引的低延迟查询。主要的查询类型包括点、范围、前缀查找等,我们来看看它是如何实现的。第一个是可扩展的元数据。我们采用了类似于现有数据库的设计,即在内部构建元数据表。对于Hudi表,我们使用Hudi的mor表来存储这些数据。mor表的优点是可以非常快速地更新和删除数据。同时Hudi表也是serverless的,不依赖任何计算和内存资源。在这张表中,我们为不同的索引创建了独立的分区。在这种情况下,不同的指标可以完成独立管理和自动管理。我们使用mor表的另一个优势是它们可以支持任意大小的索引。从mb级到gb级再到tb级。对于独立的分支,我们可以引入新的角色类型,只需要创建新的分区即可。在构造可扩展元数据时,需要对索引进行初始化。我们通过两种方式改进了初始化。一是同步。同步是写表过程中最终提交前的一个索引步骤。第二种方式是异步的。hudi首次引入异步创建索引,保证不影响并发writer。下面是异步创建索引的流程图:第二个设计原则是保证元数据表的更新是事务性的,从而保证元数据表结构中的数据与数据表实时同步.我们设计了一套事务叫多表。同时,在这个元数据表中,有自我管理的表服务,包括compaction和cleaning。他们会保证定时操作,保证这张元数据表的读性能会很好。第三点是元数据的快速查询。我们使用HFile作为MDT的数据格式。原因是columnformatParquet或row-basedAvro不适合pointedlookup;HFile格式的索引使得指向查找非常高效,只需要读取相关的数据块。我们对HFile进行了测试,发现在查询千万(10M)+个条目中的N个条目时,HFile相比Parquet和Avro有10-100倍的提升。如下图所示:3.使用多级索引大幅提升读写性能接下来介绍多级索引带来的主要读写性能提升。首先是文件列表。在云存储中,我们发现在大多数情况下,如果你列出数千个分区的大表和数百万的数据文件,就会造成读写瓶颈。这主要是由于云存储的设计。如果我们使用元数据表中的文件来分区。此数据表中的所有文件均在此分区中提供。与云软件系统相比,有2-20倍的提升。如下图所示:另一个重要的特点是DataSkippingDataSkipping技术利用列统计对需要的数据文件进行文件剪枝(filepruning)。列统计常用的列统计包括最大值和最小值、数量、大小等,DataSkipping的作用就是利用这些统计数据排除不需要读取的文件,可以大大提高查询速度。我们在多模型索引的元数据中建立了column_stats分区,该分区中的每条记录包含了Hudi表中对应文件的列统计信息。每个记录键由列名、分区名和文件名组成。通过这种排列方式,可以快速定位到需要的列统计信息。查询复杂度是根据查询列所需的列数,通常是5到10列。对于大而宽的表,这可以大大提高这种效果。在实际测试中,Hudi大宽表在云端的“定向”查询速度提升了10x-30x。大大减少了对无关数据文件的扫描和读取,提高了I/O效率。如下图所示:我们还测试了Upsert性能。我们在元数据表中引入了一个bloom_filter分区来代替footer中的bloomfilter,可以大大减少大表读取文件的时间。每条记录包含单个数据文件(分区+文件名)的布隆过滤器,支持前缀查找。经实测,在100k文件的Hudi表中,与读取footer相比,从MDT读取bloomfilter的速度要快3倍。t6.c是基于多级索引的,未来还有很多工作要做。目前的工作是开发记录级索引和Luncene索引。02SparkSQL的新特性接下来说说Hudi对SparkSQL的改进。1、用户可以使用非主键字段更新或删除Hudi表中的记录。SparkSQL改进了删除操作。在时间t1向mor表的牛表中插入三条数据a、b、c。这会在mor表中生成一个base文件和一个log文件(下图简化了示意图)。cow表中只会产生base文件。在t2时刻,同时删除mor表和cow表中b的数据。mor表操作是删除日志文件b的block,即t2时刻的数据。cow表中的操作是将基础文件b复制一份,保存在内存中。删除b数据后,会形成绿框中新版本的数据文件。如下图所示:2.SQL支持时间旅行查询。为什么我们需要实施时间旅行?从API层面来说,如果我们要写一个查询,我们需要设置不同的dfs,构造不同的操作,然后查询这个action。但是引用了穿越时空的语句后,一是可以直接在sparksql中使用,二是sql语句更容易解释这样的行为和动作。现在通过语法的时间戳支持时间旅行查询,但仅在Spark3.2+中。语法如下:select*fromhudi_tbltimestampasof'20210728141108100'①SQLTravel-场景一:查询多版本数据如下图,我们在10点10分提交了insert和update语句,想查看10:05的数据,可以通过下面的sql实现。select*fromtest_huditimestampasof20220512100510000(10:05)select*fromtest_huditimestampasof20220512101030000(10:10)②出行-场景2:数据恢复修复Createhuditable:createtabletest_hudi...insertdata:insertintotest_hudi...(每插入一条数据都会生成一个版本)查询数据:select*fromtest_hudi误删数据:deletefromtest_hudiwhereid=2查询数据:select*fromtest_hudi(删除id=的数据后2、只有两个版本)恢复数据:insertintotest_hudi_tableselect*fromtest_huditimestampasof20220511165343733(数据存在则更新,数据不存在则插入)查询数据:select*fromtest_hudi(id=2的数据有beenrestored)注意:如果表被truncate清空,这种时间戳方法查询恢复将不起作用。3.新增CALL命令,支持调用Hudi表的更多操作。CallCommand的背景之一是sparksql除了ddl、dql、dml之外还运行。我们要解决这三个操作之外的一些新函数的操作。那么在引入CallCommand之前就没有办法操作了。我们对比了传统数据库中的存储过程,同样在spark中实现了一个commandaction,然后相应的实现了一个procedurefunction。首先是在hudi端为call命令生成通用语法,不依赖spark版本,适用于所有spark版本。然后生成一个HoodieProcedure类,使用CallProcedureHoodieCommand类调用动作。CallCommand命令在设计上主要有四个功能。一种是支持归档、提交、回滚和创建还原点的快照操作。其次,可以进行原始数据管理。三是对运维表进行数据导入导出、Boostrap、修复表、升级/降级等操作。第四种是优化表的动作,比如Compaction、Clustering、Clean等,CallCommand的参数有3种。一是您可以使用不定式参数(键值对)作为其输入参数。二是可以根据参数的位置输入参数。第三个是混合参数。下面是传递参数的具体语法:接下来是对CALL命令的一些功能介绍。①CALL命令-快照管理查询快照对应命令:callshow_commits_metadata(table=>'test_hudi_table',limit=>10)回滚快照:callrollback_to_instant('test_hudi_table',20220511224632307')createsavepoint:create_savepointsrestoresavepoint:rollback_savepointsdeletesavepoints:delete_savepoints②CALLCommand-Clustering可以设置Clustering类型:sethoodie.layout.optimize.strategy=linear/z-order/hilbert常用命令:callrun_clustering(table=>'test_hudi_table',order=>'ts')callshow_clustering(table=>'test_hudi_table')通过这些聚类动作,查询时性能可以提升10-20倍。③CALLCommand-Compact(smallfilemerging)(目前只支持mortables)Datafile和Deltalogfilemerging会重新生成一个新的文件。command:callrun_compaction(table=>'test_hudi_table',op=>instant03Flink集成改进最后介绍一下Flink集成改进的内容,主要有以下几点:1.0.11.0中,Flink1.13.x和1.14都有.x.2.支持复杂数据类型3.基于DFS的FlinkHoodieCatalog。绿色是目前已经执行的操作,红色是要执行的操作。可以通过API或者FlinkSql来实现。FlinkHoodieCatalog在三个方面有更好的效果:第一个方面是元数据的管理,FlinkHoodieCatalog的框架可以更好的管理HoodieCatalog;第二个方面是基于这个框架,可以去其他平台De-docking,更方便对接和使用;第三个方面是元数据,可以在数据血缘关系方面构造一些功能。4、BucketIndex为什么要整合改进BucketIndex?这是字节的同学贡献的一个功能。在他们的生产场景中,当写入34TB数据,写入5000亿条记录时,BloomFilterIndex通过Recordkey查找FileID的性能会快速下降。为了解决BloomFilterIndex的误报,他们引入了BucketIndex。通过key的hash值定位文件组,提高了实时导入的性能。如下图所示:从Flink输入5条数据,然后通过一定的hash策略关联混合的BucketIndex,通过获取FileGroupId写入文件。使用Bucket分布优化Bucket分布优化主要包括:BucketPruning、BucketAggregate、BucketJoin等。如下图所示:Bucketlimit。目前BucketIndex中的Bucket个数需要在建表时根据预估的数据量提前确定,建表后无法更改。Bucket参数:hoodie.index.type值:BUCKET参数:hoodie.bucket.index.num.buckets值:48(256MB)建议单个bucket的大小控制在3GB左右。Bucket的后续改进。Hashmap在扩容过程中,通过桶的数量倍增,实现了轻量级的动态扩容。04其他功能和增强最后,让我们谈谈其他功能和增强。1.SparkDataSource查询优化当我们使用mor表进行快照查询时,会读取日志文件,然后与基础文件合并。在以前的版本中,当你进行快照查询时,会读出整个日志文件记录。我们对这个版本进行了优化,使用内置的标准Payload来读取。例如:OverwriteWithLatestAvroPayload。我们会对此进行优化,只读出必要的列,这样会大大减少压缩解码带来的内存和CPU消耗。事实上,对于具有数千列的非常宽的表,效果会非常详细。2.Schema的演化在这个版本中,我们增加了Spark3.1和Spark3.2的schema功能的演化。如果启用sethoodie.schema.on.read.enable=true,我们可以对表列和表进行一系列操作。列更改(增加、删除、重命名、修改位置、修改属性)、表更改(重命名、修改属性)等。3.保存点和恢复保存点和恢复可以使用调用命令来完成这些操作。新版本引入了mor表,使用HudiCLI设置保存点并执行recovery或者调用命令手动设置保存点。保存点之后的数据将被删除。4.Pulsarwritecommit回调Hudiwritecommit支持Pulsar下游操作*主要配置HoodieWriteCommitCatalog同步GoogleBigQuery支持COW表查询*DataHub支持schema同步GlueDataCatalog通过AWSSDK原生同步
