当前位置: 首页 > 科技观察

基于Flink构建的实时数据仓库,这才是OPPO数据中台的基础

时间:2023-03-19 00:10:28 科技观察

基于Flink的实时数仓是OPPO数据中台的基础OPPO与互联网、大数据有什么关系?下图简单介绍一下OPPO的业务和数据:作为手机厂商,OPPO基于Android定制了自己的ColorOS系统,目前日活跃用户超过2亿。围绕ColorOS,OPPO构建了很多互联网应用,比如应用商店、浏览器、信息流等,在运营这些互联网应用的过程中,OPPO积累了大量的数据。上图右侧展示了整体数据规模的演变:自2012年以来,每年以2-3倍的速度增长。截至目前,总数据量已超过100PB。日增数据量超过200TB。为了支撑如此庞大的数据量,OPPO开发了一整套数据系统和服务,并逐渐形成了自己的数据中心体系。1.2.OPPODataCenter今年大家都在谈DataCenter。OPPO是如何理解DataCenter的?我们将其分为4个层次:最底层是统一的工具体系,涵盖“接入-治理-开发-消费”全数据链路;基于工具体系构建数据仓库,分为“原始层-明细层-汇总层-应用层”,也是经典的数据仓库架构;再往上是全局数据系统,什么是全域?就是打通公司所有的业务数据,形成统一的数据资产,比如ID-Mapping、用户标签等;最后,如果数据能被业务所用,就需要场景驱动的数据产品和服务。以上就是OPPO数据中心的整个系统,数据仓库处于非常基础和核心的位置。1.3.OPPO离线数仓的建设近2、3年,我们的重点一直放在离线数仓的建设上。上图大致描述了整个构建过程:首先,数据源基本是手机、日志文件、DB数据库。我们基于ApacheNiFi打造了高可用高吞吐的接入系统,将数据统一落入HDFS形成原始层;紧接着,基于Hive的小时级ETL和日级汇总Hive任务分别负责计算生成明细层和汇总层;最后,应用层是基于OPPO内部自研的数据产品,主要是报表分析、用户画像、接口服务。此外,明细中层还支持基于Presto的即席查询和自助提现。随着离线数仓的逐步完善,业务对实时数仓的需求也越来越强烈。1.5.从离线到实时的平滑迁移无论是平台还是系统,都离不开上下层的组成:上层是API,是面向用户的编程抽象和接口;下层是Runtime,是面向内核的执行引擎。我们希望从离线到实时的迁移是顺利的。这意味着什么?从API层看,数据仓库的抽象是Table,编程接口是SQL+UDF。在离线数仓时代,用户已经习惯了这样的API。迁移到实时数仓后最好保持一致。从运行时来看,计算引擎从Hive进化到Flink,存储引擎从HDFS进化到Kafka。基于以上思路,我们只需要对上面提到的离线数仓流水线进行改造,就可以得到实时数仓流水线。1.6.搭建OPPO的实时数仓从上图可以看出,整个流水线与离线数仓基本类似,只是Hive换成了Flink,HDFS换成了Kafka。从整体流程来看,基本模型不变,或者由原始层、明细层、汇总层、应用层的级联计算组成。所以,这里的核心问题就是如何基于Flink搭建这个pipeline。下面介绍一下我们基于FlinkSQL所做的一些工作。2、基于FlinkSQL2.1的扩展工作。为什么使用FlinkSQL首先,为什么要使用FlinkSQL?下图展示了Flink框架的基本结构。底部是运行时。我们认为这个执行引擎的核心优势有四个:第一,低延迟和高吞吐量;第二,端到端Exactly-once;第三,容错状态管理;第四,Window&Event时间支持。基于Runtime,抽象出三层API,SQL在最顶层。FlinkSQLAPI有哪些优势?我们也从四个方面来看:第一,支持ANSISQL标准;第二,支持丰富的数据类型和内置函数,包括常用的算术运算和统计聚合;第三,Source/Sink可定制,并以此为基础灵活扩展上下游;第四,批处理流程统一,同样的SQL可以离线运行也可以实时运行。那么,如何基于FlinkSQLAPI进行编程呢?下面是一个简单的演示:首先,定义并注册输入/输出表,这里创建2张Kakfa表,指定Kafka版本是多少,对应哪个topic;下一步是注册UDF,UDF的定义由于篇幅这里就不一一列举了;最后是执行真正的SQL。可以看到,为了执行SQL,需要做这么多编码工作,这不是我们想要暴露给用户的接口。2.2.基于WEB的开发IDE2.5。FlinkSQL对接外部数据源弄清楚FlinkSQL注册表的过程,给我们带来了这样一个想法:如果外部元数据创建的表也可以转化为TableFactory可识别的映射,就可以无缝注册到TableEnvironment中。基于这样的思路,我们实现了FlinkSQL与现有元数据中心的连接。大致流程如下图所示:通过元数据中心创建的表,会在MySQL中存储元数据信息,我们用一个表来记录表。基本信息,然后其他三张表分别记录Connector、Format、Schema转换为key-value后的描述信息。之所以拆成三张表,是为了能够独立更新这三类描述信息。接下来是自定义实现的ExternalCatalog,它可以读取4个MySQL表并将它们转换为map结构。2.6.实时表-维表关联目前我们的平台已经具备了元数据管理和SQL作业管理的能力,但是还缺少一些基础的功能才能真正开放给用户使用。通过我们构建数据仓库,星型模型是不可避免的。这里举一个比较简单的案例:中间的事实表记录的是广告点击流,周围的维度表分别是用户、广告、产品、渠道。假设我们有一个SQL分析,需要将点击流表关联到用户维度表。FlinkSQL中应该如何实现呢?我们有两种实现方式,一种是基于UDF,一种是基于SQL转换。3、构建实时数仓的应用案例下面分享几个典型的应用案例,都是在我们平台上用FlinkSQL实现的。3.1.实时ETL拆分这里是一个典型的实时ETL链路,从大表中拆分出各个业务对应的小表:OPPO最大的数据源是手机埋点,手机APP的数据有一个特点,所有的数据都是通过几个统一的渠道上报。因为不可能每次有新的业务点就升级客户端,增加新的渠道。比如我们有一个sdk_log通道,所有APP应用的埋点都会向这个通道上报数据,导致这个通道对应的原始层表非常庞大,一天几十TB。但实际上每个业务只关心自己的部分数据,这就需要我们在原始层进行ETL拆分。这个SQL逻辑比较简单。无非是根据某些业务字段进行过滤,插入到不同的业务表中。它的特点是将多行SQL最终组合成一条SQL提交给Flink执行。大家担心的是,如果包含4条SQL,会不会重复读取同一条数据4次?其实在Flink编译SQL的阶段会做一些优化,因为最终指向同一个kafkatopic,所以Data只会被读取一次。另外,我们做离线和实时数仓ETL拆分的同一个FlinkSQL,分别落在了HDFS和Kafka上。Flink本身支持写入HDFS的sink,比如RollingFileSink。3.2.实时指标统计下面是一个计算信息流CTR的典型案例。我们计算一定时间内的曝光度和点击量,除以得到点击率??导入Mysql,然后通过我们内部的报表系统可视化。这个SQL的特点是使用了窗口(TumblingWindow)和子查询。3.3.实时标签导入这是一个实时标签导入的案例。手机实时感知当前用户的经纬度,转化为具体的POI,导入ES,最后在标签系统上进行用户定位。这个SQL的特点是使用了AggregateFunction。在5分钟的窗口内,我们只关心用户上次上报的经纬度。AggregateFunction是一个UDF类型,通常用于聚合指标的统计,比如计算sum或者average。在这个例子中,由于我们只关心最新的经纬度,所以每次都可以替换旧的数据。四、对未来工作的思考和展望最后??,我想和大家分享一下我们对未来工作的一些思考和规划,目前还不成熟,分享给大家一起讨论。4.1.端到端实时流处理什么是端到端?一端是采集的原始数据,另一端是报表/标签/接口等数据的呈现和应用,中间的实时流连接两端。目前我们使用基于SQL的实时流处理。源表是Kafka,目标表也是Kafka。Kafka统一后导入Druid/ES/HBase。这样设计的目的是提高整体流程的稳定性和可用性:首先,Kafka作为下游系统的缓冲,可以防止下游系统异常影响实时流的计算(一个系统保持稳定,与多个系统同时稳定相比,概率更高点);其次,kafka到kafka的实时流,exactly-once语义比较成熟,一致性有保证。那么,上述的端到端实际上是分三个独立的步骤完成的,每个步骤可能需要不同的角色来处理:数据处理需要数据开发人员,数据导入需要引擎开发人员,数据资产化需要产品开发人员。我们的平台能不能做到端到端的自动化,只需要一次SQL提交就可以搞定加工、导入、资产化三个步骤?在这种思路下,我们在数据开发中看到的不再是KafkaTable,而应该是面向场景的展示表/标签表/接口表。例如展示表,在建表时,只需要指定维度、指标等字段,平台会自动将Kafka实时流结果数据导入Druid,再自动将Druid数据源导入报表系统,甚至自动生成报告模板。4.2.实时流的血缘分析关于血缘分析,做过离线数仓的朋友都深知其重要性,在数据治理中起着不可或缺的关键作用。实时数据仓库也是如此。我们希望建立一个端到端的血缘关系,从采集系统的接入通道,到流经中间的实时表和实时操作,再到消费的产品,都可以清晰的展现出来数据。基于血缘关系的分析,我们可以评估数据的应用价值,计算数据的计算成本。4.3.离线-实时数仓的集成最后一个方向是离线实时数仓的集成。我们认为,短期内,实时数仓无法替代离线数仓,两者并存才是新常态。离线数仓时代,我们如何将积累的工具体系适配到实时数仓,如何实现离线和实时数仓的一体化管理?从理论上讲,它们的数据源是一致的,上层抽象也是Table,与SQL不同,但有本质区别,比如时间粒度和计算方式。对于数据工具和产品来说,需要做哪些改变才能实现完全融合,是我们正在探索和思考的问题。