当前位置: 首页 > 科技观察

Spark中的核心概念RDD我们了解多少?

时间:2023-03-12 13:19:17 科技观察

RDD的全称叫做弹性分布式数据集(ResilientDistributedDatasets),是一种分布式内存抽象,代表只读记录分区的集合,只能通过其他RDD转换创建。为此,RDD支持丰富的转换操作(如:map、join、filter、groupBy等),通过这种转换操作,新的RDD包含了从其他RDD派生所必需的信息,因此RDD之间存在依赖关系的。基于RDD之间的依赖关系,RDD会形成一个有向无环图DAG,它描述了整个流式计算过程。在实际执行中,即使数据分区丢失,RDD也是通过lineage一次完成的,分区也可以通过血缘关系重建。综上所述,基于RDD的流式计算任务可以描述为:从稳定的物理存储(如分布式文件系统)中加载记录,将记录传入由一组确定性操作组成的DAG,然后写回稳定的贮存。另外,RDD还可以在内存中缓存数据集,使得数据集可以在多个操作之间复用。基于此特性,可以轻松构建迭代应用(图计算、机器学习等)或交互式数据分析应用。可以说,Spark最初是一个实现了RDD的分布式系统,后来通过不断的发展,发展成为一个比较完善的大数据生态系统。简单的说,Spark-RDD之间的关系类似于Hadoop-MapReduce之间的关系。RDD特性RDD表示一个只读的分区数据集。修改RDD,只需要通过RDD转换操作,就可以从一个RDD得到一个新的RDD。新的RDD包含从其他RDD派生的必要信息。RDD之间存在依赖关系,RDD的执行是根据血缘关系延迟计算的。如果血缘关系长,可以通过坚持RDD来切断血缘关系。分区如下图所示。RDDs是逻辑分区的,每个分区的数据抽象存在。在计算时,每个分区的数据是通过计算函数得到的。如果RDD是通过已有的文件系统构建的,compute函数会读取指定文件系统中的数据。如果RDD是从其他RDD转换过来的,compute函数会执行转换逻辑来转换其他RDD的数据。只读如下图所示,RDD是只读的。如果要更改RDD中的数据,只能在已有RDD的基础上创建新的RDD。一个RDD到另一个RDD的转换可以通过丰富的操作算子来实现,而不是像MapReduce那样只写map和reduce,如下图所示。RDD的操作算子包括两种,一种称为transformations,用于对RDD进行改造,建立RDD的血缘关系;另一个叫做actions,用于触发RDD的计算,获取RDD的相关计算结果或者将RDD保存到文件系统中。下图是RDD支持的算子列表。依赖的RDD由操作符转换,转换后的新RDD包含从其他RDD派生的必要信息。RDD之间的血缘关系也称为依赖关系。如下图,有两种依赖,一种是窄依赖,RDD之间的分区是一一对应的,另一种是宽依赖,下游RDD的每个分区都与上游相连RDD(也叫parentRDD)的每个partition都是相关的,多对多的关系。通过RDD之间的这种依赖关系,一个任务流可以描述为一个DAG(有向无环图),如下图所示,在实际执行过程中,宽依赖对应于Shuffle(图中reduceByKey和join),窄依赖dependencies中的所有转换操作都可以像pipeline一样一次性执行(图中map和union可以一起执行)。缓存如果同一个RDD在应用中被多次使用,可以缓存该RDD。只有在第一次计算RDD时,才会根据血缘关系获取分区数据,以后在其他地方使用RDD时,会直接从缓存中取数据,而不是根据血缘关系计算,这将加快以后的重用。如下图所示,RDD-1经过一系列转换得到RDD-n,保存到hdfs中。RDD-1在这个过程中会产生一个中间结果。如果缓存在内存中,那么在后面的RDD-1到RDD-m的转换过程中,不会计算前面的RDD-0。Checkpoint虽然RDD的血缘关系天然可以实现容错,但是当RDD的某个partition的数据发生故障或者丢失时,可以通过血缘关系进行重建。但是对于长期迭代的应用,随着迭代的进行,RDD之间的血缘关系会越来越久。一旦在后续的迭代过程中出现错误,需要通过很长的血缘关系重新构建,势必会影响性能。.为此,RDD支持checkpoint将数据保存在持久化存储中,这样就可以切断之前的血缘关系,因为checkpoint之后的RDD不需要知道它的父RDD,它可以从checkpoint获取数据。小结综上所述,给定一个RDD,我们至少可以知道以下信息:1.分区数和分区方式;2.父RDD派生的相关依赖信息;3.计算每个分区的数据。计算步骤为:1)如果有缓存,则从缓存中取出分区的数据;2)如果是Checkpointed,则从Checkpoint恢复数据;3)分区的数据是按照血缘关系计算的。编程模型在Spark中,RDD被表示为对象,RDD通过对对象的方法调用进行转换。经过一系列的Transformation之后,就可以调用Actions来触发RDD的计算了。操作可以将结果返回给应用程序(计数、收集等),或将数据保存到存储系统(saveAsTextFile等)。在Spark中,RDD计算(即惰性执行)只在遇到Action时才执行,这样就可以在运行时通过管道传递多个转换。使用Spark,开发者需要编写Driver程序,提交给集群调度运行Workers,如下图所示。Driver中定义一个或多个RDD,调用RDD上的action,Worker执行RDD分区计算任务。应用实例下面是一个简单的Spark应用实例WordCount,统计数据集中每个单词出现的次数。首先从HDFS加载数据,得到原始的RDD-0,其中每条记录是数据中的一行句子。经过一个flatMap操作将一行句子分成多个独立的词得到RDD-1,然后通过map操作将每个词映射成key-value的形式,其中key是词本身,value是初始的计数值1得到RDD-2,合并RDD-2中的所有记录,统计每个单词的计数,得到RDD-3,最后保存到HDFS。objectWordCount{defmain(args:Array[String]){if(args.length<2){System.err.println("Usage:WordCount");System.exit(1);}valconf=newSparkConf().setAppName("WordCount")valsc=newSparkContext(conf)valresult=sc.textFile(args(0)).flatMap(line=>line.split("")).map(word=>(word,1).reduceByKey(_+_)result.saveAsTextFile(args(1))}}结语基于RDD的Spark相对于传统的HadoopMapReduce有哪些优势呢?总结起来至少应该有三点:1.RDD提供了丰富的操作算子,不再只有map和reduce两种操作,更方便描述应用;2.通过RDD之间的转换构造DAG,中间结果无需落地;3.RDD支持缓存,计算可以在记忆中快速完成。