当前位置: 首页 > 科技观察

ApacheFlink漫谈系列——PyFlink核心技术揭秘

时间:2023-03-14 11:50:55 科技观察

大家好,很高兴在今天的峰会上与大家分享ApachePyFlink的核心技术。首先,让我简单介绍一下自己。我是孙金城,外号金柱,来自阿里巴巴。2016年开始投入开源建设,目前是ApacheFlink、ApacheBeamCommitter和ApacheIoTDB的PMC成员。也是Apache软件基金会的成员,ApacheMember。我也喜欢写一些技术博客和录制视频课程。欢迎大家关注我的公众号。今天我们有4个部分要分享。首先快速了解PyFlink的使命和愿景,然后重点介绍PyFlink的核心技术,最后介绍PyFlink的未来规划和现有应用案例。那么让我们开始今天的第一部分,PyFlink的使命愿景。首先,ApacheFlink是一个有状态的分布式流计算框架。可以作用于有限和无限的数据集。那么,对于有限数据集和无限数据集的流计算处理,业界有两种典型的架构。一种是Micro-Batching模式,即将流看成是批处理的特例。然后是ApacheFlink的架构模式,纯流的架构模式,把batch看成是流的一个特例。纯流的设计,最大限度的增加了计算的延迟。那么Flink的分布式亚秒级延迟能力是如何暴露给用户的呢?Flink提供了SQL、DataStream和ProcessFunction多层API供用户选择,可惜只能提供给Java用户使用。那么,扩展Flink的能力,面向更多的用户群体,是非常有意义的,那么Flink如何支持多语言呢?将添加哪种语言支持?我们在开始做PyFlink之前做了一些调研,发现Python语言在2020年比Java语言更活跃,而且是持续上升的趋势。那么接下来我们就来详细了解一下为什么Python语言会受到如此多的关注,以及大家使用Python语言来完成什么样的工作呢?伴随着这些问题,随着AI的兴起,Python不仅在数据分析和Web开发领域得到广泛应用,在AI/机器学习领域也得到广泛应用。其中一件比较有趣的事情是,就连高速公路巡警的爱好也变成了Python编程,8/9岁的孩子们都在用Python做有趣的游戏。这足以见证Python的流行。因此,Python被认为是Flink多语言支持最重要的开发语言。Python很流行,Python的生态发展也很成熟,但是这里有一个典型的问题,就是这些生态库大多是单机模式。在当今大数据时代,Python生态面临的一个典型问题是:如何为海量数据的处理提供分布式能力?因此,面对Flink能够面向更多的用户群体,而Python是最流行的语言,Python成为了Flink多语言支持的第一语言。同时,面对Python语言缺乏分布式能力的问题,PyFlink的使命是为Python生态系统配备分布式处理能力。因此,Pyflink的使命就是将Flink的能力输出给Python用户,让Python生态具备分布式能力。好了,我们来看看PyFlink是如何完成使命的,有哪些核心技术细节。首先,Flink对Python用户的导出能力的核心问题显然是PythonVM和JavaVM之间的握手。它们之间必须建立通信,这是PyFlink要解决的首要问题。面对PVM和JVM的通信问题,我们选择了Py4J,在PythonVM中启动了一个Gateway,在JavaVM中启动了一个GatewayServer来接受Python请求,并在PythonAPI中提供了与JavaAPI相同的对象,如TableENV、Table、ETC。这样,Python在编写PythonAPI时,本质上就是在调用JavaAPI。同时,还有一个工作部署的问题。我们可以使用Python命令、Pythonshell和CLI来提交作业。那么Py4J与JVM交互的原理是什么?其实其核心机制就是Python端每创建一个对象,Java端就会创建一个对应的Java对象,并生成一个对象ID。Java端使用Map来保存对象ID和对象。同时将对象ID返回给Python端,Python端根据对象ID和方法参数进行操作,本质上就是在操作Java对象。那么基于这种架构有哪些优势呢?第一个是简单性,确保PythonAPI语义和JavaAPI的一致性。第二点,Python作业可以达到和Java一样的极致性能。在刚刚结束的阿里双11狂欢节,40亿处理能力的峰值。OK,完成了现有Flink功能对Python用户的输出,下面我们继续讨论如何将Python生态功能引入Flink,然后分发Python功能。如何?结合现有FlinkTableAPI的现状和现有Python类库的特点,我们可以将现有的所有Python类库函数都视为用户自定义函数(UDF),并集成到Flink中。这样,我们发现,将Python生态融入Flink的手段就是将其作为一个UDF来对待,那么整合的核心问题是什么?没错,就是PythonUDF的执行问题。那么,我们如何处理这个核心问题呢?解决PythonUDF执行的问题不仅仅是VM之间的通信问题,它涉及到Python执行环境的管理、Java和Python之间业务数据的分析、FlinkStateBackend能力向Python的输出,以及对Python的监控。PythonUDF执行等等,这是一个非常复杂的问题。面对如此复杂的问题,我们选择了统一的编程模型ApacheBeam。为了解决多语言、多引擎支持的问题,Beam高度抽象了一个框架,叫做PortabilityFramework。如下图所示,Beam目前支持Java/Go/Python等语言,图中下方的BeamFuRunners和Execution的关系解决了引擎和UDF执行环境的问题。其核心是使用Protobuf抽象数据结构,使用gRPC协议进行通信,封装核心gRPC服务。所以此时Beam更像是一只萤火虫,为PyFlink解决UDF执行问题照亮了道路。我们来看看Beam提供了哪些gRPC服务。如图所示,Runner部分是Java算子执行,SDKWorker部分是Python执行环境,Beam抽象出了Control/Data/State/Logging等服务。并且这些服务已经在Beam的Flinkrunner上稳定高效运行了很长时间。所以在PyFlinkUDF执行上我们可以站在巨人的肩膀上:),这里我们发现ApacheBeam在API层面和UDF执行层面都有解决方案,而PyFlink在API层面使用Py4J解决了VM通信问题。在UDF执行需求方面,使用Beam的ProtabilityFramework来解决UDF执行环境问题。这也说明PyFlink在技术选择上严格遵循以最小成本实现既定目标的原则,永远会选择最适合PyFlink长远发展的技术架构。好了,那么现在我们来回答,Flink是如何支持多语言的呢?在API层面,其他语言需要处理algin现有的Java语言API。在语言执行环境的问题上,Flink可以复用Beam提供的基础设施。换句话说,我们可以很容易地复用Flinkrunner和fnapi层面的基础服务和数据结构。这将使Flink可以轻松支持多种语言。下面的内容我们一起来看看PyFlink的UDF架构设计。K,让我们看一下PyFlinkUDF的整体架构。在UDF架构中,我们不仅要考虑JavaVM和PythonVM之间的通信,还要考虑编译阶段和运行阶段的不同需求。在图中,我们用绿色表示JavaVM的行为,用蓝色表示PythonVM的行为。首先我们来看编译阶段,也就是局部设计。本地设计是纯API映射调用。我们仍然需要使用Py4J来解决通信问题。即如图所示,Python每执行一次API,都会同步调用Java对应的API。在UDF支持方面,需要添加UDF注册API,register_function,但是注册是不够的。用户在自定义PythonUDF时往往会依赖一些第三方库,所以我们还需要添加一个添加依赖的方法,即一系列的add方法,比如add_Python_file()。在编写Python作业的同时,JavaAPI也会被同时调用。在提交作业之前,Java端会构建.JobGraph。然后通过CLI等多种方式将作业提交到集群运行。下面我们来看看Python和Java在运行时的不同分工。首先,Java端与普通的Java作业相同。JobMaster将作业分配给TaskManager,TaskManager将Task一个一个执行。任务涉及Java和Python运算符的执行。.在PythonUDF的operator中,我们会设计各种gRPC服务来完成JavaVM和PythonVM之间的各种通信,比如DataService完成业务数据通信,StateService完成PythonUDF对JavaStatebackend的调用,当然还有Logging和指标等其他服务。这些服务都是基于Beam的FnAPI构建的,最终在PythonWorker中运行用户的UDF,运行完成后再使用相应的gRPC服务将结果返回给Java端的PythonUDF算子。当然,Pythonworker不仅是Process模式,还有Docker模式,甚至是External服务集群。这种扩展机制为PyFlink与Python生态中的其他框架的集成奠定了坚实的基础。这里最重要的是如何使用beam的基础设施来执行PythonUDF。让我们看一下pyflink如何与Beam的可移植性框架集成以执行PythonUDF。场景的场景是对输入数据进行一系列的转换,并将结果写入另一个外部存储系统。我们知道Flink是用Java开发的,但是用户自定义的转换逻辑是用Python开发的。如示例所示,假设ParDo使用PythonUDF,在Beam中引入了一个ExecutableStage,其中包含了用户自定义Python函数的所有必要信息,例如:输入/输出数据类型、用户自定义函数的payload、用户-definedfunctions使用的状态和定时器等。同时,Beam还提供了一个Java库,可以用来管理特定语言的执行环境。“forStage()”会根据SDKharness部分ExecutableStage中定义的信息生成执行用户自定义函数所需的流程,从而建立runner和SDKHarness之间的通信连接。Beam的SDKharness支持多种功能的执行,如ParDo、Flatten等;不同的函数有不同的执行方式,所以SDKharness定义了一个特定的操作类来执行它。但是如何才能明确定义beam中各个函数的执行逻辑呢?Beam提供了非常灵活的插件机制,就是为每一类函数定义一个URN,比如Input/output/parDo等,这样的插件机制也为Flink集成Beam框架提供了一种便捷的方式.那么在PyFlink中使用PythonSDKHarness的工作原理如下:在启动阶段,PythonSDKHarness会为所有内置操作建立URN和操作映射。在处理新包的初始化阶段,runner会将URN与函数一起发送到SDKHarness。SDKHarness可以根据给定的URN构造相应的操作。该动作然后用于执行输入的数据和相应的用户定义的功能逻辑。我们看到如图所示,我们定义了各种URN,包括input/output,coder等等。OK,那么注册URN也很简单,就是我们添加了一些创建自定义动作和Coders的函数。这些函数使用Beam的pythonsdk工具包中定义的装饰器进行装饰。装饰器包含两个参数:URN和一个基于protobuf的自定义参数。OK,在支持PythonUDF之后,我们也将Pandas与PyFlink进行了集成。我们可以很方便的在PyFlink中定义PandasUDF。同时,我们还提供了frompandas和topandasAPI,支持Flink和Pandas之间的操作转换。同时,我们不断优化udf的执行性能。1.11版本的性能是1.10版本的30倍。OK,让我们快速了解一下PyFlink未来的规划。PyFlink的开发必须始终由心驱动。我们应该专注于将现有的Flink功能输出给Python用户,并将Python生态功能集成到Flink中。首先解决PythonVM和JavaVM的通信问题,然后将已有的TableAPI功能暴露给Python用户,提供PythonTableAPI。这是Flink1.9所做的工作。接下来我们将Python函数集成到Flink的准备工作是集成ApacheBeam,提供PythonUDF的执行环境,增加Python对其他类库依赖的管理功能,为用户提供User-defined-Funciton接口定义,支持Python用户定义函数。这就是Flink1.10所做的。工作。为了进一步扩展Python生态的分布式功能,PyFlink将提供对PandasSeries和DataFrame的支持,即用户可以直接在PyFlink中使用PandasUDF。同时,为了增强用户的易用性,让用户有更多的方式使用PyFlink,后期会增加PythonUDF在SqlClient中的使用。针对Python用户的机器学习问题,添加Python的MLpipelineAPI。监控PythonUDF的执行情况对于实际生产业务来说非常关键,因此PyFlink会增加对PythonUDF的metric管理。这是Flink1.11中的工作。同时,我们还需要不断优化性能,提供对Datastream和现有k8s的支持,这些在PyFlink1.12中已经提供给大家。未来,Flink已有的功能将继续推送到Python生态中,Python生态强大的功能也将不断融入Flink,从而完成Python生态分发的初衷。当然PyFlink也会重点做生态整合,比如推进与Zeppelin、jupyter、PyAlink等的整合工作。最后简单看一下PyFlink的用例。PyFlink可以应用于事件驱动/数据分析/ETL/机器学习等多种场景。目前在生产中有很多用户。比如比特币大陆、聚美优品等。目前PyFlink已经成熟,非常适合大家选择Flink作为快速构建分布式计算系统的入门级开发语言。目前PyFlink的功能趋于完善,当然还有更多的工作要做,但无论如何,相信后续会逐渐成熟!2020年12月开始投入精力到物联网领域,开始新的探索~作者介绍孙金城,51CTO社区编辑,ApacheFlinkPMC成员,ApacheBeamCommitter,ApacheIoTDBPMC成员,ALC北京成员,ApacheShenYuMentor,Apache软件基金会成员。专注于技术领域的流计算和时序数据存储。