python-PySparkTips:本文根据b站黑马python类整理链接指引=>2022新版黑马程序员python教程1.PySpark库安装在“CMD”命令提示符程序中,输入:pipinstallpyspark或者使用国内代理镜像网站(清华大学来源)pipinstall-ihttps://pypi.tuna.tsinghua.edu.cn/simplepyspark2.PySpark的编程模型是什么?数据输入:通过SparkContext完成数据读取数据计算:将读取到的数据转换为RDD对象,调用RDD成员方法完成计算数据输出:调用RDD数据输出相关成员方法,将结果输出到list、tuple、Dictionaries、文本文件,databases等#LeadpackagefrompysparkimportSparkConf,SparkContext#创建sparkconf类对象conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')#根据SparkConf类对象Object创建SparkContextsc=SparkContext(conf=conf)#打印print(sc.version)#停止SparkContext对象的权限sc.stop()三、数据输入3.1RDD对象RDD的全称是:弹性分布式数据集(ResilientDistributedDatasets)PySpark对数据的处理,以RDD对象为载体,即RDD中存储的各种数据的计算方式也是RDD的RDD成员方法的数据计算方式,返回值仍然是RDD对象。frompysparkimportSparkConf,SparkContextconf=SparkConf().setMaster("local[*]").setAppName("test_spark")sc=SparkContext(conf=conf)#准备一个rddrdd=sc.parallelize([1,2,3,4,5])print(rdd.collect())sc.stop()4.数据计算4.1mapoperatormapoperator(成员方法):接受一个处理函数,可以快速写成lambda表达式,对RDD中的元素进行一个一个的处理,返回一个新的RDDfrompysparkimportSparkConf,SparkContextconf=SparkConf().setMaster("local[*]").setAppName("test_spark")sc=SparkContext(conf=conf)#准备一个rddrdd=sc.parallelize([1,2,3,4,5])#print(rdd.collect())#使用map方法收集所有数据*10#deffunc(data):#returndata*10#rdd2=rdd.map(lambdax:x*10)#rdd2=rdd.map(func)print(rdd2.collect())#sc.stop()4.2flatMap算子比map多了计算逻辑,可以去掉一层嵌套。4.3reduceByKey算子接受一个处理函数,对数据进行两两计算。可以使用lambda快速编写函数对RDD数据进行逐一处理,并得到True值作为返回值保存在RDD中4.5distinctoperatordistinctoperator:完成对RDD中数据的去重操作4.6sortByoperatorsortByoperator:接收一个处理函数,可以使用lambda快速写一个函数来表示判断排序的依据。您可以控制升序或降序全局排序。需要设置分区数为1。5.数据输出数据输出方式:collect:将RDD内容转换为listreduce:自定义RDD内容Aggregationtake:取出RDD的前N个元素组成listcount:统计数量RDD元素的集合5.1输出为Python对象5.1.1collect算子将RDD的每个partition中的数据收集到Driver中形成一个list对象#returnvalue是一个listrdd.collect()5.1.2reduce算子聚合RDD数据集按照你传入的逻辑5.1.3take算子函数:取出RDD的前N个元素,组合成一个list返回给你5.1.4count算子函数:计算有多少个RDD数据较少,返回值为一个数字5.2输出到文件5.2.1rdd.saveAsTextFile算子函数:将RDD数据写入文本文件,支持本地写入,hdfs等文件系统注意事项:调用算子保存文件子,需要配置Hadoop依赖下载Hadoop安装包http://archive.apache.org/dis...解压到电脑任意位置使用Python代码中的os模块配置:os.environ['HADOOP_HOME']='HADOOP解压文件夹路径'下载winutils.exe,放到Hadoop解压文件夹的bin目录下https://raw.githubusercontent...下载hadoop.dll,放到文件夹:C:/Windows/System32https://raw.githubusercontent...5.2.2修改rdd分区为1方法一,SparkConf对象设置属性globalparallelism为1:方法二,创建RDD时设置(parallelize方法为传递给numSlic并在此处插入代码切片es参数为1)总结一下,上面的ispython-PySpark,后续会持续更新。欢迎大家点赞关注~~
