当前位置: 首页 > 后端技术 > Python

如何在ApacheFlink中使用PythonAPI?_0

时间:2023-03-26 18:12:53 Python

本文根据ApacheFlink系列直播课程整理而成,由阿里巴巴高级技术专家ApacheFlinkPMC孙金城分享。重点介绍FlinkPythonAPI的现状和未来规划。主要内容包括:ApacheFlinkPythonAPI的过去、现在和未来的发展;ApacheFlinkPythonAPI的架构和开发环境;ApacheFlinkPythonAPI核心算子介绍及应用。一、ApacheFlinkPythonAPI的过去、现在和未来发展1、Flink为什么选择支持Python?那么Flink为什么要加入对Python的支持,下面会详细分析。最流行的开发语言Python本身就是一门非常优秀的开发语言,根据RedMonk的统计,除了Java和JavaScript之外,流行度排名第三。RedMonk是一家知名的以开发者为中心的行业分析公司。更详细的分析信息,你拿到我的PPT后,可以点击链接详细查看。好了,那么Python的火热跟我们今天给大家分享的流批统一大数据计算引擎ApacheFlink有什么关系呢?带着这个问题,让我们一起思考一下大数据相关的著名开源组件有哪些?比如最早的批处理框架Hadoop?流计算平台Storm,最近异常火爆的Spark?异或Hive做其他领域数据仓库,HBase做KV存储?这些都是非常有名的开源项目,所以这些项目无一例外都支持了PythonAPI。很多支持Python的开源项目生态已经比较完善。基于此,ApacheFlink也在1.9版本中投入了大量精力,推出了全新的Pyflink。除了大数据,人工智能和Python也有着非常密切的关系。ML青睐的语言从上图统计可以看出,PythonAPI本身已经占到机器学习岗位语言需求的0.129%。Python语言似乎比R语言更受欢迎。Python是一种解释型语言,语法的设计理念是“以一种方式做一件事,而且只能用一种方式”。它的简单易用性使其成为世界上最流行的语言,在大数据计算领域有着良好的生态建设。同时Python在机器学习方面也有很好的前景,因此我们在近期发布的ApacheFlink1.9中引入了全新架构的全新PythonAPI。Flink是一个流批统一计算引擎。社区非常重视和关注Flink用户。除了Java语言或者Scala语言,社区希望提供多个入口和渠道。让更多的用户更方便的使用Flink,收获Flink在大数据算力上带来的价值。因此,从Flink1.9开始,Flink社区推出了全新技术体系的PythonAPI,并支持了最常用的算子,如JOIN、AGG、WINDOW等。2.PythonAPI–RoadMapInFlink1.9,虽然Python可以使用JavaUser-definedFunction,但是仍然缺少Python原生User-definedfunction的定义,所以我们计划在Flink1.10中支持PythonUser-definedfunction。.并且在技术上增加了对数据分析工具库Pandas的支持,在Flink1.11中增加了对DataStreamAPI和MLAPI的支持。二、PythonAPI架构及开发环境搭建1.PythonTableAPI架构新的PythonAPI架构分为用户API部分、PythonVM与JavaVM通信部分、最终将作业提交给Flink集群进行查询的部分。手术。那么PythonVM和JavaVM是如何通信的呢?我们将在Python端有一个PythonGateway来维护与Java的通信链接,在Java部分有一个GateWayServer来接收来自Python部分的调用。关于PythonAPI的架构,在1.9之前,Flink的DataSet和DataStream已经有了对PythonAPI的支持,但是他们有两套不同的API,DataSetAPI和DataStreamAPI。对于Flink这个流批统一的流式计算引擎来说,统一的架构非常重要。而对于现有的PythonDataSetAPI和DataStreamAPI,采用的是JPython的技术架构,而JPython本身并不能很好的支持现在的Python3.X系列,所以在Flink1.9发布后,决定替换掉原来的。PythonAPI架构被抛弃,出现了新的技术架构。这套新的PythonAPI是基于TableAPI的。TableAPI和PythonAPI之间的通信采用了一种简单的方法,使用PythonVM和JavaVM进行通信。在编写或调用PythonAPI的过程中,通过某种方式与JavaAPI进行通信。操作PythonAPI就像操作Java的TableAPI。新的架构可以保证以下内容:无需创建一套新的算子,可以很容易地与JavaTableAPI的功能保持一致;得益于现有的JavaTableAPI优化模型,用Python编写的API可以使用JavaAPI优化模型进行优化,可以保证PythonAPI编写的作业也能有极致的性能。如图所示,当Python在Java中发起一个对象的请求时,在Java段中创建该对象并存储在一个存储结构中,并分配一个ID给Python端。Python端获取到Java对象的ID后,就可以对该对象进行处理了。操作,也就是说Python端可以操作任何Java端的对象,这就是为什么新的架构可以保证PythonTableAPI和JavaTableAPI的功能一致,并且可以超越现有的优化模型。在新的架构和通信模型下,调用JavaAPI的PythonAPI只需要持有Java对象的ID,将调用方法的名称和参数传递给JavaVM,然后完成对JavaTable的调用API,所以在这样的框架中开发PythonTableAPI的方式和开发JavaTableAPI的方式是完全一样的。接下来,我将详细介绍如何开发一个简单的PythonAPI作业。2.PythonTableAPI–作业开发一般来说,一个Pythontable作业一般分为四个部分。首先,根据当前情况,需要决定作业是以批处理方式运行还是以流方式运行。当然,后续版本的用户可能不会考虑,但现在的1.9版本还是需要考虑的。在决定了第一步的作业如何执行之后,我们需要了解数据从哪里来,如何定义Source,结构数据类型等信息。然后需要编写计算逻辑,然后对数据进行计算操作,但是最终的计算结果需要持久化到某个系统中。最后定义Sink。与Source类似,我们需要定义SinkSchema和各个字段类型。下面将详细分享如何用PythonAPI编写每一步?首先,我们创建一个执行环境。对于执行环境本身,我们首先需要一个ExecutionEnvironment,基本上我们需要一个TableEnvironment。然后在TableEnvironment中,有一个参数TableConfig,在TableConfig中会有一些执行过程中的配置参数,可以传递给RunTime层。此外,还提供了一些个性化的配置项,可以在实际业务开发中使用。拿到Environment之后,需要定义数据源表。以CSV格式文件为例,用“逗号”分隔,用Field表示这个文件有哪些字段。然后你会看到它当前是用逗号分隔的,而且只有一个字段叫word,类型是String。定义和描述数据源数据结构转换为Table数据结构后,也就是说转换为TableAPI级别后是一种什么样的数据结构和数据类型?下面会通过with_schema添加字段和字段类型。这里只有一个字段,数据类型也是String。最后注册为表,注册到catlog中,可以用于后续的查询计算。创建结果表。计算完成后,这些结果需要存储在持久化系统中。以WordCount为例。首先存储表会有一个word和它的count两个字段,一个是String类型的word,一个是Bigint类型的count,然后注册为一个Sink。写完注册TableSink之后,我们来看逻??辑怎么写。其实用PythonAPI写WordCount和TableAPI一样简单。因为相对于DataSream,PythonAPI只需要一行就可以写一个WordCount。比如groupby,先扫描Source表,然后按一个word分组,然后selectword加上聚合统计Count,最后把最多的数据结果插入结果表。3.PythonTableAPI–环境搭建那么WordCount究竟如何运行呢?首先,你需要搭建一个开发环境。不同的机器可能安装的软件版本不同。这里是一些版本要求和需求,括号里是示例机上的版本。第二步是构建Java二进制分发包从源码构建,那么这个页面就是从原始代码中获取我们的主代码,拉取1.9分支。当然也可以用Mater,但是Master不够稳定。建议大家在自学的过程中使用1.9分支。接下来进行实战演练,首先验证PPT的正确性。首先编译代码,示例如下://下载源码gitclonehttps://github.com/apache/flink.git//拉1.9分支cdflink;gitfetchoriginrelease-1.9gitcheckout-brelease-1.9origin/release-1.9//构建二进制发布包mvncleaninstall-DskipTests-Dfast编译后需要在对应目录下找到发布包:cdflink-dist/target/flink-1.9.0-bin/flink-1.9.0tar-zcvfflink-1.9.0.tar.gzflink-1.9.0构建JavaAPI后测试,我们需要构建Python发布包.因为大多数Python用户都知道,我们需要通过pipinstall方式,将需要的依赖库与本地Python环境集成或安装。Flink也是如此。PyFlink还需要打包一个Pypip可以识别的资源进行安装。实际使用中,你也可以按照这个命令进行复制,在自己的环境中试一试。cdflink-Python;Pythonsetup.pysdist这个过程只是包含了Java包,然后打包了一些Java包和自己PyFlink模块的Python包,会在dist目录下,有一个apache-flink-1.9。dev0.tar.gz.dist目录下的cddist/apache-flink-1.9.dev0.tar.gz就是我们pipinstall可以使用的PyFlink包。在1.9版本中,除了FlinkTable,还有FlinkTableBlink。Flink会同时支持两种方案。如果你能尝试一下,我们可以在Flink的原始Planner和Blink的Planner之间自由切换。你可以尝试一下。打包完成后,我们可以尝试将包安装到我们的实际环境中。接下来是一个很简单的命令,首先检查命令的正确性,在执行之前,我们使用pip查看列表,我们需要查看现有的包中是否有,现在尝试安装刚才的包。在实际使用过程中,如果升级了版本,也必须进行这个过程,必须安装新的包。pipinstalldist/*.tar.gzpiplist|grepflink安装完成后,可以使用刚才写的WordCount例子来验证环境是否正确。验证刚才的正确性,如何验证?为了您的方便,您可以直接克隆enjoyment.code存储库。gitclonehttps://github.com/sunjincheng121/enjoyment.code.gitcdenjoyment.code;Pythonword_count.py下次体验试试。在这个目录下,我们刚刚开发的WordCount例子。直接使用Python或者验证环境是否OK。此时FlinkPythonAPI会启动一个MiniCluster,执行刚才的WordCountJob,提交给一个MiniCluster执行。现在Run这个过程实际上已经在集群上执行过了。实际上,在这段代码中,读取了一个Source文件,并将结果写入了一个CSV文件。在当前目录中,有一个SinkCSV。具体操作步骤可以查看Flink中文社区视频ApacheFlinkPythonAPI现状及规划IDE配置在平时的开发过程中,其实我们大部分还是在本地开发。这里推荐大家使用Pychram来开发Python相关的逻辑。或者工作。同时由于大量截图的存在,这些内容也整理到了博客中。您可以扫描二维码关注并查看一些详细的注意事项。博客详细地址:https://enjoyment.cool。这里有一个关键点。大家要注意,你的环境中可能存在多个Python环境。这时候选择的环境一定是刚才的pipinstall环境。有关详细信息,请参阅ApacheFlinkPythonAPI状态和规划。4.PythonTableAPI——提交作业的其他方式有哪些?这是一个CLI的方式,也就是说,它实际上是提交到一个已经存在的集群中。首先启动一个集群。构建的目录通常在目标目录下。如果要启动集群,可以直接启动。这里要说的是其中一个集群外部有一个WebPort,它的端口地址是在flink-conf.yaml中配置的。根据PPT中的命令,可以查看日志看是否启动成功,然后从外部网站访问。如果集群正常启动,我们看如何提交Job。Flink通过run提交作业。示例代码如下:./bin/flinkrun-py~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py用命令行执行。除了使用PY参数,还可以指定Python模块,以及其他依赖的资源文件,JAR等。在1.9版本中,也为大家提供了一种更方便的方式,就是通过交互的方式编写PythonAPI使用PythonShell获取结果。有两种执行方式,第一种方式是Local,第二种方式是Remote。其实,两者并无本质区别。先看Local,命令如下:bin/pyflink-shell.shlocal启动一个miniCluster,输出的时候会有一个PythonFlinkCLI和一些示例程序供大家体验,按照上面的case即可为了实现正确的输出和提交,Streaming和Batch都可以写。详细步骤请参考视频操作。至此,你应该对Flink1.9上的PythonAPI架构有了一个大概的了解,以及如何搭建PythonAPI环境。并通过一个简单的WordCount例子,体验如何在IDE中执行程序,如何使用Flink运行并交互提交作业。同时也体验了一些现有的FlinkPythonAPI的交互方式。然后介绍了整个Flink的一些环境搭建和一个简单的例子。接下来,我们将详细介绍1.9中的所有核心算子。三、FlinkPythonAPI核心算子介绍及应用1.PythonTableAPI算子上分享创建作业的过程。第一步是选择Streaming或Batch的执行方式;第二步定义使用的表,Source,Schema,数据类型;三是开发逻辑,在写WordCount的时候,使用Count的函数。最后,PythonAPI中内置了很多聚合函数,可以使用count、sum、max、min等。因此,在目前的Flink1.9版本中,已经可以满足大部分通用需求。除了刚才说的伯爵。FlinkTableAPIoperator1.9也支持了。关于FlinkTableAPI的算子,无论是PythonTableAPI还是JavaTableAPI,都有以下几种操作。对第一个单流的操作,比如对流做一些SELECT,Filter,还有一些聚合,包括windowfunction的windowswindow聚合和一些列的操作,比如最底层的add_columns和drop_columns。除了单流操作,还有双流操作,比如双流JOIN、双流minus、union。这些运算符在PythonTableAPI中提供了良好的支持。在Flink1.9中,PythonTableAPI从功能上来说几乎完全等同于JavaTableAPI。下面我们用实际的代码来看看上面的算子是怎么写的,以及如何开发Python算子。2.PythonTableAPIOperator-WatermarkDefinition细心的同学可能注意到我们没有提到流->时序的一个特性。流的特点是来的顺序可能乱了,而这种乱是流上客观存在的一种状态。在Flink中,一般会使用Watermark机制来解决这种乱序问题。如何在PythonAPI中定义水印?假设有一个JSON数据,一个字段String,时间字段datetime。此时定义Watermark,需要在添加Schema时添加rowtime列。行时间必须是时间戳类型。定义Watermark的方法有很多种。上图中的watermarks_periodic_bounded会周期性的发送Watermark,60000的单位是毫秒。如果数据乱序,可以在一分钟内处理乱序,所以这个值调的越大,对数据乱序的接受度越高,但是稍有延迟的数据就会更高。Watermark的原理可以查看我的博客:http://1t.click/7dM。3.PythonTableAPI–JavaUDF最后跟大家分享一下JavaUDF在Flink1.9中的应用。虽然在1.9中不支持PythonUDF,但是Flink提供了可以在Python中使用的JavaUDF。在Flink1.9中,对Table模块进行了优化和重构。目前JavaUDF的开发只需要引入Flink公共依赖即可开发PythonAPI。下面将通过一个具体的例子来介绍使用JavaUDF开发PythonAPIUDF。假设我们开发一个UDF来查找字符串的长度。在Python中,我们需要在Java中使用register_java_function。函数的名称是包的完整路径。那么在使用的时候,就可以使用注册的名字来完成UDF的调用了。详情请参考我的博客:http://1t.click/HQF。如何实施?可以使用Flinkrun命令来执行,需要携带UDFJAR包。JavaUDF只支持标量函数?其实不然,JavaUDF同时支持ScalarFunction、TableFunction和AggregateFunction。如下图:4.PythonTableAPI常用链接上面提到的一些东西,有一些长链的文档和链接,也放在了PPT上,方便大家使用。同时,我在底部还有一个个人博客。希望大家有所帮助。4.总结简单总结一下,本文首先介绍了ApacheFlinkPythonAPI的历史发展过程,介绍了ApacheFlinkPythonAPI架构变化的原因和目前的架构模型;未来FlinkPythonAPI有什么计划和功能特点继续详细介绍,最后希望大家能在QA环节给出一些建议和意见,谢谢!本文作者:孙金城(金竹)阅读原文本文为云栖社区原创内容,未经允许不得转载。