当前位置: 首页 > 科技观察

网易游戏FlinkSQL平台实践

时间:2023-03-21 19:43:23 科技观察

摘要:本文整理自网易游戏高级开发工程师林晓波在FlinkForwardAsia2021平台建设大会上的演讲。主要内容包括:网易游戏FlinkSQL开发历史StreamflySQLv1基于templatejar和基于SQLGateway的StreamflySQLv2futurework01网易游戏FlinkSQL开发历史网易游戏实时计算平台叫做Streamfly,以电影命名《驯龙高手》暴风蝇。由于我们已经从Storm迁移到Flink,所以我们用更通用的Stream替换了Stormfly中的Storm。Streamfly的前身是离线运行平台Omega下的一个名为Lambda的子系统,负责所有实时作业的调度。一开始支持Storm和SparkStreaming,后来改为只支持Flink。2019年,我们将Lambda分离出来,并基于它构建了Streamfly计算平台。随后,我们在2019年底开发并上线了第一版FlinkSQL平台StreamflySQL,该版本基于模板jar提供了基本的FlinkSQL功能,但用户体验有待提升,因此我们重新构建了第二版StreamflySQL2021年初从零开始,第二个版本基于SQLGateway。要了解这两个版本的区别,我们需要回顾一下FlinkSQL的基本工作流程。用户提交的SQL首先会被Parser解析成逻辑执行计划;逻辑执行计划将通过PlannerOptimizer进行优化,生成物理执行计划;物理执行计划将由PlannerCodeGen代码生成,并转化为通用的DataStreamAPITransformation;最后,StreamGraphGenerator将这些Transformations转换为Flink作业的最终表示JobGraph,并提交给Flink集群。以上一系列过程都发生在TableEnvironment中。根据部署方式的不同,TableEnvironment可能运行在FlinkClient或JobManager中。Flink现在支持3种集群部署模式,包括Application、Per-Job和Session模式。在Application模式下,TableEnvironment会运行在JobManager端,而在其他两种模式下,TableEnvironment会运行在Client端。但是这三种模式有一个共同的特点,TableEnvironment是一次性的,提交JobGraph后会自动退出。为了更好的复用TableEnvironment来提高效率和提供有状态的操作,一些项目会把TableEnvironment放到一个新的独立的Server进程中运行,从而产生一个新的架构,我们称之为ServerSide-by-sideSQLcompilation。相比之下,还有客户端SQL编译。可能有同学会问为什么JobManager端没有SQL编译。这是因为JobManager是一个比较封闭的组件,不适合扩展,即使做了,达到的效果也和客户端编译基本一样。所以一般来说,常见的FlinkSQL平台架构一般有两种,Client和Server。Client-sideSQL编译,顾名思义就是在客户端进行SQL的解析、翻译、优化(这里的client是广义的client,不一定是Flinkclient)。典型的案例就是常见的模板jar和Flink的SQLClient。这种架构的优点是开箱即用,开发成本低,使用Flink公共API,版本升级相对容易;缺点是难以支持高级功能,每次都要启动一个比较重的TableEnvironment。所以性能比较差。然后是服务器端的SQL编辑。这种架构将SQL的解析、翻译、优化逻辑放到一个独立的服务器进程中,使客户端非常轻量,更接近于传统数据库的架构。一个典型的例子是Ververica的SQLGateway。这种架构的优点是扩展性好,可以支持很多自定义功能,性能好;缺点是开源界没有成熟的解决方案。上面提到,SQLGateway只是一个比较早的原型系统,缺乏很多企业级的特性,如果在生产环境中使用,需要进行一定程度的修改,而且这些修改涉及到更多的Flink内部API,需要更多的背景知识Flink的,总体来说开发成本比较高,后续版本升级的工作量也比较大。回到我们的FlinkSQL平台,我们的StreamflySQLv1是基于客户端SQL编译,而v2是基于服务端SQL编译。让我一一介绍。02基于模板jar的StreamflySQLv1StreamflySQLv1选择客户端SQL编译的原因主要有三个:第一是平台整合。与许多公司的作业调度器是用大数据中更主流的Java编写的不同,我们的Lambda调度器是用Go开发的。这是因为Lambda在设计之初就支持多种实时计算框架。出于松耦合和公司技术栈考虑,Lambda采用Go作为开发语言,采用类似YARN的方式动态生成shell脚本调用不同的框架。命令行界面。这种松散耦合的接口方式给我们带来了极大的灵活性。比如我们可以很方便的支持Flink的多个版本,而不用强制用户随系统版本升级,但同时也造成无法直接调用Flink原生的Java。蜜蜂。第二个原因是松耦合。开发时,Flink版本为1.9。当时ClientAPI比较复杂,不适合平台集成。当时社区也在推动Client的重构,所以我们尽量避免依赖ClientAPI来开发FlinkSQL平台。第三个原因是实践经验。因为模板jar+配置中心模式在网易游戏中已经被广泛使用,我们在这方面积累了很多实践经验。综上所述,我们很自然地采用了模板jar+配置中心的架构来实现v1版本。上图是v1版本的整体架构。在Lambda作业平台的基础上,我们增加了StreamflySQL后端作为配置中心,负责根据用户提交的SQL和作业运行配置加上通用模板jar生成Lambda作业。整体作业提交流程如下:用户在前端SQL编辑器中提交SQL和运行配置。StreamflySQL后端在收到请求后生成一个Lambda作业并传递配置ID。然后Lambda启动作业,后面是执行FlinkCLIrun命令提交作业。.FlinkCLIrun命令会启动FlinkClient加载并执行模板jar的main函数。这个时候会读取SQL和配置,初始化TableEnvironment。TableEnvironment将从目录中读取必要的元信息,例如数据库/表。顺便说一下,我们在网易游戏中并没有使用统一的目录来维护不同组件的元数据,而是不同的组件有自己的元数据中心对应不同的目录。最后,TableEnvironment编译JobGraph并以Per-JobCluster的方式部署作业。StreamflySQLv1实现了FlinkSQL平台从零到一的搭建,满足了部分业务需求,但痛点仍然很多。第一个痛点是响应慢。在比较典型的SQL中,以模板jar的形式启动作业,需要准备TableEnviroment,可能需要5秒,然后执行SQL编译优化,包括与Catalog交互获取元数据,也可能需要5秒;编译jobgraph后,需要准备per-jobcluster,一般需要20秒以上;最后还需要等待Flink作业的调度,也就是作业的状态从scheduled变为running,也可能需要10秒。一般情况下,v1版本启动一个FlinkSQL作业至少需要40秒,比较长。但是仔细分析这几个步骤,只有SQL编译优化和作业调度是不可避免的。其他的比如TableEnvironment、Flinkcluster其实都可以提前准备好。这里的慢是资源懒初始化,几乎没有复用。第二个痛点是调试难。我们对SQL调试的要求是:第一点,调试出来的SQL要和线上的SQL基本一致。第二点,调试SQL不能影响线上数据。它可以读取联机数据,但不能写入。第三点,因为调试SQL通常只需要提取少量的数据样本来验证SQL的正确性,所以我们要限制调试SQL的资源,一方面是出于成本的考虑,另一方面另一方面,为了防止调试SQL与在线作业竞争资源。第四,因为调试SQL处理的数据量比较小,所以希望能够更快更方便的得到结果。在v1版本中,针对上述需求,我们设计了以下解决方案:首先,对于被调试的SQL,系统在SQL翻译时,将原来的Sink替换为专用的PrintSink,解决了需求中的前两点。然后对PrintSink进行限流,通过Flink的反压机制实现整体限流,限制job的最大执行时间。超时后系统会自动结束作业,解决了需求中的资源限制。最后,为了响应更快,调试好的作业不会提交到YARN集群运行,而是在Lamdba服务器本地执行开一个MiniCluster,也方便我们从中提取PrintSink结果标准输出。这就解决了需求中的最后一点。调试模式的结构如上图所示。与一般的SQL提交流程相比,主要区别在于作业不会提交到YARN,而是在Lambda服务器本地执行,节省了准备Flink集群的开销,更容易控制资源和获取结果.以上调试方案基本都具备了,但是在实际使用过程中还是存在很多问题。首先,如果用户提交的SQL比较复杂,SQL的编译优化可能需要很长时间,容易导致作业超时,并可能在结果输出前被系统终止。同时这样的SQL也会对服务器造成很大的压力。其次,该架构无法调试具有长时间窗口的作业或需要引导状态的作业。第三,由于执行结果是在作业结束后批量返回的,而不是在作业执行过程中流式返回,用户需要等到作业结束——通常需要10多分钟才能看到结果。四、更换SQL翻译阶段调试SQL的Sink。这个功能是通过改造Flink的Planner来实现的,相当于将业务逻辑侵入到Planner中,不够优雅。第三个痛点是v1版本只允许单一的DML。与传统数据库相比,我们支持的SQL语句非常有限。比如MySQL的SQL,可以分为DML、DQL、DDL、DCL。DML用于操作数据,常见的语句包括INSERT/UPDATE/DELETE。StreamflySQLv1只支持INSERT,与FlinkSQL一致。FlinkSQL使用retract方式——也就是类似Changelog的方式来表示UPDATE/DELETE,所以它只支持INSERT,其实是没有问题的。DQL用于查询数据,常用语句为SELECT。这在FlinkSQL中是支持的,但是StreamflySQLv1不支持DQL,因为缺少Sink无法生成有意义的Flink作业。DDL用于定义元数据,常见的语句有CREATE/ALTER/DROP等。StreamflySQLv1版本不支持这个,因为从模板jar调用SQL的入口是sqlUpdate,不支持纯元数据操作,以及为纯元数据操作启动一个TableEnvironment是完全不经济的。最后,DCL用于管理数据权限,例如GRANT和REVOKE语句。这个FlinkSQL是不支持的,因为Flink目前只是一个datauser而不是manager,DCL没有意义。综上所述,v1版本只支持单个DML,这让我们漂亮的SQL编辑器变得空洞。基于以上痛点,我们在今年研发了StreamflySQLv2。v2使用服务器端SQL编译架构。03SQLGateway-basedStreamflySQLv2我们的核心需求是解决v1版本的几个痛点,包括提升用户体验,提供更完善的SQL支持。总体思路是在服务器端采用SQL编译的架构,提高可扩展性和性能。另外,我们的集群部署方式也改为SessionCluster,提前准备好集群资源,节省启动YARN应用的时间。这里有两个关键问题。首先,我们是要完全自己开发还是基于开源项目?在调研过程中,我们发现Ververica的SQLGateway项目非常适合我们的需求。易于扩展,是Flink社区中FLIP-91SQLGateway的一个基础实现。也便于与社区未来的发展方向融合。第二个问题是SQLGateway本身有提交作业的能力。这与我们现有的Lambda平台不谋而合,在认证授权、资源管理、监控告警等方面会造成重复建设、难以统一管理等问题。有两个入口。那么两者应该如何划分呢?我们最终的解决方案是使用SessionCluster的两阶段调度,即资源初始化和作业执行分离,这样可以让Lambda负责SessionCluster的管理,StreamflySQL负责SQL作业的管理,这样我们就可以重用Lambda的大部分基础能力。这是StreamflySQLv2的架构图。我们将SQLGateway嵌入到SpringBoot应用程序中并开发了一个新的后端。整体看起来比v1版本复杂,因为原来的一级调度变成了session和job的二级调度。首先,用户需要创建一个SQL会话,StreamflySQL后端会生成一个会话作业。从Lambda的角度来看,sessionjob是一种特殊的作业,它使用yarn-session脚本来启动一个FlinkSessionCluster。SessionCluster初始化完成后,用户就可以在session内提交SQL了。StreamflySQL后端会为每个会话开启一个TableEnvironment,负责执行SQL语句。如果是只涉及元数据的SQL,则直接调用Catalog接口完成。如果是job类型的SQL,会编译成JobGraph提交给SessionCluster执行。v2版本很大程度上解决了v1版本的几个痛点:在响应时间上,v1往往需要1分钟左右,而v2版本通常在10秒内完成。在调试和预览方面,v2不需要等待作业结束,而是在作业运行时通过socket流返回结果。这要归功于SQL网关的巧妙设计。对于select语句,SQLGateway会自动注册一个基于socket的临时表,并将select结果写入该表。在SQL支持方面,v1只支持DML,而v2可以借助SQLGateway支持DML/DQL/DDL。不过,虽然SQLGateway有很好的核心功能,但我们使用起来并不是一帆风顺,也遇到了一些挑战。首先也是最重要的是元数据的持久性。SQLGateway本身的元数据只保存在内存中。如果进程重启或者遇到异常崩溃,元数据就会丢失,这在企业生产环境中是不可接受的。因此,我们在将SQLGateway集成到SpringBoot程序中之后,很自然的将元数据保存到数据库中。Metadata主要是session的元数据,包括session的Catalog、Function、Table、jobs。这些元数据按应用范围可分为4层。最下面两层是全局配置,以配置文件的形式存在;上面两层是运行时动态生成的元数据,存储在数据库中。上层的配置项具有更高的优先级,可以用来覆盖下层的配置。我们从下往上看这些元数据:最底层是全局默认的FlinkConfiguration,也就是我们在FlinkHome下的flink-confyaml配置。上层是Gateway本身的配置,比如部署方式(比如YARN或者K8S),比如默认发布的Catalog和Function等等。第三层是session级别的SessionConfiguraion,比如session对应的SessionCluster的clusterID或者TaskManager的资源配置等。最上层是job级别的配置,包括由session动态生成的metadata作业,例如作业ID、用户设置的检查点周期等。这样灵活的设计不仅解决了元数据持久化的问题,也为我们的多租户特性打下了基础。第二个挑战是多租户。多租户分为资源和认证。在资源方面,StreamflySQL利用Lambda作业平台,在不同的队列中启动SessionClusters。它们的Master节点和资源是天然隔离的,所以不存在像SparkThriftServer那样的不同用户。共享一个Master节点和混合资源的问题。在鉴权方面,由于SessionCluster属于不同的用户,StreamflySQL后端需要实现多租户伪装。在网易游戏中,组件一般使用Kerberos认证。我们实现多租户的方式是使用Hadoop的ProxyUser,先以超级用户身份登录,然后伪装成项目用户,从不同的组件中获取delegationtoken。这里的组件主要是HiveMetaStore和HDFS,最后将这些token保存到UGIInside,使用doAS提交作业。第三个挑战是横向扩张。StreamflySQL为了实现高可用和扩展服务能力,自然需要采用多实例架构部署。因为我们已经在数据库中存储了主要的状态元数据,所以我们可以随时从数据库中构建一个新的TableEnvironment,所以StreamflySQL实例就像一个普通的web服务一样轻量级,并且可以很容易地扩展和缩减。但是并不是所有的状态都能持久化,有些状态是故意不持久化的。例如,用户使用SET命令改变TableEnvironment的属性,比如启用TableHints,这些是临时属性,在重建TableEnvironment后会被重置。这符合预期。再比如,当用户提交一个select查询进行调试和预览时,TaskManager会与StreamflySQL后端建立socket连接,而socket连接显然不是持久化的。因此,我们在StreamflySQL的多个实例之前加入了affinity负载均衡,根据SessionID进行流量调度,这样在正常情况下,同一个用户的请求都落在同一个实例上,保证了用户体验的连续性。第四个挑战是工作状态管理。其实这里的status这个词是一个双关语,有两个意思:第一个意思是作业的运行状态。SQL网关目前只提交SQL,不监控后续运行状态。因此,StreamflySQL设置了一个监控线程池,定期轮询和更新作业状态。因为StreamflySQL是多实例的,如果它们的监控线程同时操作同一个job,可能会出现丢失更新的问题,所以我们这里使用CAS乐观锁来保证过时的更新不会生效。那么我们就会在作业异常退出或者状态获取不到的时候发出告警。例如,在JobManager故障转移的情况下,我们无法知道Flink作业的状态。此时系统会发出断开连接异常状态告警。第二个意思是Flink的持久化状态,即FlinkState。原生SQL网关不管理Flink的savepoint和checkpoint,所以我们添加了stop和stop-with-savepoint功能,并强制启用retainedcheckpoint。这使得系统在作业遇到异常终止或简单停止时自动找到最新的检查点。在这里我可以分享我们的算法。其实Lambda也提供了自动查找最新checkpoint的功能,不过Lambda假设jobs是Per-JobCluster,所以只需要在集群的checkpoint目录下查找最新的checkpoint即可。但是这样的算法并不适用于StreamflySQL,因为SessionCluster有多个作业,最新的checkpoint不一定是我们的目标作业。因此,我们改用类似JobManagerHA的搜索方式,先读取job归档目录的元数据,从中提取最新的checkpoint。04Futurework未来首先要解决的问题之一就是State迁移的问题,即用户更改SQL后如何从原来的Savepoint恢复。目前只能通过改变类型来告知用户风险。比如增减字段不会造成Savepoint不兼容,但如果加了join表,影响就不好说了。因此,未来我们计划对SQL变更前后的执行计划进行分析,提前告知用户变更前后的状态兼容性。第二个问题是细粒度的资源管理。目前,我们无法在作业编译时指定SQL资源。比如TaskManager的CPU和内存是在SessionCluster启动后确定的,是会话级别的。目前只能通过作业并行来调整资源,不够灵活,容易造成浪费。现在Flink1.14已经支持DataStreamAPI的细粒度资源管理,可以在算子层面设置资源,但是SQLAPI暂时还没有计划,未来我们可能会参与,推动相关提案的进展。最后,社区贡献。我们有一些使用SQLGateway的经验,并对其进行了很多改进。我们希望这些改进能够回馈Flink社区,推动FLIP-91SQLGateway的进步。