数据仓库是公司数据发展到一定规模后必须提供的基础服务,也是“数据智能”建设的基础环节。快速获得数据反馈,不仅有利于提升产品和用户体验,也有利于公司科学决策,因此数据采集的实时性尤为重要。目前,企业的数据仓库建设大多是离线和实时的。实时数仓用于要求低延迟的业务;离线数据仓库用于复杂的业务。架构非常复杂,需要使用很多系统和计算框架。这就需要企业储备各个领域的人才,导致人才成本高,问题排查困难。最终用户还需要熟悉多种语法。本文分析了目前的数仓架构,探讨了离线数仓和实时数仓是否可以一起考虑,并探讨了Flink的统一架构是否可以解决大部分问题。数据仓库架构数据仓库可分为三层:ODS(原始数据层)、DW(数据仓库层)、ADS(应用数据层)。1.ODS(OperationDataStore)层传输来自日志或业务DB的原始数据。传统的离线数仓方式也是直接使用CDC(ChangeDataCapture)工具定期同步到数仓。使用一套统一的Kafka来接管这个角色,可以让数据更实时地落入数据仓库,也可以在这一层统一实时和离线数据。2.DW(Datawarehouse)层DW层一般分为DWD层和DWS层:DWD(Datawarehousedetail)层:详细数据层,这一层的数据要经过清洗,清洗准确的数据,它包含相同的信息作为ODS层,但它遵循数据仓库和数据库的标准模式定义。DWS(Datawarehouseservice)层:汇总数据层,这一层可能是轻微的聚合,可能是星型或者雪花型结构的数据,这一层做了一些业务层的计算,用户可以根据第一层计算出自己需要的数据数据服务。3、ADS(ApplicationDataStore)层与DWS的区别在于,这一层直接面向用户的数据服务,不需要再次计算,已经是最终需要的数据。主要分为两个环节:业务DB和日志->Kafka->实时数仓(Kafka+Dim维表)->BIDB->数据服务业务DB和日志->Kafka->离线数仓(Hivemetastore+HDFS)->BIDB->数据服务的主流数据仓库架构仍然是Lambda架构。Lambda架构虽然复杂,但是可以覆盖业务需要的场景,是业务最灵活的方式。Lambda架构分为两个环节:传统离线数据具有稳定性、计算复杂、灵活等优点。它运行批量计算以确保T+1报告生成和灵活的Ad-hoc查询。实时数据仓库提供低延迟的数据服务。传统的离线数仓往往存在T+1的延迟,让分析师无法实时做出决策。实时数仓整个链路的延迟最低,甚至可以做到秒级,不仅加速了分析和决策,也给更多的业务带来了可能,比如实时监控告警.Flink的强项是实时计算和流计算,Kafka是实时数仓存储的核心。上图是1-9条边,每条边代表数据转换,就是大数据的计算。本文将对这些优势进行分析,探讨Flink在其中可以发挥的作用。Flink的单栈计算元数据先说元数据管理。离线数仓有Hivemetastore来管理元数据,而纯Kafka没有管理元数据的能力。这里推荐两种方法:1.Confluentschemaregistry构建启动schemaregistry服务后,可以通过confluent的url获取表的schema信息。对于上百个字段的表,在编写Flink作业时可以省去很多工作量。Flink还将其模式推理功能与Confluent模式注册表相结合。但是还是不能省去建表的过程,用户还需要填写Confluent对应的URL。2.Catalog目前,Flink已经内置了HiveCatalog。Kafka表可以直接集成到Hivemetastore中,用户可以直接在SQL中使用这些表。但是Kafka的start-offset有些场景需要灵活配置。为此,Flink也提供了LIKE[1]和TableHints[2]来解决。Flink中离线数仓和实时数仓都使用HiveCatalog:usecatalogmy_hive;--buildstreamingdatabaseandtables;createdatabasestream_db;usestream_db;createtableorder_table(idlong,amountdouble,user_idlong,statusstring,ttimestamp,...--可能有几十个字段ts_daystring,ts_hourstring)with('connector.type'='kafka',...--Kafkatable相关配置);',...--Hivetable相关配置);使用Catalog,后续计算可以完全重用批处理和流,以提供相同的体验。数仓导入计算①和⑤分别是实时数仓的导入和离线数仓的导入。最近,更多的实时离线数据仓库导入已经成为数据仓库中越来越普遍的做法。Flink的引入可以让离线数仓的数据更加实时。以前主要是通过DataStream+StreamingFileSink导入,不支持ORC,无法更新HMS。FlinkStreaming集成Hive后,提供了Hive的StreamingSink[3]。使用SQL更方便灵活,利用SQL的内置函数和UDF,可以复用流和批来运行两个流计算作业。insertinto[stream_db.|batch_db.]order_tableselect...fromlog_table;数据处理计算②和⑥分别是实时数仓和离线数仓的中间数据处理。主要有3种计算:ETL:同数据导入,批流无区别。维表连接:向维表添加字段是一种非常常见的数据仓库操作。在离线数仓中,基本上可以直接joinHive表,但是Streaming作业有些不同,下面会详细介绍。聚合:在Streaming作业的这些有状态计算中,产生的不是一个一次性确定的值,而可能是一个不断变化的值。维表连接不同于离线计算。离线计算只需要关心某个时间点的维表数据,而流式作业则继续运行。所以不能只关注静态数据,而需要是动态的维度表。另外,为了Join的效率,streamingjobs往往join的是一张数据库表,而不仅仅是一张Hive表。示例:--stream维表usestream_db;createtableuser_info(user_idlong,ageint,address,primarykey(user_id))with('connector.type'='jdbc',...);--将离线数仓的维表导入数据仓库实时insertintouser_infoselect*frombatch_db.user_info;--维表Join,SQL批流多路复用insertintoorder_with_user_ageselect*fromorder_tablejoinuser_infoforsystem_timeasoforder_table.proctimeonuser_info.user_id=user_info.user_id;这里有个很麻烦的事情,就是在实时数仓中,需要周期性的调度和更新维表到实时维表数据库,能不能直接加入离线数仓的Hive维表?目前社区也在开发Hive维表。它有什么挑战:Hive维表太大,缓存中放不下:考虑Shufflebykey,分布式维表的join,减少单个并发缓存的数据量。考虑将维表数据放入State。维表更新问题:简单的方案是TTL过期,比较复杂的方案是实现Hive流源,结合Flink的watermark机制stateful计算和数据导出示例:selectage,avg(amount)fromorder_with_user_agegroupbyage;一个简单的聚合SQL,它在批计算和流计算中的执行方式是完全不同的。流式聚合与离线计算聚合最大的区别在于它是一个动态表[4],它的输出是不断变化的。动态表的概念很简单,一个由输入驱动输出的流式计数,而不是像batch一样获取所有输入后输出,所以它的结果是动态变化的:如果在SQL内部,Flink内部的retract机制会保证:SQL结果与批处理相同。如果是外接存储,这就给sink带来了挑战。有状态计算后的输出:如果sink是一个可更新的数据库,比如HBase/Redis/JDBC,那么这似乎不是问题,我们只需要不断更新即可。但是如果是不可更新的存储,我们就没有办法去更新原来的数据了。为此,Flink提出了Changelog[5]的支持,想内置对这种sink的支持,并输出特定schema的数据,让下游消费者也能正常工作。例子:--batch:计算完成后,一次性输出到mysql,同一个key只有一条数据--streaming:mysql中的数据不断更新变化insertintomysql_tableselectage,avg(amount)fromorder_with_user_agegroupbyage;--batch:只有一个数据具有相同的key,append可以插入hive_tableselectage,avg(amount)fromorder_with_user_agegroupbyage;--streaming:中的数据不断追加kafka,多加一列,表示这是upsert消息,后续Flink消费会自动创建处理upsert插入kafka_table的机制selectage,avg(amount)fromorder_with_user_agegroupbyage;可计算AD-HOC和OLAP离线数仓⑨,对于明细数据或汇总数据均可进行即席查询,让数据分析师进行灵活的查询。目前实时数仓的一个很大的缺点就是不能Ad-hoc查询,因为它不保存历史数据。Kafka可能可以保存3天以上的数据,但是存储成本高,查询效率不好。一种思路是为OLAP数据库的批流提供一个统一的sink组件:DruidsinkDorissinkClickhousesinkHBase/Phoenixsink总结本文从目前的Lambda架构出发,分析Flink单栈数仓计算方案的能力。本文中Flink的一些新功能还在快速迭代演化中。通过不断的探索和实践,有望逐步朝着计算融合的方向前进。未来的数据仓库架构希望真正统一用户的离线和实时,提供统一的体验:统一元数据统一SQL开发统一数据导入导出未来考虑统一存储参考[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A+Support+LIKE+clause+in+CREATE+TABLE[2]https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Table+Hints[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table[4]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
