当前位置: 首页 > 科技观察

一篇揭秘阿里巴巴实时计算Blink核心技术:如何做到只快不破?

时间:2023-03-13 00:18:27 科技观察

实时计算在阿里巴巴1999年以来,阿里从电商平台不断拓展业务,衍生出电商平台等金融、支付、物流、娱乐等领域的众多产品基于淘宝和天猫、阿里妈妈广告平台、蚂蚁金服支付宝、阿里云、大文鱼等,今天的阿里不仅仅是一个电商平台,而是一个庞大的应用生态。阿里巴巴是目前全球最大的电商平台,2016财年营收5500亿。阿里平台有5亿用户,相当于中国人口的1/3,近1000万用户通过阿里平台进行交易。阿里平台天天。阿里已经成为一艘巨大的商业航母。在这艘航母上,大量的用户和应用必然会产生大量的数据。目前,阿里巴巴的数据量已经达到EB级,日增长率达到PB级。实时计算每天处理的峰值数据量可达每秒1亿条。今年的双11更是达到了惊人的每秒4.7亿。实时计算在阿里巴巴内部被广泛使用。随着新经济、技术创新和用户需求的出现和发展,人们越来越需要实时计算能力。它最大的优点是可以根据实时变化的数据更新大数据处理的状态和结果。下面举两个例子来说明实时计算在阿里巴巴内部的应用场景:1.双十一大屏每年双十一期间,阿里巴巴都会聚合有价值的数据呈现给媒体,GMV大屏就是其中之一他们。整个GMV大屏是一个非常典型的实时计算,每一笔交易数据都汇总显示在大屏上。从将一条数据写入DataBase,到将数据实时写入HBase,最后显示在大屏上,整个过程非常漫长。整个应用有很多挑战:1)大屏显示需要秒级延迟,需要亚秒级的实时计算延迟2)一次job需要聚合大量的双11数据3)Exactly-Once保持了数据计算的准确性4)系统高可用,不存在滞后和不可用。该应用场景的SLA要求很高,要求秒级时延和数据精度,但其计算并不复杂。接下来,我们将介绍更复杂的应用。2.实时机器学习机器学习一般有两个重要的组成部分:特征和模型。传统的机器学习采用批量计算的方式来收集特征和训练模型,更新频率太低,无法满足数据不断变化的应用需求。比如双11期间,商品价格和活动规则都和平时完全不一样,根据之前的数据训练也达不到最好的效果。因此,只有实时采集Features,训练Model,才能拟合出满意的结果。为此,我们开发了一个实时机器学习平台。这个实时机器学习平台主要包括两个部分:实时特征计算和实时模型计算。这个系统也有很多挑战,如下:1)机器学习需要收集各种指标,DataSources很多2)维度很多,比如用户维度,商品维度。维度甚至笛卡尔积的叠加,导致Metrics数量众多,State非常庞大。3)机器学习计算复杂,CPU消耗大。4)有些数据无法在State内存储,需要外存。有大量外部IO3实时A/B测试用户的query也可能不断变化。一个典型的例子是实时A/B测试。算法工程师在调优模型时会涉及到多个模型。不同的模型有不同的计算模式和方法,产生不同的计算结果。因此,不同的查询往往会订阅实时数据,在产生结果后,根据用户反馈对模型进行迭代,最终得到最终的模型。A/BTesting的挑战在于,算法工程师往往会计算很多指标,而所有指标都是实时计算统计的,这会浪费大量的资源。针对这一挑战,我们设计了A/BTesting框架开发平台。用于同步算法工程师感兴趣的Metrics,聚合、收集并发送给Druid引擎。这样,算法工程师根据不同作业的需求,将数据清洗到Druid,最后在Druid上对不同的指标进行统计分析,找到最佳的算法模型。总结起来,实时计算在阿里巴巴内部有以下挑战:1)业务量大,场景多,机器学习需求量大。这些因素共同导致了非常复杂的计算逻辑。机器规模非常大3)保证低延迟和数据准确性,同时满足高吞吐量的要求Flink的选择和优化为了应对上述挑战,我们调研了很多计算框架,最终选择Flink的原因如下:1.Flink很好地引入和设计了State,可以很好地描述基于State的复杂逻辑计算,比如join。并且可以以低延迟实现高吞吐量。但是Flink在状态、Chandy-Lamport算法等方面还是存在很多缺陷,为此阿里开发了一个项目叫Blink。Blink是开源Flink和AlibabaImprovement的结合。主要分为两部分:1.BlinkRuntime包括存储、调度、计算。不同的公司在使用Flink的时候,在存储、调度、底层优化等方面存在很多差异。阿里巴巴的Blink也对Runtime做了很多个性化的优化。这一层不容易与ApacheFlink社区统一。我们称之为BlinkRuntime。2.FlinkSQL原生Flink只有一个比较底层的DataStreamAPI。用户在使用时需要设计和实现大量的代码。此外,DataStream本身也存在设计缺陷。为了方便用户使用,阿里巴巴团队设计了面向流计算的FlinkSQL并回推社区。名称是FlinkSQL而不是BlinkSQL。主要原因是Blink和Flink在SQL用户API上与社区完全统一。另外ApacheFlink的大部分功能都是阿里巴巴贡献的,所以FlinkSQL就是BlinkSQL。非常大的区别。BlinkRuntime核心优化解密1.部署和模型优化优化包括以下几点:1)解决大规模部署问题。Flink中的一个Cluster只有一个JobMaster来管理所有的作业。随着Jobs的不断增加,单个Master无法承担更多的Jobs,造成了瓶颈。因此,我们重构了架构,让每个Job都有自己的Master。2)在Flink早期,TaskManager管理着很多任务,某个任务出现问题会导致TaskManager崩溃,进而影响其他作业。我们让每个Job都有自己的TaskManager,增强了Jobs的隔离性。3)引入ResourceManager。ResourceManager可以与JobMaster通信,实时动态调整资源,实现集群最优部署。4)我们不仅将这些优化应用于YarnCluster,还应用于Mesos和Standalone的部署。有了这些任务,Flink就可以应用于大规模的集群部署。2.IncrementalCheckpoint的实时计算需要保持checkpoint期间的计算状态。Flink早期的Checkpoint设计存在缺陷。当每个检查点发生时,它会读取所有旧状态数据,与新数据合并并全额写入磁盘。随着状态的不断增加,每个检查点所需的读写数据量是巨大的。因此Job的checkpoint间隔需要设置的很大,不能小于1分钟。Checkpoint间隔越大,故障转移时的回滚计算量越大,造成的数据延迟越严重。为了减少检查点间隔,我们提出增量检查点的设计。简而言之,检查点期间仅存储增量状态更改数据。由于历史中每个checkpoint的数据都已经保存了,所以后面的checkpoint只需要存储不同的数据,这样每次checkpoint需要更新的数据量就很小,这样checkpoint就可以在之内完成几秒钟。它大大减少了故障转移可能导致的延迟。3、异步IO很多时候我们要把数据存储在外部存储中,这样在计算过程中就需要通过网络IO来读取数据。传统方法采用Sync-IO读取方式。发送数据请求后,只有返回结果后才能开始下一次数据请求。这种方式造成了CPU资源的浪费,因为CPU在大多数情况下都是。等待网络IO请求返回。Sync-IO使得CPU的资源利用率无法最大化,极大地影响了单位CPU下的计算吞吐量。为了提高计算吞吐量,我们设计了Async-IO数据读取框架,允许异步多线程读取数据。每次数据请求发送完毕后,无需等待数据返回再继续发送下一个数据请求。当数据请求从外部存储返回时,计算系统将调用回调方法处理数据。如果数据计算不需要保序,数据返回后会快速计算后发送出去。如果用户需要保持数据计算的先后顺序,我们使用缓冲区暂时保存最先到达的数据,待前面的数据全部到达后再分批发送。使用Async-IO后,计算吞吐量可以根据设置的缓冲区大小提升几十倍甚至上百倍,大大提高了单位CPU利用率和整体计算性能。值得一提的是,上面提到的所有BlinkRuntime优化都已经贡献给了ApacheFlink社区。FlinkSQL核心功能解密1.阿里巴巴已经完成了ApacheFlinkSQL80%的研发。目前,ApacheFlinkSQL80%的功能是由阿里巴巴实时计算团队贡献的,包括200次提交,近10万行代码。之所以使用FlinkSQL,是因为我们发现底层的API给用户的迁移和上线带来了极大的不便。那么,我们为什么选择SQL?主要原因有以下几点:1)SQL是一种非常通用的描述性语言,SQL适合用户非常方便的描述Job需求。2)SQL有比较好的优化框架,让用户只需要关注业务逻辑的设计,不需要关心状态管理、性能优化等复杂的设计,大大降低了使用门槛.3)SQL通俗易懂,适用于不同领域的人。使用SQL的用户往往不需要太多的计算机编程基础,从产品设计到产品开发的各类人员都可以快速掌握SQL的使用。4)SQL的API非常稳定,在组织升级甚至更换计算引擎时无需修改用户的Job即可继续使用。5)部分应用场景需要流式更新和批量验证。SQL可以用来统一批计算和流计算的查询。实现一个Query,结果一样。2.流处理VS批处理如果要设计和批处理统一的流计算SQL,就必须了解流处理和批处理的区别。两者的核心区别在于流处理数据是无限的,批处理数据是有限的。这个本质上的区别引入了另外三个更具体的区别:1)流处理会不断地产生结果而不会结束,而批处理往往只返回一个最终结果就结束了。比如统计双11的交易金额,使用批量计算,开始计算所有买家消费的总金额,在双11当天所有交易结束后得到一个最终值。流处理需要跟踪实时交易金额,实时计算和更新结果。2)流计算需要做checkpoint和keepthestate,这样才能在failover时继续快速运行。然而,批处理计算通常不需要保持状态,因为它的输入数据通常是持久存储的。3)流数据会不断更新,比如某个买家的消费总额是不断变化的,而批数据是一天的消费总额,是固定不变的。流数据处理是对最终结果的超前观察,往往需要将超前计算的结果撤回(Retraction)来进行更改,而批计算则不需要。3、上面QueryConfiguration中提到的差异不涉及用户的业务逻辑,也就是说这些差异不会体现在SQL的差异上。我们认为这些差异只是工作的不同属性。为了描述流计算特有的一些属性,比如何时产生流计算结果,如何保存状态,我们设计了一个允许用户配置的QueryConfiguration,主要包括两部分:LatencySLA定义了从数据产生到数据的延迟演示,比如双11大屏在二层。用户可以根据自己的需求配置不同的SLA,我们的SQL系统会根据SLA的要求进行优化优化,从而在满足用户需求的同时达到最佳的系统性能。StateRetention/TTL流计算永不停止,但流数据中的State往往不需要长期保留。如果保存时间过长,会浪费存储空间,并且会极大地影响性能。所以我们允许用户设置合理的TTL(过期时间)以获得更好的计算性能。我们通过查询配置描述了流和批之间的一些不同属性。接下来,我们需要继续考虑如何设计流式SQL?4.动态表(Dynamic-Table)问题的关键在于SQL在批处理时对表进行操作,流式数据中没有表。因此,我们创建了数据随时间变化的动态表。动态表是流的另一种表现形式,它们具有二元性,即可以在不破坏数据一致性的情况下相互转换。下面是一个例子:如图所示,输入流在左边。我们为每条数据生成一个Dynamic-Table,然后使用Changelog发送Table的变化。经过这样两次改动,输入流和输出流中的数据始终保持一致,证明了Dynamic-Table的引入并没有丢失语义和数据。有了动态表的概念,我们就可以将传统的SQL应用到流中。值得一提的是,Dynamic-Table是虚拟的存在,不需要实际存储来实现。我们再看一个例子:如图,我们在有输入流的情况下进行连续查询。我们将Stream理解为Dynamic-Table。动态查询根据Dynamic-Table生成一个新的Dynamic-Table。如果需要新生成的Dynamic-Table,可以不断生成流。这里由于加入了连续查询的聚合计算,左右两边的流都进行了改造。简而言之,动态表的引入为我们提供了对流进行连续SQL查询的能力。5.StreamSQL没有必要存在通过上面的讨论,我们发现有了Dynamic-Table,我们不需要创建任何新的streamingSQL语义。所以我们得出结论,流式SQL是不必要的。ANSISQL可以完整描述StreamSQL的语义。保持ANSISQL的标准语义是我们构建FlinkSQL的一个基本原则。6、ANSISQL函数实现基于以上理论基础,我们实现了流计算所需的几个ANSISQL函数,包括:DML、DDL、UDF/UDTF/UDAF、connectionJoin、Retraction、Windowaggregation等,另外针对这些功能,我们也做了很多的查询优化,从而保证FlinkSQL能够满足用户的各种查询需求,同时拥有优秀的查询性能。接下来简单介绍一下其中的几个:1)JOIN流和动态表具有双重性,一条SQL看似是Table的join,实际上是流的join。比如InnerJoin的实现原理是这样的:数据会从输入流的两边传来,一边的数据会先存入State,然后根据查询到另一边的State加入键。如果存在则输出结果,如果不存在则不输出,直到相反的数据到来才产生结果。简而言之,这两个流有两种状态。一侧的数据到达后,保存起来等待另一侧的数据。所有数据到达后,内部连接生成结果。除了两个流的连接,我们还介绍了流和外部表的连接。我们的机器学习平台在HBase中存储了大量的数据,在HBase中查询数据的操作其实就是连接了一个外部表。连接外部表通常有两种模式:a)查找模式。当流式数据到达时,立即查询外部表得到结果。b)快照模式。当流式数据到达时,将快照的版本信息实时发送到外部存储服务进行数据查询,外部表存储根据版本信息返回结果。值得一提的是,我们设计的流关联外部表的函数没有引入任何新的语法,完全按照SQL-2011标准实现。同样的查询也适用于批量计算。2)撤回是流计算的一个重要概念。我举个例子来解释一下:计算词频计算词频是指统计所有英文单词出现的频率,最后根据频率统计不同频率的不同单词的个数。比如统计的初始状态只有三个词,HelloWorldBark,每个词只出现一次,那么词频的最终结果就是出现频率为1的词有3个(没有出现频率为其他时间的词),因此结果表只有一行“1——3”。当这个词不断更新,又增加了一个Hello,因为Hello的出现频率变成了2次,我们在词频结果表中插入一行新的数据,比如“2——1”。很明显,出现两次的词是一个,那么“2——1”的结果是正确的,但是出现频率为1的词的个数已经错了,应该是2而不是3。本质原因这类问题是流计算的输出结果是对计算的预先观察。由于数据不断更新,计算结果难免会发生变化,这就需要我们收回之前的结果。然后发送更新后的结果,否则数据结果不会错。对于上面的例子,当Hello的频率从1变为2时,我们不仅需要在结果表中插入行“2--1”,还需要对行“1-”进行withdraw更新操作-3”。值得一提的是,什么时候需要撤回,什么时候不需要,完全由SQL查询优化器来判断。用户根本不需要感知它。用户只需要通过SQL来描述自己的业务计算逻辑即可。如图,第一种场景不需要退出,第二种场景需要退出,完全由优化框架决定,而不是用户决定。这一点极大地体现了使用SQL和利用SQL中天然优化框架的好处。3)Window聚合Window聚合是FlinkSQL的一个重要能力。在图中的示例中,我们汇总了每个小时的数据。除了这个Tumble窗口,我们还支持SlidingWindow和SessionWindow。未来还将支持用户自定义窗口。4)查询优化查询优化除了增加新的功能外,我们还做了很多查询优化。例如微批处理。如果没有微批处理,处理每条数据都会伴随多次IO读写。通过微批处理,我们可以通过少量的IO处理来处理数千条数据。另外我们也做了很多filter/join/aggregatepushdown和TopN的优化。下面举例说明TopN优化:如上图所示,我们要获取销量前三名的城市,我们有两个用户查询。一种底层实现方式:a)一种方式是在没有数据到来时,对所有保存的城市进行排序,然后截取前三个城市。这种设计会在每次数据更新时重新排列所有的城市,这必然会造成计算资源的大量浪费。b)我们的查询优化器将自动识别查询语句并优化此计算。在实际执行过程中,我们只需要不断更新前三名的城市,大大优化了计算的复杂度,提高了性能阿里巴巴的实时计算应用是基于流计算SQL的,我们开发了两个计算平台。1、阿里云流计算开发平台之一是阿里云流计算平台(streamCompute),允许用户在平台内编写SQL和调试。调试无误后,用户可以通过该平台直接发布作业部署到阿里云集群上。部署完成后运维检查上线。因此,该平台整合了所有实时计算需求,集开发、调试、在线部署、运维于一体,大大加快了用户开发上线的效率。值得一提的是,2017年双11期间,阿里巴巴集团的大部分实时计算作业都是通过该平台发布的。从今年9月份开始,我们通过阿里云对外开放了这个平台,包括公有云和私有云,让他们可以使用阿里巴巴的实时计算能力。2.阿里巴巴实时机器学习平台Porsche为了方便算法同学开发机器学习任务,我们设计并实现了一个在线机器学习平台-Porsche,面向算法人员,支持可视化自助开发和基于FlinkSQL和Hbase的操作。如上图所示,用户将组件以可视化方式拖拽到保时捷平台IDE中的画布中,配置组件属性,定义一个完整的计算DAG。这个DAG会被翻译成SQL,最后提交给Blink执行。此外,值得一提的是,保时捷平台还支持Tensorflow,今年的双11也大放异彩。本平台为算法同学节省了学习和使用SQL的成本,暂时只对内部用户开放。双十一实时计算总结上图展示了阿里巴巴的实时计算架构。底层是几千台物理机,上面是统一部署的ResourceManagement和Storage,然后是BlinkRuntime和FlinkSQL。用户通过StreamCompute和Porsche平台提交作业,目前已支持阿里内部数百名工程师和近千个FlinkSQL作业。以上就是阿里巴巴实时计算的现状。借助实时计算,阿里巴巴双十一取得了1682亿的亮眼成绩。实时计算的贡献主要体现在以下几点:1、这次双11是互联网历史上最大的并发事件,每秒有几十万笔交易。交易和支付的实时聚合统计操作都是由Blink计算带来的。2.3分01秒展现100亿条数据,不仅对DataBase的吞吐量要求很高,也非常考验实时计算的速度。3.算法平台帮助算法同学取得良好的搜索和推荐效果,提升整体GMV。总之,实时计算不仅满足了阿里巴巴内部的各种需求,也提升了GMV。我们希望通过阿里云实时计算平台(StreamCompute),将Blink的实时计算能力输出到阿里以外的所有企业,让他们从中受益。Dasha,阿里巴巴资深技术专家,负责FlinkSQL的实时计算。此前曾在美国Facebook工作,担任ApacheFlinkcommitter。【本文为专栏作者《阿里巴巴官方技术》原创稿件,转载请联系原作者】点此查看作者更多好文