当前位置: 首页 > 科技观察

为突破主流数仓的缺陷,字节跳动基于Doris的胡仓分析探索实践

时间:2023-03-13 13:43:14 科技观察

Doris是一款MPP架构的分析型数据库,主要面向多维分析、数据报表、用户画像分析等场景.自带分析引擎和存储引擎,支持向量化执行引擎,不依赖其他组件,兼容MySQL协议。一、Doris简介ApacheDoris具有以下特点:1)良好的架构设计,支持高并发低延迟的查询服务,支持高吞吐量的交互分析。多个FE可以对外提供服务。当并发量增加时,FE和BE的线性扩展可以支持高并发的查询请求。2)支持批量数据加载和流式数据加载,支持数据更新。支持Update/Delete语法,唯一/聚合数据模型,支持数据动态更新,聚合指标实时更新。3)提供高可用性、容错处理和高度可扩展的企业级特性。FELeader异常,FEFollower秒级切换到新的Leader继续对外提供服务。4)支持聚合表和物化视图。多种数据模型,支持聚合、替换等数据模型,支持创建汇总表,支持创建物化视图。汇总表和物化视图支持动态更新,无需用户手动处理。5)兼容MySQL协议,支持直接使用MySQL客户端连接,数据应用对接非常好用。Doris由Frontend(以下简称FE)和Backend(以下简称BE)组成。FE负责接受用户请求、编译、优化、分发执行计划、元数据管理、BE节点管理等功能,BE负责执行FE下发的功能。用户数据的执行计划、存储和管理。2、数据湖格式介绍HudiHudi是下一代流式数据湖平台,提供数据湖表格式管理能力,提供事务、ACID、MVCC、数据更新删除、增量数据读取等功能。支持Spark、Flink、Presto、Trino等多种计算引擎。Hudi根据数据更新时的行为分为两种表类型:针对Hudi的两种表格式,有三种不同的查询类型:3.Doris分析Hudi数据的技术背景在数据仓库业务中,作为业务数据变化对实时性要求越来越高,T+1数仓业务逐渐向小时、分钟、甚至秒级别演进。实时数仓的应用越来越广泛,也经历了多个发展阶段。目前存在各种解决方案。1.Lambda架构Lambda将数据处理流程分为在线分析和离线分析两种不同的处理路径。两条路径相互独立,互不影响。T+1数据离线分析处理,使用Hive/Spark处理数据量大,数据不可变,数据一般存储在HDFS等系统中。如果遇到数据更新,需要覆盖整个表或者整个分区,成本比较高。实时数据在线分析处理,使用Flink/SparkStreaming处理流式数据,秒级或分钟级流式数据分析处理,数据存储在Kafka或周期(分钟级)存储在HDFS.这套方案存在以下缺点:同一套指标可能需要开发在线分析和离线分析两套代码,维护复杂。数据应用在查询指标时可能需要同时查询离线数据和在线数据,开发复杂。同时部署两套批处理和流计算引擎,运维复杂。数据更新需要覆盖整个表或分区,代价高昂。2.Kappa架构随着在线分析服务越来越多,Lambda架构的劣势也越来越明显。添加一个指标需要线上线下分别开发,维护难度大。线下指标可能与线上指标不一致,部署复杂。很多的。于是Kappa架构应运而生。Kappa架构使用一套架构来处理在线数据和离线数据,使用同一套引擎同时处理在线和离线数据,数据存储在消息队列上。Kappa架构也有一定的局限性:流式计算引擎批处理能力较弱,在处理大量数据时性能较弱。数据存储采用消息队列,对数据存储的有效性有限制,历史数据不可追溯。数据时序可能乱序,对于一些对时序要求严格的应用可能会导致数据错误。数据应用需要从消息队列中取数据,需要开发适配接口,开发复杂。3、基于数据湖的实时数据仓库针对Lambda架构和Kappa架构的缺陷,业界开发了基于数据湖的Iceberg、Hudi、DeltaLake等数据湖技术,使数据仓库能够支持ACID、Update/Delete、数据TimeTravel、SchemaEvolution等特性,数据仓库的时效性从小时级提升到分钟级,数据更新也支持局部更新,大大提升了数据的性能更新。具有流计算的实时性和批计算的吞吐量,支持近实时场景。上述方案中,数据湖的应用最为广泛,但数据湖模型无法支持更高的秒级实时性,也无法直接对外提供数据服务。需要构建其他数据服务组件,系统相对复杂。基于这样的背景,一些商家开始使用Doris来承接。业务数据分析师需要对Doris和Hudi中的数据进行联合分析。另外,在Doris对外提供数据服务时,需要能够查询到Doris中的数据,并加快查询速度。线下业务的数据湖数据,所以我们开发了Doris访问数据湖Hudi数据的特性。4.Doris分析Hudi数据的设计原理基于以上背景,我们设计了ApacheDoris来查询数据湖格式的Hudi数据。由于Hudi生态是java语言,而ApacheDoris的执行节点BE是C++环境,C++无法直接调用HudijavaSDK,对此我们有四种解决方案。1)实现HudiC++客户端,在BE中直接调用HudiC++客户端读写Hudi表。该方案需要完整实现一套HudiC++客户端,开发周期长。后期Hudi行为变化需要同步修改HudiC++客户端,维护难度大。2)BE通过thrift协议向Broker发送读写请求,Broker调用Hudijava客户端读取Hudi表。本方案需要在Broker中增加读写Hudi数据的功能。目前Broker只是定位为fs的运行接口。Hudi的引入打破了Broker的定位。二是BE和Broker之间需要传输数据,性能低。3)使用JNI在BE中创建JVM,加载Hudijava客户端读写Hudi表。该方案需要在BE进程中维护JVM,JVM调用Hudijava客户端对Hudi进行读写。读写逻辑使用Hudi社区java实现,可以与社区保持同步;同时,数据在同一个进程中处理,具有高性能。但是在BE中需要维护一个JVM,管理比较复杂。4)使用BEarrowparquetc++api读取hudiparquetbase文件,huditable中的delta文件暂不处理。该方案可以通过BE直接读取hudi表的parquet文件,性能最高。但是目前不支持basefile和deltafile的合并读,所以只支持COW表的SnapshotQueries和MOR表的ReadOptimizedQueries,不支持IncrementalQueries。综上所述,我们选择了方案4。第一阶段实现了COW表的SnapshotQueries和MOR表的ReadOptimizedQueries,后来和Hudi社区合作开发了C++接口,用于联合读取基本文件和增量文件。五、Hudi数据Doris分析技术实现在Doris中查询和分析Hudi表象的步骤非常简单。1、创建Hudi外部表时,指定引擎为Hudi,同时指定Hudi外部的相关信息,如hivemetastoreuri,hivemetastore中的数据库和表名等。table只是在Doris元数据中添加一个表,没有任何数据移动。创建表时,支持指定全部或部分hudischema,也支持不指定schema创建hudi表。指定schema时,必须与hiveMetaStore中hudi表的列名和类型保持一致。示例:明文CREATETABLEexample_db.t_hudiENGINE=HUDIPROPERTIES("hudi.database"="hudi_db","hudi.table"="hudi_table","hudi.hive.metastore.uris"="thrift://127.0.0.1:9083");创建表example_db.t_hudi(column1int,column2string)ENGINE=HUDIPROPERTIES("hudi.database"="hudi_db","hudi.table"="hudi_table","hudi.hive.metastore.uris"="thrift://127.0.0.1:9083");2.在查询Hudi外部表和查询Hudi数据表时,FE会在analazy阶段查询metadata获取Hudi外部表的hivemetastore地址,从Hivemetastore中获取hudi的schema信息和文件路径桌子。获取hudi表的数据地址。FE规划片段添加HudiScanNode。获取HudiScanNode中Hudi表对应的数据文件列表。根据从Hudi表中获取的数据文件列表生成scanRange。将HudiScan任务发送到BE节点。BE节点根据HudiScanNode指定的Hudi外部文件路径调用原生parquetreader读取数据。6.后期规划目前ApacheDoris查询Hudi表已经融入社区,目前支持COW表的SnapshotQuery和MOR表的ReadOptimizedQuery。目前还不支持MOR表的SnapshotQuery,也不支持streaming场景下的IncrementalQuery。未来还有几项工作要做,我们正在积极与社区合作:MOR表的SnapshotQuery。MOR表的实时读取需要结合Data文件和对应Delta文件的读取。BE需要支持读取Delta文件AVRO格式,需要增加avro的原生读取方法。COW/MOR表的增量查询。实时业务支持增量读。BE读取Hudi基础文件和delta文件的原生接口。目前BE在读取Hudi数据时,只能读取数据文件,使用parquet的C++SDK。后期我们将与Hudi社区合作,以C++/Rust等语言提供Huidbasefile和deltafile的读取接口,直接使用DorisBE中的原生接口查询Hudi数据。