摘要:本文整理自京东资深技术专家韩非在FlinkForwardAsia2021流批集成专场的分享。主要内容包括:技术方案的整体思考和优化实施案例,未来展望01整体思考提到流批融合,就不得不提到传统的大数据平台——Lambda架构。可以有效支持离线和实时的数据开发需求,但流批数据链路拆分带来的高开发维护成本和数据口径不一致是不容忽视的缺陷。通过一套数据链路同时满足流批处理的数据处理需求,即流批一体化,是最理想的情况。另外,我们认为流批融合还有一些中间阶段。比如只实现计算的统一,或者只实现存储的统一,意义重大。以计算统一为例,一些数据应用对实时性要求比较高。例如,希望端到端的数据处理时延不超过一秒。一个很大的挑战。以数据湖为例,其数据可见性与commit间隔有关,进而与Flink的checkpoint时间间隔有关。此功能与数据处理链路的长度相结合。可见,实现端到端一秒处理是没有必要的。简单的。因此,对于这样的需求,只实现计算统一也是可行的。通过统一计算降低用户的开发和维护成本,解决数据标准不一致的问题。在实现流批一体化技术的过程中,面临的挑战可以概括为以下四个方面:第一是实时数据。如何将端到端的数据延迟降低到秒级是一个很大的挑战,因为它涉及到计算引擎和存储技术。这本质上是一个性能问题和一个长期目标。第二个挑战是如何兼容数据处理领域已经广泛使用的离线批处理能力。这涉及两个层次的开发和调度。开发层面主要是复用,比如如何复用现有离线表的数据模型,如何复用用户已经在使用的自定义开发的HiveUDF等等。调度层面的问题是如何与调度系统进行合理的整合。第三个挑战是资源和部署问题。例如,通过混合部署不同类型的流和批应用来提高资源利用率,以及如何构建基于指标的弹性伸缩能力来进一步提高资源利用率。最后一个挑战也是最困难的一个:用户感知。大多数用户通常仅限于技术交流或验证相对较新的技术概念。即使经过验证,他们觉得可以解决实际问题,也还需要等待合适的商家来试水。这个问题也引发了一些思考。平台方要站在用户的角度看问题,合理评估改变用户现有技术架构的成本、用户收益以及业务迁移的潜在风险。上图是京东实时计算平台的全景图,也是我们能够实现流批合一的载体。中间的Flink基于开源社区版本深度定制。基于该版本搭建的集群,外部依赖包括JDOS、HDFS/CFS和Zookeeper三部分。JDOS是京东的Kubernetes平台。目前我们所有的Flink计算任务都容器化运行在这个平台上;Flink的statebackend有两种选择:HDFS和CFS,其中CFS是京东自研的对象存储;Flink集群的高可用是基于Zookeeper的。在应用开发方式上,平台提供了SQL和Jar包。Jar方式支持用户直接上传Flink应用Jar包或者提供Git地址给平台打包。另外,我们平台化的功能比较完善,比如基本的元数据服务,SQL调试功能,产品端支持所有的参数配置,以及基于指标的监控和任务日志查询。在连接数据源方面,平台通过连接器支持多种数据源类型。其中JDQ是基于开源Kafka定制的,主要用于大数据场景下的消息队列;JMQ是京东自研,主要用于在线系统中的消息队列;JimDB是京东自研的分布式KV存储。在目前的Lambda架构中,假设实时链路的数据存储在JDQ中,离线链路的数据存储在Hive表中。即使计算相同的业务模型,元数据的定义也往往不同,所以我们引入统一的逻辑模型,兼容实时离线两边的元数据。在计算环节,采用FlinkSQL结合UDF实现业务逻辑的流批统一计算。此外,平台将提供大量公开的UDF,也支持用户上传自定义UDF。对于计算结果的输出,我们也引入了统一的逻辑模型来屏蔽流批两端的差异。对于只进行统一计算的场景,可以将计算结果写入流批次各自的存储中,保证数据的实时性与之前一致。对于同时统一计算和存储的场景,我们可以直接将计算结果写入流批统一存储。我们选择Iceberg作为流批统一存储,是因为它有很好的架构设计,比如不绑定特定的引擎。在兼容批处理能力方面,我们主要开展了以下三方面的工作:一是复用离线数仓中的Hive表。以数据源端为例,为了屏蔽上图左侧流批之间的元数据差异,我们定义了逻辑模型gdm_order_m表,要求用户显式指定Hive表中的字段和Topic来匹配这个逻辑表中字段的映射关系。这里映射关系的定义非常重要,因为基于FlinkSQL的计算只需要面向这张逻辑表,不需要关心实际的Hive表和Topic中的字段信息。在运行时通过连接器创建流表和批表时,会通过映射关系将逻辑表中的字段替换为实际字段。在产品端,我们可以将流表、批表绑定到逻辑表,通过拖拽的方式指定字段之间的映射关系。这种模式使得我们的开发方式不同于以往的方式。之前的方法是新建一个任务,指定是流任务还是批任务,然后进行SQL开发,指定任务相关配置,最后发布任务。在流批一体化模式下,开发模式变成首先完成SQL的开发,包括逻辑和物理DDL的定义,它们之间字段映射关系的规范,DML的编写等,然后再指定流批为任务相关的配置,最终将两个任务发布为流批。二是打通调度系统。离线数仓的数据处理基本都是基于Hive/Spark组合调度模式。上图中间的图就是一个例子。数据处理分为四个阶段,分别对应数据仓库的BDM、FDM、GDM和ADM层。随着Flink能力的增强,用户希望将GDM层的数据处理任务替换为FlinkSQL批处理任务,这就需要将FlinkSQL批处理任务作为中间环节嵌入到当前的数据处理过程中。为了解决这个问题,除了任务本身支持配置调度规则外,我们还打通了调度系统,继承父任务的依赖,将任务本身的信息同步到调度系统,支持父任务作为下游任务,从而实现了FlinkSQL的批处理任务作为原始数据处理的环节之一。三、用户自定义的HiveUDF、UDAF和UDTF的复用。对于现有的基于Hive的离线处理任务,如果用户开发了UDF功能,理想的方式是在Flink迁移时直接复用这些UDF,而不是按照FlinkUDF定义重新实现。关于UDF的兼容性,社区针对使用Hive内置函数的场??景,提供了loadhivemodules的解决方案。如果用户想使用自己开发的HiveUDF,可以使用createcatalog,usecatalog,createfunction,最后在DML中调用。这个过程会在Hive的Metastore中注册函数信息。从平台管理的角度,我们希望用户的UDF有一定的隔离性,限制用户作业的粒度,降低与HiveMetastore交互产生脏函数元数据的风险。另外,当meta信息已经注册完毕,希望下次能在Flink平台上正常使用。如果不使用ifnotexist语法,通常需要先drop函数,然后再执行create操作。但是这个方法不够优雅,而且对用户的使用方式也有限制。另一种解决方案是用户可以注册一个临时的HiveUDF。Flink1.12中注册临时UDF的方式是createtemporaryfunction,但是Function需要实现UserDefinedFunction接口才能通过后续验证,否则会注册失败。所以我们没有使用createtemporaryfunction,而是对createfunction做了一些调整,扩展了ExtFunctionModule,将解析出来的FunctionDefinition注册到ExtFunctionModule中,在Job层面做了一个临时注册。这样做的好处是不污染HiveMetastore,隔离性好,不限制用户的使用习惯,体验好。不过这个问题在社区1.13版本已经全面解决。通过引入Hiveparser等扩展,可以通过createtemporaryfunction的语法注册和使用实现UDF和GenericUDF接口的自定义Hive函数。在资源占用上,流处理和批处理自然是错开的。对于批处理,离线数仓每天0:00开始计算过去一整天的数据,第二天上班前完成所有离线报表的数据处理,所以通常00:00到8:00是批量计算任务占用资源较多的时间段,这个时间段在线流量通常比较低。流处理负载与在线流量正相关,所以这段时间流处理的资源需求比较低。早上8点到晚上0点,线上流量比较大,这段时间的批任务大部分不会被触发执行。基于这种自然交错的峰值,我们可以通过在专用的JDOSZone中混合不同类型的流批应用来提高资源利用率,如果统一使用Flink引擎来处理流批应用,资源利用率会更高。同时,为了让应用能够根据流量进行动态调整,我们还开发了自动弹性伸缩服务(Auto-ScalingService)。其工作原理如下:运行在平台上的Flink任务向metrics系统上报metrics信息,Auto-ScalingService会根据metrics系统中的一些关键指标来判断任务,比如TaskManager的CPU使用率和任务的背压。是否需要增加或减少计算资源,并将调整结果反馈给JRC平台,JRC平台会通过内嵌的fabricclient将调整结果同步到JDOS平台,从而完成TaskManagerPod数量的调整。另外,用户可以通过在JRC平台上的配置来决定是否为任务开启该功能。上图右侧的图表是我们在JDOSZone进行流批混合部门,结合弹性伸缩服务试点测试时的CPU使用情况。可以看出,0分流任务已经缩容,释放资源给批任务。我们设置的新任务是2点开始执行,所以从2点到早上批任务结束,CPU占用率比较高,高达80%以上。批任务运行后,当在线流量开始增加时,流任务扩大,CPU占用率也随之增加。02技术方案整合与流批优化以FlinkSQL为核心载体,所以我们对FlinkSQL的底层能力也做了一些优化,主要分为维表优化、join优化、window优化、Icebergconnector优化。首先是维度表相关的几项优化。FlinkSQL当前社区版本只支持修改部分数据源汇算子的并行度,不支持修改源和中间处理算子的并行度。假设一个FlinkSQL任务消费的Topic有5个分区,那么下游算子的实际并行度为5,算子之间的关系是正向的。对于数据量比较大的维表join场景,为了提高效率,我们希望并行度可以更高一些,希望并行度可以灵活设置,不被数量所束缚上游分区。基于此,我们开发了预览拓扑的功能。无论是Jar包还是SQL任务,都可以解析生成一个StreamGraph进行预览。还可以支持修改分组、算子链策略、并行度、设置uid。通过这个功能,我们还可以调整维表join算子的并行度,将分区策略从forward调整为rebalance,然后将调整后的信息更新到StreamGraph。此外,我们还实现了动态再平衡策略,可以根据backLog判断下游分区的负载情况,从而选择最优分区进行数据分布。为了提升维表join的性能,我们对平台支持的各类维表数据源都实现了异步IO,支持内存缓存。不管是原来的forward方式还是rebalance方式,都存在缓存失效和替换的问题。那么,如何提高维表缓存的命中率,如何减少维表缓存的淘汰呢?以原生的forward方法为例,forward是指每个子任务缓存随机的维表数据,这与joinkey的值有关。对维表的joinkey进行哈希处理,保证每个下游算子缓存与joinkey相关的不同维表数据,从而有效提高缓存命中率。在实现层面,我们添加了一个名为StreamExecLookupHashJoinRule的优化规则,并将其添加到物理重写阶段。在底层扫描数据StreamExecTableSourceScan和维表joinStreamExecLookupJoin之间增加了一个StreamExecChange节点,用于完成对维表数据的哈希操作。可以通过在定义维度表DDL时指定lookup.hash.enable=true来启用此功能。我们通过forward、rebalance、hash三种方式开启缓存,并在同一场景下进行性能测试。主表1亿条数据与维表1万条数据join。在不同的计算资源下,rebalance的性能比原来的forward方法提高数倍,hash方法比rebalance方法好数倍。性能提升一倍,整体效果相当可观。针对维表join单项查询效率比较低的问题,解决方法也很简单,就是把batch累加起来,以微批量(mini-batch)的方式访问。可以通过设置lookup.async.batch.size的值在DDL定义中指定批处理大小。另外,我们在时间维度上也引入了Linger机制,在一批数据因为延迟无法保存的极端场景下限制延迟。您可以在DDL定义中设置查找。async.batch.linger值指定等待时间。经过测试,mini-batch方式可以带来大约15%到50%的性能提升。Intervaljoin也是生产中经常使用的场景。这类业务的特点是流量非常大,比如10分钟100GB。Intervaljoin两个流的数据都会缓存在内部状态中。当任一侧数据到达时,都会获取对面流对应时间范围内的数据,执行join函数。所以,这么大流量的任务,状态会非常大。为此,我们选择了RocksDB作为状态后端,但经过调参优化后效果仍然不理想。任务运行一段时间后,会出现背压,导致RocksDB性能下降,CPU占用率高。通过分析,我们发现根本原因与Flink底层扫描RocksDB的prefix-based扫描方式有关。因此,解决方法也很简单。根据查询条件,精确构造查询上下界,将前缀查询改为范围查询。查询条件依赖的具体上下界的key改为keyGroup+joinKey+namespace+timestamp[lower,upper],可以准确只查询某些时间戳之间的数据,任务的背压问题有也得到解决。并且数据量越大,这种优化带来的性能提升就越明显。regularjoin使用state来保存所有的历史数据,所以如果流量大,state数据会比较大。并且它依赖于table.exec.state.ttl参数来保存状态。此参数的值较大也会导致状态较大。对于这个场景,我们改为使用外部存储JimDB来存储状态数据。目前只实现了innerjoin,实现机制如下:在两侧流传递join后的数据的同时,将所有数据以mini-batch的方式写入JimDB,扫描内存和JimDB同时在加入相应的数据时。另外,可以通过JimDB的ttl机制实现table.exec.state.ttl功能,完成过期数据的清理。以上实现方式的优缺点是显而易见的。优点是可以支持非常大的状态。缺点是目前无法被FlinkCheckpoint覆盖。对于窗口优化,首先是窗口偏移量。需求首先来自一个线上场景。比如我们要统计某个指标从2021年12月4日0点到2021年12月5日0点的结果。但是由于线上集群是东8区的时间,所以实际统计的是result是2021年12月4日早上8点到2021年12月5日早上8点的结果,显然不符合预期。因此,该功能最初是用来修复非本地时区跨日级别窗口统计错误的问题。我们添加了窗口偏移量参数后,可以非常灵活的设置窗口的开始时间,可以支持的需求也更广。其次,还有一种场景:虽然用户设置了窗口大小,但他希望早点看到窗口当前的计算结果,以便早点做出决定。因此,我们增加了增量窗口的功能,可以根据设置的增量间隔触发输出窗口当前计算结果的执行。对于端到端实时性要求不高的应用,可以选择Iceberg作为下游统一存储。但是由于计算本身的特点,用户检查点间隔的配置等,可??能会产生大量的小文件。对于Iceberg的底层,我们选择HDFS作为存储。大量的小文件会给Namenode带来很大的压力,所以需要合并小文件。Flink社区本身就提供了基于Flink批处理的合并小文件的工具来解决这个问题,但是这种方法有点重,所以我们开发了算子级合并小文件的实现。这个想法是这样的。在原有的globalcommit之后,我们新增了三个算子compactCoordinator、compactOperator和compactCommitter,其中compactCoordinator负责获取并下发需要合并的快照,compactOperator负责快照合并操作的执行,可以是多个AcompactOperator是并发执行的,compactCommitter负责提交合并后的数据文件。我们在DDL的定义中添加了两个新参数。auto-compact指定是否启用合并文件的功能,compact.delta.commits指定提交多少次commit来触发compaction。在实际业务需求中,用户可能会从Iceberg读取嵌套数据。虽然在SQL中可以指定读取嵌套字段内的数据,但实际读取数据时,会包含当前嵌套字段。读取所有的字段,然后获取用户需要的字段,会直接增加CPU和网络带宽的负载,所以产生如下需求:如何只读取用户真正需要的字段?要解决这个问题,必须满足两个条件。第一个条件是读取Iceberg的数据结构模式,它只包含用户需要的字段。第二个条件是Iceberg支持按列名读取数据,本身已经满足,所以我们只需要实现第一个条件。如上图右侧所示,结合前面tableSchema和projectFields信息的重构,生成了一个只包含用户需要的字段的新数据结构PruningTableSchema,作为Icebergschema的输入.通过这样的操作,根据用户的实际使用情况来实现。该案例对嵌套结构执行列修剪。图中左下部分的例子是用户优化前后读取嵌套字段的对比。可以看出,基于PruningTablesSchema,可以有效地剪掉无用的字段。经过以上优化,CPU占用率降低了20%~30%。而且在相同数据量下,批处理任务的执行时间缩短了20%~30%。此外,我们还实现了一些其他的优化,比如修复intervalouterjoin数据下发晚于watermark和下游有时间算子导致的数据丢失问题,UDF复用问题,FlinkSQL扩展KeyBy语法,维度表数据预加载、Icebergconnector读取指定快照等功能。03落地案例京东目前有700+个FlinkSQL在线任务,约占Flink任务总数的15%。FlinkSQL任务累计峰值处理能力超过每秒1.1亿条记录。目前一些定制和优化主要是基于社区1.12版本。3.1案例1构建实时通用数据层RDDM流批集成。RDDM全称real-timedetaildatamodel——实时的详细数据模型,涉及订单、流量、商品、用户等。JDV、广告算法、搜索推送算法等。实时业务RDDM层的模型与离线数据中ADM、GDM层的业务处理逻辑一致。基于此,我们希望通过FlinkSQL来统一业务模型的流和批计算。同时,这些业务也具有非常鲜明的特点。比如订单相关的业务模型涉及到大状态的处理,流量相关的业务模型对端到端的实时性要求比较高。另外,一些特殊的场景也需要一些定制化的开发来支持。RDDM的实现主要有两个核心需求:其一,其计算需要大量的关联数据,大量的维度数据存储在HBase中;另外,有些维度数据的查询还有二级索引,需要先查询索引表,从中取出符合条件的key,再到维度表中去获取真正的数据。针对以上需求,我们结合维表数据预加载功能和维表keyby功能来提高join效率。对于二级索引的查询需求,我们定制了连接器来实??现。预加载维表数据的功能是指在初始化阶段将维表数据加载到内存中。该功能结合keyby可以有效减少缓存的数量,提高命中率。一些业务模型有大量的历史数据与之关联,导致状态数据比较大。目前,我们正在根据场景进行定制和优化。我们认为根本的解决方案是实现一套高效的基于KV的statebackend,该功能的实现正在规划中。3.2案例2流量交易黑产舆情分析。其主要流程如下:源头通过爬虫获取相关信息,写入JMQ。数据同步到JDQ后,经过Flink处理后,继续写入下游JDQ。同时通过DTS数据传输服务,将上游的JDQ数据同步到HDFS,再通过Hive表进行离线数据处理。这个业务有两个特点:一是对端到端的实时性要求不高,分钟级的延迟是可以接受的;第二,离线和实时的处理逻辑是一致的。因此,可以直接替换JDQ到Iceberg中间环节的存储,然后使用Flink进行增量读取,使用FlinkSQL实现业务逻辑处理,即完成流式和数据化两套链路的完全统一。批处理。Iceberg表中的数据还可以用于OLAP查询或离线进一步处理。上述链路的端到端延迟约为一分钟。基于算子的小文件合并功能有效提升性能,显着降低存储和计算成本,综合评估、开发和维护成本降低30%以上。04未来规划未来规划主要分为以下两个方面:一是业务发展。我们将加大对FlinkSQL任务的推广力度,探索更多流批融合的业务场景,同时打磨产品形态,加速用户向SQL的转化。同时,平台元数据与线下元数据更深度融合,提供更好的元数据服务。其次,在平台能力方面。我们将继续深挖join场景和大状态场景,同时探索高效的KV型状态后端实现,并在统一计算和统一存储的框架下持续优化设计,减少端到端潜伏。
