RDD简介RDD(ResilientDistributedDataset)全称为分布式数据集,是Spark中最基本的数据抽象。它代表一个不可变的、可分区的、并行的计算元素集合。RDD是类RDD的一个属性1.存储访问每个Partition的首选位置的列表。对于一个HDFS文件,这个列表存储了每个Partition所在的块的位置。根据“移动数据不如移动计算”的理念,Spark在调度任务时,会尽可能将计算任务分配到它需要处理的数据块的存储位置。2、保存了计算每个分区的函数。这种计算方法将应用于每个数据块。Spark中RDD的计算是基于分片的。每个RDD都会实现计算函数来达到这个目的。计算函数会复合迭代器,不需要保存每次计算的结果。3.RDD之间的依赖关系。RDD的每一次转换都会产生一个新的RDD,因此RDD之间会存在类似于管道的前后端依赖关系。当部分分区数据丢失时,Spark可以通过这个依赖重新计算丢失的分区数据,而不用重新计算RDD的所有分区。4、RDD的分区函数(Partitioner),一个是基于hash的HashPartitioner,一个是基于range的RangePartitioner。只有key-valueRDD才会有Partitioner,非key-valueRDD的Partitioner值为None。Partitioner函数不仅决定了RDD本身的分片数,还决定了父RDDShuffle输出时的分片数。5.一组切片(Partition),它是数据集的基本单位。对于RDD,每个分片都会被一个计算任务处理,这就决定了并行计算的粒度。用户可以在创建RDD时指定RDD的分片数。如果未指定,将使用默认值。默认值是分配给程序的CPU核心数。如何创建一个RDD1。通过序列化一个集合来创建一个RDD(parallelize,makeRDD)2。通过读取外部数据源(testFile)3。对其他rdds进行转换操作,将其转换为行RDDRDD的两类操作符:1.转换map(func):返回一个新的分布式数据集,由func函数转换后的每一个原始元素组成filter(func):返回一个新的数据集,由func函数后返回true的原始元素组成flatMap(func):类似于map,但是每个输入元素会被0映射到多个输出元素(所以,func函数的返回值为aSeq,notasingleelement)flatMap(func):类似于map,但是每个输入元素会被映射为0到多个输出元素(所以,func函数的返回值是一个Seq,而不是单个元素)sample(withReplacement,frac,seed):按照fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器的种子union(otherDataset):返回一个新的数据集,由合并原始数据集和参数):用在(K,V)对的数据集上,返回(K,V)对的数据集,使用指定的reduce函数将具有相同key的值聚合在一起。与groupbykey类似,可以通过第二个可选参数配置任务个数。join(otherDataset,[numTasks]):在(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个keyDatasets中的所有元素都在一起groupWith(otherDataset,[numTasks]):在类型为(K,V)和(K,W)的数据集上调用,返回一个数据集,其元素为(K,Seq[V],Seq[W])元组。该操作在其他框架中称为CoGroupcartesian(otherDataset):Cartesianproduct。但是当在数据集T和U上调用时,将返回(T,U)对的数据集,并且所有元素都是交互式笛卡尔。intersection(otherDataset):将源RDD和参数RDD相交后返回一个新的RDDdistinct([numTasks]));在对RDD上的(K,V)中的源RDD进行重复数据删除后返回一个新的RDDgroupByKey([numTasks]),返回一个(K,Iterator[V])RDDreduceByKey(func,[numTasks])调用一个(K,v)RDD,返回一个(K,V)RDD,使用指定的reduce函数将相同key的值聚合在一起,类似于groupByKey,reduce任务的个数可以通过第二个可选参数aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])sortByKey([ascending],[numTasks])在(K,V)RDD上调用,K必须实现Ordered接口,返回一个key排序的(K,V)RDDsortBy(func,[ascending],[numTasks])类似于sortByKey,但更灵活join(otherDataset,[numTasks])在(K,V)和(K,W)类型的RDD上调用,返回一对所有元素对应于相同的键(K,(V,W))RDDcogroup(otherDataset,[numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable2.Actionreduce(func)通过func函数收集RDD中的所有元素,这个funccollect()驱动程序中,以数组的形式返回数据集的所有元素count()返回RDD中的元素个数first()返回RDD中第一个元素的个数(类似于take(1))take(n)返回一个由数据集的前n个元素组成的数组takeSample(withReplacement,num,[seed])返回一个由数据集的前n个元素组成的数组由num个元素随机组成sampled,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子takeOrdered(n,[ordering])saveAsTextFile(path)将数据集的元素以文本文件的形式保存到HDFS文件系统或其他支持的文件系统中。对于每一个元素,Spark都会调用toString方法将其替换为文件中的文本saveAsSequenceFile(path)将数据集中的元素以Hadoopsequencefile的格式保存到指定目录,可以让HDFS或者其他文件系统支持通过HadoopsaveAsObjectFile(path)countByKey()为类型为(K,V)的RDD返回一个(K,Int)映射,指示每个键对应的元素数量。foreach(func)在要更新的数据集的每个元素上运行函数func。RDD的依赖关系1.窄依赖窄依赖是指每个父RDD的Partition至多被子RDD的一个Partition使用。Partition会依赖同父RDD的Partition总结:狭义依赖我们形象比喻Superborn3.Lineage(血统)RDD只支持粗粒度转换,即对大量记录进行的单一操作。记录一系列为了恢复丢失的分区而创建RDD的Lineages(也就是沿袭)。RDD的Lineage会记录RDD的元数据信息和转换行为。当RDD的部分分区数据丢失时,可以根据这些信息重新计算恢复丢失的数据分区。DAG生成DAG(DirectedAcyclicGraph)称为有向无环图。原始的RDD经过一系列的变换形成了一个DAG。根据RDD之间依赖关系的不同,将DAG划分为不同的Stage。对于narrowdependencies,分区的转换处理是在Stage中计算的。对于宽依赖,由于shuffle的存在,只有父RDD处理完才能开始下一次计算,所以宽依赖是划分stage的基础。RDD缓存Spark非常快的原因之一是它可以在不同的操作期间将数据集持久化或缓存在内存中。持久化某个RDD后,每个节点会将计算出的分片结果保存在内存中,并在对该RDD或派生RDD执行的其他动作中复用。这使得后续操作更快。RDD相关的持久化和缓存是Spark最重要的特性之一。可以说缓存是Spark构建迭代算法和快速交互查询的关键。查找依赖关系来划分stage的目的之一就是划分cache。如何通过stage的划分来设置缓存?(1)窄依赖要设置缓存时使用缓存(2)宽依赖要设置缓存时使用检查点如何设置缓存和检查点?cache:someRDD.cache()会添加成功的缓存并放入内存中。someRDD.persist(StorageLevel.MEMORY_AND_DISK):根据需要设置缓存位置(内存和硬盘)。在本地磁盘上,也可以先hdfssc.setCheckpointDIr("hdfs://hadoop1:9000/checkpoint")设置checkpoint的路径再设置someRDD.checkpoint()设置checkpointcache和checkpoint的区别。改变RDD依赖,checkpoint生成新的RDD,后续的RDD会依赖改变了的新RDD依赖。数据恢复的顺序:检查点---“缓存--”重新计算
