当前位置: 首页 > 科技观察

Flink+Iceberg+对象存储,构建数据湖解决方案

时间:2023-03-13 08:05:49 科技观察

本文编译自《Iceberg 和对象存储构建数据湖方案》,戴尔科技集团高级软件研发经理孙伟4月17日FlinkMeetup上海站分享。内容文章的正文是:1.Datalake和Iceberg简介2.对象存储支持Icebergdatalake3.演示方案4.存储优化的一些思考1.Datalake和Iceberg简介1.Datalake生态如图以上,对于一个成熟的数据湖生态:首先我们认为它应该具备海量存储的能力,常见的有对象存储、公有云存储和HDFS;除此之外,还需要支持丰富的数据类型,包括非结构化图像视频、半结构化CSV、XML、Log、结构化数据库表等;此外,还需要高效、统一的元数据管理,以便计算引擎可以轻松索引各种类型的数据进行分析。最后,我们需要支持丰富的计算引擎,包括Flink、Spark、Hive、Presto等,方便与企业现有的一些应用架构对接。2.结构化数据在数据湖上的应用场景上图展示了一个典型的数据湖上的应用场景。数据源上可能有多种数据,不同的数据源和不同的格式。比如交易数据、日志、埋点信息、物联网等,这些数据经过一些流,然后进入计算平台。这时候就需要一个结构化的解决方案,将数据组织在一个存储平台上,然后为后端数据应用提供实时或定时的查询。这样的数据库解决方案需要具备哪些特点?首先可以看到,数据源的类型非常多,所以需要支持组织相对丰富的数据模式;其次,在注入过程中需要支持实时数据查询,所以需要ACID保证,保证不会读取到一些还没有写入的中间状态的Dirty数据;最后,例如日志,可能临时需要更改格式或添加列。在这种情况下,要避免像传统的数据仓库那样,所有的数据都可能需要重新检索和写入,重新注入存储;相反,需要一个轻量级的解决方案来满足要求。Iceberg数据库的定位就是实现这样一个功能,上面对接计算平台,下面对接存储平台。3.数据湖上结构化数据的典型解决方案对于结构化数据组织,典型的解决方案是使用数据库的传统组织方式。如上图,上面有一个命名空间,数据库表的隔离;中间有多个表,可以提供各种数据模式的存储;数据将放在底部,表格需要提供ACID特性,同时也支持本地模式的演化。4.Iceberg表数据组织结构快照Metadata:表Schema、Partition、Partitionspec、ManifestList路径、当前快照等ManifestList:ManifestFile路径及其Partition,数据文件统计。ManifestFile:数据文件路径和每列数据的上下边界。数据文件:实际的表内容数据,以Parque、ORC、Avro等格式组织。让我们仔细看看Iceberg是如何组织数据的。如上图:可以看到右边的数据文件是从数据文件开始的,数据文件存储的是表格内容数据,一般支持Parquet、ORC、Avro等格式;上半部分是ManifestFile,里面记录了最下面数据文件的路径和每一列的数据。上下边界方便过滤查询文件;上半部分是ManifestList,在最下面链接多个ManifestFile,记录ManifestFile对应的分区范围信息,也方便后续过滤查询;ManifestList其实已经表达了快照信息,其中包含了当前数据库表的所有数据链接,也是Iceberg支持ACID特性的关键保证。有了快照,在读取数据时,只能读取快照可以引用的数据,还在写入的数据不会被快照引用,也不会读取到脏数据。多个快照共享之前的数据文件,通过共享这些ManifestFiles来共享之前的数据。再往上是snapshotmetadata,它记录了当前或历史的表Scheme变化、分区配置、所有snapshotManifestFile路径,以及哪一个是当前snapshot。同时,Iceberg提供了命名空间和表的抽象,用于完整的数据组织和管理。5.Iceberg写入流程以上是Iceberg数据写入的流程图,这里以计算引擎Flink为例。首先,DataWorkers会从元数据中读取数据进行分析,然后将记录交给Iceberg进行存储;和普通数据库一样,Iceberg也会有预定义的分区,这些记录会被写入不同的分区。形成一些新文件;Flink有Checkpoint机制。文件到达后,Flink会完成这批文件的写入,然后生成这批文件的列表,交给CommitWorker;CommitWorker会读取当前snapshot的信息,然后和本次生成的文件列表进行合并,生成新的ManifestList和后续元数据表文件的信息,然后提交。成功后会形成一个新的快照。6.Iceberg查询流程以上是Iceberg数据查询流程。首先,FlinkTablescanworker进行一次扫描。扫描时,可以像树一样从根部开始,找到当前快照或者用户指定的一个历史快照,然后从快照中取出当前快照的ManifestList文件,并根据其中的一些进行保存信息,可以筛选出符合查询条件的ManifestFile;然后再根据ManifestFile中记录的信息去筛选出需要的DataFiles。文件取出后,交给记录器读取器工作人员。它从文件中读取满足条件的Recode,然后返回给上层调用。这里可以看到一个特点,就是整个数据查询过程中没有使用List。这是因为Iceberg已经完整的记录下来了。整个文件的树状结构,不需要List,直接单路径指向,所以在查询性能上没有耗时的List操作,对对象存储比较友好,因为List上的对象存储是一个比较耗资源的操作。7.IcebergCatalog功能概述Iceberg为Catalog提供了一个很好的抽象来连接数据存储和元数据管理。任何存储,只要它实现了Iceberg的目录抽象,就有机会与Iceberg连接以组织对上述数据湖解决方案的访问。如上图所示,Catalog主要提供了几个抽象。可以为Iceberg定义一系列角色文件;其FileIO可以自定义,包括读写和删除;它的命名空间和表操作(也称为元数据操作)也可以自定义;包括表格阅读/扫描和表格提交都可以使用目录进行自定义。这样可以提供灵活的操作空间,方便各种底层存储的对接。二、对象存储支持Iceberg数据湖1、目前的IcebergCatalog实现社区现有的IcebergCatalog实现可以分为两部分,一是数据IO部分,二是元数据管理部分。如上图所示,其实是缺少私有对象存储的Catalog实现。S3A理论上可以连接对象存储,但它使用的是文件系统语义,而不是自然的对象存储语义。模拟这些文件操作会有额外的开销,而我们想要实现的是将所有的数据和元数据管理交给一个对象存储,而不是单独设计。2、对象存储和HDFS的比较这里有个问题,有HDFS为什么还要用对象存储?如下图,我们从各个角度对比对象存储和HDFS。综上所述,我们认为对象存储在集群扩展性、小文件友好性、多站点部署、低存储开销等方面更具优势;HDFS的优势在于它提供了额外的上传和原子重命名,这两个优势正是Iceberg所需要的。下面简单介绍一下这两种存储各自的优势。1)相比之下:集群可扩展性HDFS架构采用单个NameNode存储所有元数据,这决定了其单节点能力有限,因此在元数据方面不具备水平扩展能力。对象存储一般采用哈希的方式将元数据分离成块,将块交给不同节点上的服务进行管理。自然元数据的上限会高一些,极端情况下甚至可以进行rehash。将这个块划分成更细的块,交给更多的节点来管理元数据,从而实现可扩展性。2)比较:小文件友好性如今,在大数据应用中,小文件越来越普遍,逐渐成为一个痛点。HDFS基于架构限制,小文件存储受限于NameNode内存等资源。虽然HDFS提供了Archive的方法来合并小文件,减轻NameNode的压力,但这需要额外的复杂度,并且不是原生的。同样,小文件的TPS也受限于NameNode的处理能力,因为它只有一个NameNode。对象存储的元数据是分布式存储和管理,可以将流量很好的分布到各个Node上,使得单个节点可以存储大量的小文件。目前很多对象存储都提供多媒体和分层加速,可以提高小文件的性能。3)对比:多站点部署对象存储支持多站点部署全局命名空间支持丰富的规则配置对象存储多站点部署能力适用于两站点三中心多活架构,而HDFS没有原生多站点部署能力。虽然有些商业版本在HDFS上增加了多站点负责数据的能力,但由于其两个系统可能是独立的,无法支持真正的全局命名空间下多活的能力。4)比较:存储开销低对于存储系统来说,为了适应随机的硬件故障,一般都有复制机制来保护数据。一个常见的例子是三副本,将数据存储三份,然后分别保存在三个节点上。存储开销增加了三倍,但可以容忍两个副本同时失效,保证数据不会丢失。另一种是纠删码,俗称EC。以10+2为例,它将数据切割成10个数据块,然后用算法计算出两个码块,一共12个块。然后分发到四个节点,存储开销是1.2倍。它还可以同时容忍两个块失败。这样的话,所有的数据都可以用剩下的10个block来计算,减少了存储开销,达到了容错级别。HDFS默认使用三副本机制,HDFS新版本已经支持EC能力。经过研究,EC是基于文件的,所以对于小文件有天然的劣势。因为如果小文件的大小小于block要求的大小,它的开销会比原来的开销大,因为这里不能保存两个代码块。极端情况下,如果它的大小等于单个代码块的大小,就已经相当于三份了。同时,一旦HDFS是EC,就不能再支持append、hflush、hsync等操作,这将极大地影响可以使用EC的场景。对象存储原生支持EC。对于小文件,它根据预先配置的策略,在内部将小文件合并成一个大块用于EC,以确保数据开销始终不变。3、对象存储的挑战:数据的额外上传在S3协议中,上传时需要提供对象的大小。以S3标准为例,当对象存储接入Iceberg时,S3标准对象存储不支持额外上传数据的接口,协议要求上传文件时提供文件大小。所以在这种情况下,对于这种流式的FileIO传入并不是很友好。1)方案一:S3Catalog数据附加上传-小文件缓存本地/内存对于一些小文件,在流进来的时候写入本地缓存/内存,上传到对象存储。2)方案二:附加上传S3Catalog数据-MPUmulti-partuploadforlargefiles对于大文件,会使用S3标准定义的MPUmulti-partupload。一般分为几个步骤:第一步是创建一个初始化的MPU,得到一个UploadID,然后给每个Segment分配一个UploadID和一个编号,这些Segment可以并行上传;上传完成后,还需要一个完整的操作,相当于一个通知系统。它将所有基于同一个UploadID的数字从小到大排列,形成一个大文件;当该机制应用于数据追加上传场景时,常规的实现方式是写一个文件,将该文件缓存到本地。当达到所需的块大小时,可以初始化MPU并可以上传其中的一个块。后续每个区块都执行相同的操作,直到上传完最后一个区块,最后调用完成操作完成上传。MPU有优点也有缺点:缺点是MPU的分片数量有上限,S3标准可能只有10000个分片。如果要支持大文件,block不能太小,所以对于比block小的文件,还是要用之前的方式缓存上传;MPU的优势在于可以并行上传。假设是异步上传,文件到达缓存后,可以继续缓存下一个,不用等上一个块上传成功,再开始上传。当前面的注入速度足够快的时候,后端的异步提交就变成了并行操作。使用这种机制,它可以提供比单流上传更快的上传速度。4、对象存储的挑战:原子提交下一个问题是对象存储的原子提交问题。前面说了,在数据注入的过程中,最终的提交其实是分为几个步骤的,是一个线性事务。首先,它必须读取当前的快照版本,然后合并这次的文件列表,然后提交它的新版本。这个操作类似于我们编程中常见的“i=i+1”。它不是原子操作,对象存储标准中没有提供这种能力。上图是并发提交元信息的场景。这里CommitWorker1拿到v006版本,然后合并自己的文件,成功提交v007。这时又有一个CommitWorker2,也拿到了v006,然后合并,同样提供了v007。这个时候我们就需要一个机制告诉它v007有冲突不能上传,然后让它自己去Retry。稍后重试拉出新的v007合并并提交到v008。这是一个典型的冲突场景,这里需要一个机制,因为如果不能检测到是冲突,再次提交v007会覆盖上面的v007,导致上次提交的数据全部丢失。如上图所示,我们可以使用分布式锁机制来解决上述问题。首先,CommitWorker1获取v006,然后合并文件。在commit之前,需要获取这个锁,获取到锁之后再判断当前的快照版本。如果是v006,那么v007可以提交成功,提交成功后解锁。同理,CommitWorker2拿到v006merge后,一开始无法拿到锁,需要CommitWorker1释放锁后才能拿到。拿到锁再查看,会发现当前版本已经是v007了,和自己的v007有冲突,所以这个操作肯定会失败,然后会执行Retry。这是通过锁来解决并发提交的问题。5、DellEMCECS的附加数据上传是基于S3标准对象存储,Iceberg问题的解决方案存在一些问题,比如性能损失,或者需要部署额外的锁服务。DellEMCECS也是一种对象存储。基于这个问题,有不同的答案。它在S3标准协议的基础上做了一些扩展,可以支持额外上传数据。它的附加上传与MPU的不同之处在于它没有块大小限制。blocks可以设置的小一些,上传后会在内部拼接起来,仍然是一个有效的文件。appendupload和MPU都可以在一定程度上适应不同的场景。MPU有上传加速能力,追加上传的性能不够快,没有MPU初始化和合并操作,所以两者在性能上可以用于不同的场景。6、DellEMCECS在并发提交下的解决方案ECS对象存储也提供了一个If-Match语义,在微软的云存储和谷歌的云存储上都有这样的接口能力。If-Match的意思是CommitWorker1在提交v006的时候,也拿到了文件的eTag。提交时会带上eTag。系统需要判断被覆盖文件的eTag是否与当前文件真实的eTag相同。如果相同则允许覆盖操作,则v007可以提交成功;另一种情况是CommitWorker2也拿到了v006的eTag,上传的时候发现拿到的eTag和当前系统的文件不一样,会返回失败,然后触发Retry。这种实现与锁机制的效果是一样的,不需要在外部重新部署锁服务来保证原子提交问题。7.S3Catalog-数据统一存储回顾,上面我们解决了文件IO中上传数据IO的问题,解决了元数据表的原子提交问题。解决了这些问题后,就可以将数据和元数据的管理全部交给对象存储,不需要额外部署元数据服务,真正实现了数据统一存储的理念。3.演示方案如上所示,演示方案使用的是Pravega,可以简单理解为Kafka的替代品,但是针对性能进行了优化。在这个例子中,我们会向Pravega流中注入数据,然后Flink会从Pravega中读取数据进行解析,然后存储到Iceberg组织中。Iceberg使用ECSCatalog直接连接对象存储,无需任何其他部署,最后使用Flink读取数据。4.存储优化的一些思考上图是目前Iceberg支持的数据组织结构。可以看到它在storage中直接存放了Parquet文件。我们的想法是,如果lake和metadatalake其实是同一个lake,那么生成的Parquet文件有没有可能和源文件有很大的数据冗余,有没有可能减少冗余信息的存储。例如,在最极端的情况下,如果Iceberg中记录了源文件的信息,Parquet数据文件将不会被保存。查询时,通过自定义FileIO,让它在内存中根据原始文件实时生成类似Parquet的格式,提交给上层应用查询,达到同样的效果。但是这种方式仅限于存储成本非常高,而查询性能不高的情况。能做到这一点也是基于Iceberg很好的抽象,因为它的文件元数据和FileIO都是抽象的,源文件可以被反汇编成一个Parquet文件。进一步思考,是否优化查询性能,同时节省存储空间。比如预计算,取出源文件一些常用的列,然后将统计信息放入Iceberg,读取时使用源文件和云计算文件,可以快速查询信息,同时time保存不常用的数据Columnstorage。这是一个比较初步的想法。如果能够实现,Iceberg不仅可以对结构化的Parquet文件格式进行索引,还可以对一些半结构化、结构化数据进行索引,通过临时计算解决上层的查询任务,成为一个更加完备的DataCatalog。原文链接:http://click.aliyun.com/m/1000283887/