当前位置: 首页 > 后端技术 > Python

在PySpark数据框中添加新列的5种方法

时间:2023-03-26 14:48:25 Python

每天生成太多数据。虽然有时我们可以使用Rapids或Parallelization等工具来管理大数据,但如果您正在处理数TB的数据,Spark是一个很棒的工具。虽然这篇文章解释了如何使用RDD和基本的Dataframe操作,但我在使用PySparkDataframes时错过了很多。只有当我需要更多功能时,我才会阅读并想出多种解决方案来做一件事。如何在Spark中创建新列?现在,这听起来微不足道,但相信我,事实并非如此。您可能想要处理如此多的数据,所以我很确定您最终会在工作流程中使用这些列创建过程中的大部分。有时使用Pandas函数,有时使用基于RDD的分区,有时使用成熟的python生态系统。这篇文章将是关于“在PysparkDataframe中创建新列的各种方法”。如果您安装了PySpark,则可以跳过下面的“入门”部分。开始使用Spark我知道很多人没有在他们的系统中安装Spark来尝试和学习。但是安装Spark本身就很头疼。由于我们想了解它的工作原理和使用方法,因此我们建议您在此处通过社区版在线使用SparkonDatabricks。不用担心,它是免费的,虽然它的资源较少,但是对于我们来说,出于学习目的,它现在就可以使用。注册并登录后,将显示以下屏幕。你可以在这里开始一个新的笔记本。选择PythonNotebook,并为笔记本命名。启动新笔记本并尝试执行任何命令后,笔记本会询问您是否要启动新集群。下一步将检查sparkcontext是否存在。要检查sparkcontext是否存在,您必须运行以下命令:sc这意味着我们已经设置了一个可以运行Spark的笔记本。数据在这里,我将使用Movielensml-100k.zip数据集。1000个用户观看1700部电影的100000个评分。在这个压缩文件夹中,我们将专门使用的文件是评估文件。文件名保留为“u.data”如果您想上传此数据或任何数据,可以单击左侧的“数据”选项卡并使用提供的GUI添加数据。然后我们可以加载数据:ratings=spark.read.load("/FileStore/tables/u.data",format="csv",sep="\t",inferSchema="true",header="false")ratings=ratings.toDF(*['user_id','movie_id','rating','unix_timestamp'])看起来像这样:ratings.show()好的,现在我们准备开始我们感兴趣的部分.如何在PySparkDataframe中创建新列?使用Spark本机函数在PySparkDataFrame中创建新列的最pysparkish方法是使用内置函数。这是创建新列的最有效的编程方式,因此这是我想要进行某些列操作时首先去的地方。我们可以将.withcolumn与PySparkSQL函数一起使用来创建新列。本质上,您可以找到已经使用Spark函数实现的字符串函数、日期函数和数学函数。我们可以将spark函数导入为:importpyspark.sql.functionsasF我们的第一个函数F.col函数使我们能够访问列。所以如果我们想将一列乘以2,我们可以使用F.col作为:ratings_with_scale10=ratings.withColumn("ScaledRating",2*F.col("rating"))ratings_with_scale10.show()我们也可以使用Mathematical函数,例如F.exp函数:ratings_with_exp=ratings.withColumn("expRating",2*F.exp("rating"))ratings_with_exp.show()此模块中提供了许多其他函数,足以满足最简单的示例.您可以在此处查看功能列表。SparkUDF有时我们想用一列或多列做复杂的事情。将其视为对PySpark数据帧到单个或多个列的映射操作。虽然SparkSQL函数确实解决了许多关于创建列的用例,但每当我想使用更成熟的Python功能时,我都会使用SparkUDF。要使用SparkUDF,我们需要使用F.udf函数将常规python函数转换为SparkUDF。我们还需要指定函数的返回类型。在此示例中,返回类型为StringType()importpyspark.sql.functionsasFfrompyspark.sql.typesimport*defsomefunc(value):ifvalue<3:return'low'else:return'high'#converttoaUDF函数通过传入函数和函数的返回类型udfsomefunc=F.udf(somefunc,StringType())ratings_with_high_low=ratings.withColumn("high_low",udfsomefunc("rating"))ratings_with_high_low.show()有时使用RDD,SparkUDF和SQL函数不足以满足特定用例。您可能想利用SparkRDD获得的更好的分区。或者,您可能希望在SparkRDD中使用组函数。您可以使用此方法,主要是当您需要访问python函数内的spark数据帧中的所有列时。不管怎样,我发现这种使用RDD创建新列的方式对于有RDD经验(这是Spark生态系统的基本组成部分)的人非常有用。下面的过程利用此功能在Row和pythondict对象之间进行转换。我们将行对象转换为字典。像我们习惯的那样使用字典,然后再次将该字典转换回行。importmathfrompyspark.sqlimportRowdefrowwise_function(row):#convertrowtodict:row_dict=row.asDict()#使用新的列名和值在字典中添加一个新键。row_dict['Newcol']=数学。exp(row_dict['rating'])#将字典转换为行:newrow=Row(**row_dict)#返回新行returnnewrow#将评级数据框转换为RDDratings_rdd=ratings.rdd#将我们的函数应用于RDDratings_rdd_new=ratings_rdd.map(lambdarow:rowwise_function(row))#ConvertRDDBacktoDataFrameratings_new_df=sqlContext.createDataFrame(ratings_rdd_new)ratings_new_df.show()此功能在PandasUDFSpark2.3.1版本中引入。这允许您在Spark中使用Pands功能。当我需要在Spark数据帧上运行groupby操作或者我需要创建滚动函数并想使用Pandas滚动函数/窗口函数时,我通常会使用它。我们使用它的方式是使用F.pandas_udf装饰器。我们在这里假设函数的输入将是一个pandas数据框。我们需要从这个函数中依次返回一个Pandasdataframe。这里唯一的麻烦是我们必须为输出数据框提供一个模式。我们可以使用以下格式来做到这一点。#声明函数输出的模式True),StructField('unix_timestamp',IntegerType(),True),StructField('normalized_rating',DoubleType(),True)])#用pandas_udfdecorator@F.pandas_udf(outSchema,F.PandasUDFType.GROUPED_MAP)装饰我们的函数defsubtract_mean(pdf):#pdf是一个pandas.DataFramev=pdf.ratingv=v-v.mean()pdf['normalized_rating']=vreturnpdfrating_groupwise_normalization=ratings.groupby("movie_id").apply(subtract_mean)rating_groupwise_normalization.show()我们还可以利用它在每个spark节点上训练多个单独的模型。为此,我们复制数据并为每个复制提供一个键和一些训练参数,如max_depth等。然后我们的函数将采用pandasDataframe,运行所需的模型,并返回结果。结构如下图所示。#0.声明函数输出的模式outSchema=StructType([StructField('replication_id',IntegerType(),True),StructField('RMSE',DoubleType(),True)])#用pandas_udf装饰我们的函数decorator@F.pandas_udf(outSchema,F.PandasUDFType.GROUPED_MAP)defrun_model(pdf):#1.获取超参数值num_trees=pdf.num_trees.values[0]depth=pdf.depth.values[0]replication_id=pdf.replication_id.values[0]#2.训练测试拆分Xtrain,Xcv,ytrain,ycv=train_test_split.....#3.使用pandas数据帧创建模型clf=RandomForestRegressor(max_depth=depth,num_trees=num_trees,....)clf.fit(Xtrain,ytrain)#4.评估模型rmse=RMSE(clf.predict(Xcv,ycv)#5.返回结果作为pandasDFres=pd.DataFrame({'replication_id':replication_id,'RMSE':rmse})returnresresults=replicated_data.groupby("replication_id").apply(run_model)上面只是一个想法,不是代码,应该稍微修改一下。使用SQL对于那些喜欢SQL的人,您甚至可以使用SQL创建列。为此,我们需要注册一个临时SQL表,然后使用一个带有附加列的简单选择查询。也可以将它用于连接。ratings.registerTempTable('ratings_table')newDF=sqlContext.sql('select*,2*ratingasnewColfromratings_table')newDF.show()希望我已经很好地介绍了列创建过程以帮助您解决Spark问题。文渊网,仅供学习,侵删。学习Python的路上肯定会遇到困难,不要慌张,我这里有一套学习资料,包括40+电子书,800+教学视频,涉及Python基础、爬虫、框架、数据分析、机学习等等,别怕你学不会!https://shimo.im/docs/JWCghr8...《Python学习资料》关注公众号【蟒圈】,每日优质文章推送。