当前位置: 首页 > 科技观察

ApacheFlink扫雷系列——PyFlink如何解决多JAR包依赖问题

时间:2023-03-20 20:26:03 科技观察

《Apache Flink 扫雷系列》简介暂时解决ApacheFlink当前设计问题给用户带来的不便。那么明知存在设计问题,为什么不进行设计重构,避免这些“雷”的存在呢?其实社区的发展和我们每个公司内部产品的发展是一样的。有一些客观因素阻碍了实际问题的及时解决,例如,社区的发布或内部产品发布的周期。在没有新版本发布之前,一些对用户不友好的问题需要一些“非正式”的解决方案,或者临时解决方案。解决方案的特点是可以解决问题,但不是万能的解决方案。只能在民间流传,不能正式推广。所以《Apache Flink 扫雷系列》是为你提供能够解决你真正问题的解决方案,但未必是最佳实践。在本系列中,你可以有更大的机会回馈社区:)开篇说“Ray”本文中的“Ray”目前是针对ApacheFlink1.10之前的版本,使用CLI提交作业时,只可以提交一个JAR函数来解决问题,即为命令参数-j,--jarfileFlink程序JAR文件。目前-j只允许用户提供一个JAR包,在很多场景下是不合理的。且不说用户自己的JAR包,就Flink用户使用的ConnectorJAR,一个作业可能会用到多个JAR。不同的connector类型,比如?为例,使用了Kafka、MySql、CSV等函数式JAR包的依赖。这个常见的问题就是手动将这三个JAR合并为一个,然后在提交作业的时候加上-j选项上传到集群。扫雷难面对合并多个JAR包,可能Java用户还好(虽然不方便,但应该可以操作),但是对于Python用户来说,如果没有参与,可能需要一些时间才能完成JAR包在Java开发中Merging,甚至会有无从下手的感觉。因此本文主要针对不懂Java的FlinkPython用户。案例选择为了让大家体验实际效果,我们选择一个具体的案例来说明如何合并多个JAR。就拿我2020年3月17日在直播中说的《PyFlink 场景案例 - PyFlink实现CDN日志实时分析》来说明吧。案例回顾《PyFlink 场景案例 - PyFlink实现CDN日志实时分析》核心是通过PyFlink对注入Kafka的CDN日志数据进行分区域统计下载量和下载速度,最后将统计数据写入MySql。同时放入Kafka的数据格式为CSV('format.type'='csv')。所以我们依赖的JAR如下:flink-sql-connector-kafka_2.11-1.10.0.jarflink-jdbc_2.11-1.10.0.jarflink-csv-1.10.0-sql-jar.jarmysql-connector-我们可以通过以下命令下载java-8.0.19.jar:$curl-Ohttps://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar$curl-Ohttps://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar$curl-Ohttps://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar$curl-Ohttps://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar我们将上面的4个JAR下载到某个目录,这里我是下载到本地机的temp目录:“迅雷”存在的场景解释了博客《PyFlink 场景案例 - PyFlink实现CDN日志实时分析》为什么没有提到合并JARs的问题?是的,这个“迅雷”的存在是有一定条件的:作业提交集群环境并没有预装你需要的所有JAR(大多数情况下是不会安装的)。必须满足以上条件才会出现扫雷问题。然后在博客中,我在集群环境中预装了需要的JAR,也就是将博客中提到的JAR下载到集群lib目录下PYFLINK_LIB=python-c"importpyflink;importos;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')")操作。合并JAR需要注意的地方合并JAR很重要的一点涉及到JAR包的ServiceProvider机制,详见详细说明。这是一个Python人很难注意到的合并点。JAR包的ServiceProvider机制会让ServiceProvider的配置文件保存在JAR包的META-INF/services目录下。简单的说,它为开发者提供了一种扩展机制。在开发阶段,只需定义接口,然后在包含实现的JAR包中配置实现,然后调用实际接口的实现类。JAR包的META-INF目录结构简要说明如下:、类加载器和服务:MANIFEST.MF-定义与扩展和包相关的数据的清单文件。INDEX.LIST-此文件由JAR工具的新“-i”选项生成,包含应用程序或扩展中定义的包的地址信息。它是JarIndex的一部分,被类加载器用来加速类加载过程。x.SF-JAR文件的签名文件。x代表基本文件名。x.DSA-此签名块文件与同名的基本签名文件相关。该文件存储相应签名文件的数字签名。services-此目录存储所有服务提供者配置文件。注意:提供程序配置文件必须以UTF-8编码。合并操作1.解压JARs$mkdirjobjarcsvjdbckafkamysql其中jobjar存放的是我们最终打包的JAR内容,csvjdbckafkamysql存放的是对应JAR的解压内容。具体命令如下:$unzipflink-csv-1.10.0-sql-jar.jar-dcsv/$unzipflink-sql-connector-kafka_2.11-1.10.0.jar-dkafka/$unzipflink-jdbc_2.11-1.10.0.jar-djdbc/$unzipmysql-connector-java-8.0.19.jar-dmysql解压后在刚才的目录下我们会得到如下文件内容:我们的核心是处理class文件夹和META-INF/services文件夹,如图所示,csv和kafka解压后的JAR内容。其中Class文件夹直接复制即可。但是,需要合并具有相同名称的服务。比如Flink的Connector的服务发现配置org.apache.flink.table.factories.TableFactory需要合并文件内容。2.MergeJARs首先我们创建META-INF和META-INF/services目录,目录结构如下:jincheng:jobjarjincheng.sunjc$tree-L2.└──META-INF└──services2directories,0files(1)class文件合并将csvjdbckafkamysql的类直接复制到jobjar目录下,如下:$cp-rf../csv/org.$cp-rf../jdbc/org.$cp-rf../kafka/org.$cp-rf../mysql/com.$tree-L2.├──META-INF│└──services├──com│└──mysql└──org└──apache详细目录结构如下:(2)services结合ServiceProvider是一个标准的JAR。不仅Flink的Connector使用了ServiceProvider机制,Kafka也使用了配置好的服务发现机制。所以我们需要通过文件名合并服务的所有内容。以csv和kafka为例:CSV中的META-INF/services中只有一个Flink的connector相关配置,内容如下:Kafka中的META-INF/services中有Flink的connector相关配置,里面有Kafka使用的配置如下:所以我们需要直接将Kafka的相关信息复制到jobjar/META-INF/services/目录下,然后合并关于org.apache.flink.table.factories.TableFactory的csv和Kafka配置.合并的内容如下:#LicensedtotheApacheSoftwareFoundation(ASF)under......#limitationsundertheLicense.org.apache.flink.formats.csv.CsvRowFormatFactoryorg.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory我们最终得到4个JAR服务配置合并后的最终代码如下:.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory您可以尝试以下命令:$cat../csv/META-INF/services/org.apache.flink.table.factories.TableFactory|grep^[^#]>>META-INF/services/org.apache.flink.table.factories.TableFactory$cat../kafka/META-INF/services/org.apache.flink.table.factories.TableFactory|grep^[^#]>>META-INF/services/org.apache.flink.table.factories.TableFactory$cat../kafka/META-INF/services/org.apache.flink.kafka.shaded.org.apache.kafka.common.config.provider.ConfigProvider|grep^[^#]>>META-INF/services/org.apache.flink.kafka.shaded.org.apache.kafka.common.config.provider.ConfigProvider$cat../jdbc/META-INF/services/org.apache.flink。table.factories.TableFactory|grep^[^#]>>META-INF/services/org.apache.flink.table.factories.TableFactory3。创建JAR这一步没有特别强调,直接用zip或者jar命令打包就可以了你自己的包和我的一样:)好的,到这里我们完成了合并多个JAR的工作。我们可以尝试使用CLI来提交命令。CLI提交作业启动集群(我修改了flink-conf,把端口改成4000)/usr/local/lib/python3.7/site-packages/pyflink/bin/start-cluster.shlocalStartingcluster.Startingstandalonesessiondaemononhostjincheng.local.Startingtaskexecutordaemonhostjincheng。当地的。提交作业不加-j选项时,提交作业如下:$PYFLINK_LIB/../bin/flinkrun-mlocalhost:4000-pycdn_demo.py报错如下:Providethecorrect-jparameterandsubmit我们打包的JAR到集群的情况如下:$PYFLINK_LIB/../bin/flinkrun-j~/temp/jobjar/myjob.jar-mlocalhost:4000-pycdn_demo.py同时web控制台可以查看thesubmittedjobs:Summary本文核心介绍PyFlink用户如何解决多JARs依赖的job提交问题?也许这不是最好的解决方案,但至少它是您解决多JAR依赖作业提交的方法之一。好的解决办法,留言或者邮件分享给我:)!【本文为专栏作家“金竹”原创稿件,转载请联系原作者】点此查看该作者更多好文