当前位置: 首页 > 科技观察

美团基于Flink的实时数仓平台建设新进展_0

时间:2023-03-20 10:22:14 科技观察

摘要:本文整理自美团实时数仓平台负责人姚东阳在FlinkForwardAsia2021实时数据大会上的演讲。仓库专场。主要内容包括:平台建设现状遇到的问题及未来规划的解决方案01平台建设现状美团于2018年首次引入Flink实时计算引擎,当时实时数仓的概念当时不是很流行,平台只提供FlinkJar任务。生命周期管理和监控警报。2019年,我们注意到实时计算的主要应用场景是解决离线数仓时效性低的问题。离线数仓相对成熟,通过SQL开发非常简单。数仓的实时部分主要是通过FlinkDataStreamAPI进行开发,相对于离线数仓的开发方式来说,门槛比较高,也比较碎片化。因此,我们开始研究实时数仓的解决方案,目标是降低开发门槛,并尝试推广FlinkSQL,最终定名为美团实时数仓平台NAU。2020年,美团实时数仓平台正式上线。为业务提供FlinkSQL作业开发入口,主要负责两方面的工作:一是将实时数仓中的通用数据源与离线表概念对齐,使用数据模型进行管理;第二,为FlinkSQL开发提供配套的效率工具,如Verification和debugging功能。但在实际推广过程中,我们发现FlinkSQL运维方面的业务门槛还是比较高的,所以我们将接下来的工作重心转移到了运维中心。FlinkSQL作业运维的痛点主要集中在两个方面:有状态SQL作业部署的中断问题和SQL作业的异常定位问题。为此,我们通过异步检查点持久化和状态生成解决了第一个问题,并通过提供作业的自动诊断解决了第二个问题。目前,整个实时数仓的平台建设已经初步完成。未来,我们将不断细化开发和运维能力,持续推动公司业务数仓架构的演进,如批量生产的融合、生产服务的融合。一体化。实时数仓已基本覆盖公司所有业务,为美团优选、美团购物、金融、单车等100多个业务团队提供支持。托管了7000多个实时数据模型,主要是Kafka表和KV表模型。4000+个FlinkSQL作业在线运行,新增实时SQL作业占比达到70%以上。从数据上看,FlinkSQL已经可以解决美团实时数仓中的大部分流处理问题。下面以美团业务中的两个实时数仓生产环节为例,分享一下FlinkSQL的实际应用。应用场景一是基于FlinkSQL+OLAP的实时生产环节。这个业务环节有两个实时数据源,分别是业务DB的变更事件和业务服务的日志事件。这些事件会先被收集到Kafka中,然后DB事件会根据表名分发到新的Kafka中。DB和日志的数据也会在这一层统一格式,完成实时数仓的ODS层。然后业务会使用FlinkSQL对ODS层的数据进行清洗关联,生成实时数仓的全学科表,最后写入OLAP查询引擎进行实时分析。对于时效性要求不高的场景,部分业务还会在OLAP引擎上配置分钟级调度,以降低同查询的压力。应用场景2和场景1的区别在于,业务实时数仓的主题宽表数据不是直接写入OLAP查询引擎,而是继续写入Kafka,使用FlinkSQL做索引聚合APP层,最后将预先计算好的Indicator数据写入到OLAP、DB、KV等应用层存储中。这种方式更适合对接数据业务,因为它兼顾了数据的时效性和高QPS查询。上图展示了实时数仓平台的架构,分为集成、开发、运维、治理、安全五个模块。集成模块主要针对数据模型的管理,包括Kafka和KV模型管理,管理内容包括数据源的schema信息和连接信息。开发模块主要针对FlinkSQL转化业务需求,如提供版本管理记录业务需求的迭代过程,提供FlinkSQL的验证调试,确保开发的SQL正确表达业务逻辑,支持业务使用自定义FlinkUDF函数和自定义Format解析让FlinkSQL可以扩展以满足更多的业务需求。运维模块侧重于SQL作业的部署和运行时监控。监控方面,我们提供SQL作业监控告警、异常日志、作业诊断,帮助业务快速发现和定位作业异常;在部署方面,我们提供SQL作业快照管理、AB部署、参数调优,帮助业务解决SQL作业变化时的问题。治理模块关注实时数仓的数据质量和资源成本。通过构建实时数仓的DQC监控,帮助业务发现上游数据或输出数据的异常值/异常波动;业务可以量化实时数仓的生产成本,方便成本管理。安全模块主要围绕数据流向的控制,提供数据源读写权限的管理和限域机制,保障公司业务数据的安全。02遇到的问题及解决方案在实际推广FlinkSQL的过程中,我们也遇到了很多挑战。2.1双流关联的大状态问题首先是双流关联的大状态问题。FlinkSQL的双流关联会保留左右流的历史数据相互关联。关联所需的时间间隔越长,保存的历史数据越多,状态减少的也越大。例如,要关联一个订单的订购事件和退款事件,保证计算结果的正确性,就需要考虑这两个事件之间的时间间隔,可能是一个月,也可能更长。上图左侧是关联两个流的有状态SQL作业。图中的Mem和Disk组成了SQL作业的TaskManager节点。SQL作业状态后端使用RocksDB,状态持久化在HDFS文件系统上。一开始我们尝试将SQL作业的状态设置为保留一个月,但是SQL作业会变得不稳定,出现内存超限、状态读取性能下降等问题。我们只能不断增加TM的数量和作业的内存大小来缓解这个问题。.即便如此,业务仍然存在两个痛点。首先,难以初始化关联数据。目前公司Kafka数据源对历史回溯有限制,业务无法构建完整的历史状态。即使Kafka支持更长的回溯,状态初始化的效率仍然是个问题。其次,内存资源的开销较大,尤其是多个SQL作业关联同一个数据源时,需要为每个SQL作业分配相应的内存资源,不同SQL作业的状态隔离,相同的关联数据之间作业不能重复使用。解决方案针对以上问题,我们提出了冷热关联分离的解决方案。假设两天前的数据关联比较少,状态回滚不会超过两天,那么可以定义两天前的数据为冷数据,两天内的数据为热数据。如上图所示,左边的SQL作业通过设置状态保留时间只保留T+0和T+1的热数据,而T+2及更早的冷数据每天从Hive通过批处理任务同步到外部存储KV。关联时,如果状态下的热数据不存在,则通过访问外部存储KV来关联冷数据。右边是另外一个需要关联同一个数据源的SQL作业,它和左边的SQL作业共享外层KV的冷数据。对于第一个痛点,由于状态控制在两天以内,SQL作业上线时,关联数据初始化的数据量是可控的。对于第二个痛点,因为前两天的数据大部分存储在外层KV中,不同的SQL作业可以查询外层存储中的KV,可以节省大量的内存资源。2.2SQL变更状态恢复问题第二个问题是有状态SQL逻辑变更后如何恢复状态?FlinkSQL支持有状态的增量计算。状态是增量计算的历史积累。其实业务需要修改逻辑的情况有很多。上图右侧列出了一些常见的SQL变更,比如增加聚合指标、修改原有Index口径、增加过滤条件、增加数据流关联、增加聚合维度等,比如业务增加了更多服务维度,在数据产品中需要扩展分析维度,所以也需要对FlinkSQL进行改造,增加聚合维度。但是,上述SQL逻辑发生变化后,无法从之前的状态恢复,因为历史状态无法保证变化后的SQL的完整性,即使恢复后也无法100%保证后续计算的正确性。在这种情况下,为了保证数据的正确性,业务需要回溯,从历史中重新计算。回溯过程会造成线上中断,但业务不想牺牲太多的时效性。解决方案对于这个问题,我们给出三种解决方案。方案一:双链路切换。该解决方案的关键是建立另一条相同的实时链路作为备份链路。更改有状态SQL时,可以在备份链路上回溯,重新计算历史数据。回溯完成后,首先验证备份链路的结果数据,确定没有问题后,切换链路最下游数据服务层的读表,完成整个变更过程。解决方案2:绕过状态生成。与双链路切换不同的是,这里改变的是链路上的单一作业。思路是临时启动一个bypassjob回溯,建立新的逻辑状态,待数据校验通过后重启onlinejob。完成SQL和状态的同步切换。方案三:历史状态迁移。前两种方法的思路是相似的。两者都是根据历史数据重新计算,构建新的状态。但是这个想法是在历史状态的基础上迁移一个新的状态。虽然通过这种方式构建的新状态不能保证完整性,但在某些情况下,业务是可以接受的。目前,我们修改了StateProcessAPI,允许Join和Agg算子增加列,同时保持SQL算子及其上下游关系不变。以上三种方式各有优势,可以从普适性、资源成本、在线中断、等待时间四个维度对以上三种方案进行横向比较。通用性是指在保证数据正确的前提下支持的SQL变化范围。前两种方式都是重新计算,状态是完整的,所以比方案三更具有普适性。资源成本是指完成SQL变更需要额外的Flink或者Kafka资源。方法一需要搭建全链路,需要较多的Flink和Kafka资源,因此成本最高。在线中断是指下游数据在变更过程中延迟的时间长度。方法一是在数据业务层切换,几乎没有中断;方法2中断的时间长短取决于作业从状态中恢复的速度;方法三除了状态恢复,还需要考虑状态迁移的速度。等待时间是指完成整个变更过程所需的时间。前两种方式需要重新计算,所以等待时间比方式三长。上图是方式二的平台自动化流程,流程分为七个阶段。更改过程的执行时间较长,可能需要几十分钟。通过图中每个阶段的流程条和执行日志,用户可以感受到变更的进度和状态。我们还为用户做了自动指标检查。比如绕过数据回溯的第二阶段,我们会检查作业消费Kafka的积压指标,判断回溯是否完成,完成后自动创建新的逻辑状态。再比如第六阶段,当原job从bypassjob开始时,会比较KafkaOffset指标,比较两个job的消费进度,保证上线后不会少发数据作业重新启动。2.3FlinkSQL调试繁琐遇到的第三个问题是FlinkSQL调试繁琐,操作步骤多,业务需要额外创建作业和Kafka,存储导出的结果。此外,输入结构复杂。为了有针对性地调试某些输入场景,业务需要编写代码构造消息写入数据源,甚至需要控制消息从多个不同数据源到达的顺序。上图左侧可以看到,为了进行FlinkSQL调试,需要手动搭建一个与线路隔离的调试链路,然后写入Mock数据。解决方案?以上问题的解决方案是:基于文件的一键调试。首先,业务可以在Web端在线编辑Mock数据。模拟数据是有界消息序列。它的初始化可以先从线上采样,然后由业务修改。业务构建mock数据后,会将SQL作业的mock数据持久化到右边的S3文件对象系统中。在业务的web端点击调试,左侧发起的调试任务会在与线上隔离的服务器上单进程执行。执行时会从S3获取之前上传的Mock数据,可以使用Mock数据指定的多源消息。执行完成后,输出结果也会持久化到S3,最后在web端查询S3呈现给业务。更多情况下,业务不需要修改Mock数据,只需要做采样和执行两步即可。此外,我们还支持一些高级调试功能,比如支持控制消息的顺序和间隔等。上图是基于上述方案的调试工具。业务会为SQL作业创建多个测试用例,包括Source的Mock数据和Sink的预期结果。调试完成后,会检查所有测试用例的通过情况。通过条件是保证结果流Merge后的表与预期的表数据一致。2.4SQL作业异常定位问题第四个问题是FlinkSQL作业的异常定位。作业异常是指作业消费Kafka有积压。要解决这个问题,就要定位到积压的原因。在定位原因时,归因路径较为复杂,排查门槛较高。此外,由于归因路径缺乏系统沉淀,定位需要较长时间。随着SQL作业越来越多,如果完全依赖人工排查,工作量会非常大。解决方案针对上述问题的解决方案是对SQL作业实现自动化异常诊断。通过FlinkReporter上报SQL作业的运行指标,并持久化到TSDB进行历史查询。同时,SQL作业的运行日志也会被持久化。告警服务会根据规则监控SQL作业上报的KafkaOffset指标。当消费Offset滞后于生产Offset时,会判断比特作业发生消费积压,然后发出告警,发送异常事件,诊断服务会监测告警服务的异常事件。当出现异常时,根据异常时间窗口内的作业日志和作业指标分析异常原因,诊断服务可以通过添加规则积累人工排查的经验。例如,如果发生Restart,会根据关键字从日志中提取异常信息;如果没有Restart,则根据背压指标找到瓶颈节点,然后结合GC指标、数据倾斜、火焰图等分析瓶颈原因,最后提出调优建议。上图是诊断业务消息脏数据的例子。图中的runningoverview栏会给出SQL作业在每个时间checkpoint的诊断状态。绿色表示运行正常,红色表示作业异常。通过这条时间线,可以清楚的看到异常发生的时间点。在诊断结果栏可以看到异常的原因、详情和建议。比如本例,原因是业务消息中有脏数据。在详情中可以看到导致作业异常的原始消息内容。建议中会提示业务配置脏数据处理策略。03未来规划未来,美团实时数仓平台的规划主要包括以下两个方面。首先是流批一体化开发和维护,我们即将在实时数仓平台上集成数据湖存储,并开放FlinkSQL批处理作业,在存储和计算层实现流批统一,提高工作效率。其次是作业的自动调优,将持续提升作业诊断的准确率和作业重启的效率。