当前位置: 首页 > 科技观察

三端综合计算解决方案:UnifySQLEngine

时间:2023-03-12 07:19:34 科技观察

背景在漫长的数仓建设过程中,实时数仓和离线数仓由不同的团队独立构建。有庞大而广泛的离线数仓系统,也有需要追求业务时效性需要构建实时数仓。但在业务数据需求和数据产品需求中,往往需要结合实时数据和离线数据进行对比分析。然而,这两种天然不同的数据存储和计算结构需要同时开发两套数据模型。在数据处理过程中,实时数仓需要使用Blink/Flink进行处理,离线需要编写ODPSSQL处理,在线计算模型需要开发java代码处理。如上图所示,实时数据和离线数据在存储层、计算层和服务层分离独立构建。实时数据具有增量计算的特点,需要快速传输和计算。主要由DataHub、Flink、Hbase等异构系统支撑,串联起来形成一个完整的实时计算全链路。离线数据具有时序、批量计算的特点,由统一存储、统一计算的ODPS系统支撑。除了计算环节的差异,在数据处理上也存在逻辑上的差异:流批处理不能复用,ODPS和Blink/Flink的SQL标准不一致,底层调度和数据处理逻辑也有本质区别。一种是以MR为核心的批处理方式,一种是以Flink/Blink为核心的流处理方式。一些流批处理场景需要调用HSF接口。调用HSF接口在JavaSpring环境下很容易,但是在ODPS/Flink环境下,就变成了一个格外大的挑战,甚至是不可能的,因为在ODPS/Flink环境下Flink无法加载或者极难使用弹簧容器。这会让开发者在面对复杂的流处理场景时,更倾向于使用熟悉的Java环境,但同时也意味着失去像ODPS/Flink这样接近业务表达式的SQL表达式。除了流批处理,计算处理还有更广泛的在线交互计算。这和流处理异步处理是不一样的。它需要同步计算并返回结果。这通常是在Java环境下开发的。HSF接口,但是如果要对外同时提供流计算、批计算、在线计算三种能力,就需要三端开发。流计算和批计算还有SQL。虽然不一致,但至少相似,但是在线交互计算需要纯Java。开发,将SQL翻译成Java代码是一个不小的挑战。UnifySQLVS流批一体化面对三种计算模式、研发效率低下、数据质量不可控、数据接口服务臃肿的困境,三端(流计算、批计算、在线计算)一体化计算的想法顺理成章盛,我在价格力行业做了20年,一直在思考解决方案。其实这也是大数据架构经常面临的问题。业界已经达成共识,可以归纳出两种解决方案:?流批一体计算,同一个引擎流批计算模式,实时数据计算在流计算模式,离线数据计算在流计算模式。批计算模式。流批融合计算的典型架构是:Flink+Kappa架构。Flink可以实现基于SQL的流批一体化的计算表达式。复杂的计算由Java应用程序承担。价格-功率计算架构是此类架构的典型代表,但该架构存在以下问题:没有解决在线交互计算。Flink解决了流式计算,批处理融合计算的能力,两者都是异步处理,只是时效性不同,但不具备解决在线计算的能力。如果要提供在线计算能力,必须在以下两种方案中进行选择:通过Java应用提供同步计算接口,所以有两套逻辑:一套是Flink实现的流批处理,一套是在线处理由Java实现。提供了两个接口,一个接口是发起一个计算请求,计算请求交给Flink处理后,提供一个轮训查询接口,用于查询计算的数据。这个方案至少在计算上实现了一套代码,但是这种同步转异步处理的方案势必会影响到产品的设计。Flink的批处理吞吐量Flink的批处理其实有点一厢情愿。为什么这么说呢,因为它的吞吐量规模和MR批计算(ODPS)不是一个级别的。如果Flink真的能做到完全等同于ODPS的吞吐量,从规模和资源成本上来说,完全不需要ODPS,但现实是对于一些只有批处理的场景(比如特征预处理,统计computing),ODPS依然是第一优先级,只是在面对流式批处理的时候。Flink确实为对batchsize没有要求的场景提供了非常好的all-in-one的解决方案。?UnifySQL自动将相同的SQL代码翻译成流计算引擎和批计算引擎进行流和批计算,同时也翻译成HSF接口代码,提供在线交互计算能力。Flink的流批一体化架构非常好,可以解决90%的流批一体化问题,但遗憾的是,我们的一些业务场景(典型的价格计算场景)是Flink写SQL远远解决不了的:整个e-商务是一个非常复杂的业务系统,就我所在的营销领域,要面对招商模式、投放模式、活动模式、权益等,这些远不是一个单一的系统可以承担的,也不是一个单人团队可以承担。阿里针对这么复杂的业务设计了HSF这样的微服务电商架构。但是Flink和电商等Java技术栈是明显分开的。服务架构的红利,一方面可以利用Flink的SQL流批处理能力。考虑到Flink本身的局限性,与其让Flink支持HSF,还不如让Java环境支持FlinkSQL。也就是说,设计一个SQL引擎。它可以通过sql流处理方式处理Java对象,将流引擎嵌入到Java中,按需使用。Flink的批处理引擎非常耗资源,面对T级离线数据的批处理几乎无法使用。上文提到,Flink批处理的吞吐量远低于ODPS的MR批处理。那我们为什么不让这样,计算还是交给ODPS来处理呢。但是ODPS和Flink的SQL标准不一致,需要两端开发。现在问题就变成了:如何统一ODPS和Flink的开发。简单点说,ODPS和Flink能不能用?构建一层统一的UnifySQL。这个SQL引擎可以翻译成ODPS或者Flink可以理解的处理(ODPS翻译成MR程序,Flink翻译成StreamOperator)来抹平ODPS和Flink之间的SQL语义差异。如果只是抹平ODPS和Flink之间的SQL差异,好处不会很大,但是其统一的SQL表达式计算设计可以进一步拓宽其应用范围,比如在线交互计算,或者我们可以进一步构建统一的计算引擎,包括以不同模式安排计算能力。例如,某些场景对时效性要求很高。我们可以调度Flink计算。没有时效性要求,但数据量巨大,只能进行离线计算调度。一些需要提供HSF接口,在调度应用上启动spring接口。UnifySQLEngine淘宝价格计算引擎,以Flink+Kappa为核心数据架构,这个数据架构的演进可以参考我的其他文章,三种计算模式的叠加就是价格服务计算引擎的普通模式,并且都在各自的核心计算中发挥最大优势:ODPS:离线批计算引擎,核心优势,计算吞吐量非常高,但时效性差,面向MR和SQL的编程模式,业务和BI友好,主要用于数据预处理和离线特征处理,公共维表ETL等Flink:流处理引擎,核心优势,低延迟计算,时效性好,极高的容错性和高可靠性,但吞吐量与ODPS相比一般,有StreamAPI和SQLAPI的编程模式,业务和BI友好,主要用于实时数据处理(discounts,orders等),消息预处理:核心优势,丰富的电商JavaHSF接口,复杂的领域模型,面向对象设计,对开发友好,但对业务和BI不友好,容错性和可靠性依赖于开发设计,延迟和吞吐量也高度依赖于开发设计。那么如何融合这三种不同的计算架构,Flink提出了一个引擎来承接所有的计算模式,即Flink的流批一体化引擎,但由此带来的问题是底层引擎本身很难完全理解不同的计算模式,与其统一计算引擎,不如统一表达和调度,把真正的计算委托给各自的计算引擎,这就是UnifyEngine的核心思想。?SQL引擎技术实现三端融合有一个核心技术难点,就是SQL引擎。许多数据产品都有自己的SQL引擎。Flink内部有SQL引擎,ODPS有C++实现的SQL引擎,Hive也有,Mysql内部也有SQL解析引擎。这些SQL引擎高度集成到各自的存储和计算中。如果想找一个可以在Java环境下使用的独立的SQL引擎,市面上有一些,但是要么很复杂。calcitesql引擎,或者说是一个非常简单的select*simplesql引擎,能做的事情很少,开箱即用的几乎没有。而UnifySQL引擎是实现三端融合的核心组件。没有它,就谈不上别的了。从头开始设计SQL引擎的成本非常高。不说复杂的语法分析和AST语法树生成,光是SQL逻辑计划优化就很复杂了。好在业界有一个可以二次开发的SQL。引擎是方解石SQL引擎。事实上,很多SQL引擎都是基于calcite重新开发的。比如Flink和Spark里面的SQL分析引擎都是基于方解石重新开发的。我们设计的SQL引擎也是基于方解石的。Calcite使用基于关系代数的查询引擎,专注于关系代数的语法分析和查询逻辑的规划,并通过calcite提供的SQLAPI(解析、验证等)转化为关系代数的抽象语法树,并根据一定的规则或成本估算优化AST关系,最终进一步生成ODPS/Flink/Java环境可以理解的执行代码。Calcite的主要功能:SQL解析:Calcite的SQL解析是通过JavaCC实现的,使用JavaCC将其转为SQL语法描述文件,将SQL解析为未经验证的AST语法树。SQL验证:无状态验证,即验证SQL语句是否符合规范;statefulverification,通过元数据验证SQLschema、field、UDF是否存在,类型是否匹配。这一步生成未优化的RelNode(逻辑计划树)SQL查询优化:优化以上步骤的输出(RelNode),这个过程会循环优化器(RBO规则优化器和CBO成本优化器),在保持语义的基础上等价,生成执行成本最低的SQL逻辑树(Lo)。article/1df5a39bb071817e8b4cb4b29),这里就不详细解释了。有了方解石,解决了SQL->logictree,但是要真正执行SQL计算,还需要将逻辑数进一步转化为物理执行树(PhysicalExecDAG)。在这个DAG中,包含了可执行的Java代码(JavaCode)片段,这些片段最终被发送到不同的执行环境,会进一步串联成环境可以执行的链接。比如在ODPS环境下,会生成MR代码。在Flink环境中,它会被转换成StreamOperator。在Java环境中,它会被转换成一个CollectorChain,在Spring环境中,它会被转换成一个Bean组件。PS:如果你看过Flink的源码,你对上面的流程会很熟悉。是的,UnifySQLEngine并不是从零开始设计的,它是基于对Flink1.12源码的魔改。Parse和下面要说的Codegen技术都是直接参考了Flink的设计,当然说是魔改,就是还有很多代码需要在上面开发,比如from将DAG执行到在每个环境中实际可执行的MR/Bean/Stream。?Codegen技术经过SQL解析、逻辑优化器和物理优化器生成PhyscialRel物理计划树,包括大量复杂数据的逻辑处理,比如SQL中常见的CASEWHEN语句。一种常见的做法是定义一个父类(如ExecNode)在实际运行时委托给真正的子类运行,这就涉及到查找大量的虚函数表。最后,这种分支指令在一定程度上阻碍了指令的流水线和并行执行,从而产生了这种查找成本。比函数本身的执行成本更昂贵。Codegen技术就是专门为这样的场景而诞生的。业界比较优秀的Codegen技术有LLVM和Janino。LLVM主要面向编译器,而Java代码codegen通常使用Janino。Janino是一个小而快的Java编译器,不仅可以像Javac一样将一组java文件编译成Class文件,还可以编译Java表达式、语句块、类定义块或Java文件,直接加载到ByteCode中,并在其中运行同一个JVM。UnifySQLEngine还使用了JaninoforCodeGen技术,有效提升了代码执行效率。关于Janino的更多信息可以参考这篇文章:JavaCodeGen编译器Janino(地址:https://zhuanlan.zhihu.com/p/407857568)。下面是使用Codegen和不使用Codegen的技术性能对比:表达式100*x+20/2(x+y)(xx+y)/(x-y)100/(xy)节点树遍历执行10ms88msJanino生成代码执行6ms9ms可以看出,当表达式比较复杂时,使用Janino的效果会更加明显。?有状态计算通常分为无状态计算和有状态计算。无状态计算一般是过滤和项目映射。每个计算都依赖于当前的数据上下文并且相互独立。存储是保存中间的计算结果或者缓存的数据,但是还有一种有状态的计算。除了当前的数据上下文,还需要依赖之前计算的中间状态数据。典型例子:sum求和:需要有storage来保存当前的求和结果,当有新的数据进来时,根据当前的中间结果进行累加和去重:去除之前重复的数据,需要保存之前处理过的数据,然后有新的数据要计算,是否和保存的数据进行比较重复排序:需要storage保存之前整理好的数据。当有新数据进来时,之前的排序结果会发生变化,经过diffing后,重新排序后变化的数据会重新发送给下游可见。当有状态计算需要返回存储来携带中间状态结果时。UnifySQLEngine支持内存、Redis、Hbase三种后台存储:内存状态只保存到内存中。一旦重新启动,历史数据将丢失。MemoryState通常用于单机有状态计算,容忍数据丢失。一般用在ODPS的MR程序中,因为一次MR调用的状态计算只需要当前执行上下文的累加结果,不需要保存到全局缓存中。不同batch间的累加结果在MRAPI之间传递,内存State完全够用。Redis:Redis作为多机状态计算的后台存储。UnifySQLEngine在Java环境中默认使用这个作为后台存储。Redis备份存储一般用于Java计算环境。数据会流经不同的生产机器,计算的中间结果需要全局可见。Hbase:如果状态数据超过100G,可以选择Hbase作为后备存储。虽然性能不如Redis,但是可以长期保存状态,这对于长期的状态计算非常有用。?JOIN语义Flink可以支持双流join,但是Flink双流join的语义完全照搬了SQLJOIN语义,即一侧的数据会和另一侧的所有数据进行join。这对于离线分析来说是没有问题的,但是对于实时计算来说,就会出现重复计算,在某些场景下,会破坏业务逻辑。比如订单流转到双流JOIN折扣表时,就会出现这个问题。折扣表中的数据会不断变化,但我们希望以快照数据作为JOIN的基础,而不是重复折扣变化的数据,UnifySQLEngine实现了后一种语义,即SNAPSHOTJOIN,这也是业务场景中常见的语义:一些想法?统一调度UnifySQLEngine现在可以将SQL翻译成可以在不同执行环境运行的任务,通过UnifySQL统一不同环境的逻辑计算,但是它最终还是和我们期待的相去甚远。其中一个要点是实现统一调度和分配。不同环境的协调现在需要开发者自己分配和调度,比如哪些计算需要交付给ODPSMR计算,哪些在Java环境中运行。未来我们希望这些分配也能统一调度运行,包括增量计算全量和自动协同,离线和在线数据协同等。?资源成本通过UnifySQLEngine,开发者可以自己选择底层计算引擎.对于数据量大但时效性要求不高的场景,可以选择ODPS计算。对于时效性要求和可接受的数据规模,可以选择在Flink中进行调度。对于复杂的计算逻辑,需要大量的HSF接口,你可以选择在Java环境下入手,选择你最能接受的资源和成本,承接它的计算语义。同时也希望通过UnifySQLEngine最大限度地利用计算资源。比如Java应用很多时候都是空闲的,CPU利用率比较低。比如可以将一些流计算交付给这些闲置的应用程序,占用CPU非常少(比如5%以内),整体的资源利用率就会提高。比如Flink计算资源比较难申请,可以选择在Java环境下计算(Java相比Flink环境缺少一些特性,比如Exactlyonce语义)等等。团队介绍我们是淘宝的技术营销工具团队,负责天猫的各种促销活动,为消费者/商家开发营销产品,负责双11核心玩法、价格控制、权益分发等核心业务团队。涵盖全球营销产品,拥有1亿用户规模和千万商户规模。充满技术挑战,包括复杂的业务场景、T级海量数据处理、千万级QPS、秒级实时处理亿级数据。