大家好,我是Tom。互联网时代,随着商业的数字化,越来越多的数据可供使用。如何用好数据,将数据商业化,需要一把利器。很多人都用过Hadoop,它由两部分组成,HDFS和MapReduce。MapReduce是Hadoop的分布式计算引擎。在计算过程中,需要频繁放置磁盘,性能会较弱。今天就带大家快速熟悉一个大数据框架Spark。Spark是一种内存计算引擎,具有更好的性能。从2014年开始流行,支持流计算Streaming、数据分析SQL、机器学习MLlib、图计算GraphFrames等多种场景。支持的语言很多,例如Python、Java、Scala、R和SQL。提供丰富的开发算子,如RDD、DataFrame、Dataset。有了这些基础工具,开发者就可以像搭乐高一样快速完成各种业务场景的系统开发。1.来点个性感首先我们看一个简单的代码例子,给大家来点个性感。importorg.apache.spark.rdd.RDDvalfile:String="/Users/onlyone/spark/demo.txt"//加载文件vallineRDD:RDD[String]=spark.sparkContext.textFile(file)valwordRDD:RDD[字符串]=lineRDD。flatMap(line=>line.split(""))valkvRDD:RDD[(String,Int)]=wordRDD.map(word=>(word,1))valwordCounts:RDD[(String,Int)]=kvRDD.reduceByKey((x,y)=>x+y)wordCounts.foreach(wordCount=>println(wordCount._1+"appeared"+wordCount._2+"times"))我们看到,入口代码从第四行的spark变量开始。在spark-shell中,由系统自动创建,是SparkSession的一个实例化对象,可以直接使用,无需每次都创建一个新的对象。SparkSession是Spark程序的统一开发入口。要开发Spark应用程序,必须先创建一个SparkSession。2.RDDResilientDistributedDatasets,全称ResilientDistributedDatasets,是一种包含内存和磁盘中所有分布式数据实体的抽象,是Spark的核心模块和类。RDD承载数据的基本单位是数据分片。在分布式计算环境中,一个完整的数据集会按照一定的规则划分成多个数据分片。这些数据分片均匀分布到集群中的不同计算节点和执行进程,从而实现分布式并行计算。RDD包含4大属性:数据分片、分区。碎片规则,分区器。RDD依赖,依赖关系。变换函数,计算。RDD代表分布式数据的形式。RDD与RDD之间的转换本质上是数据形式的转换。其中一个重要的角色是运营商。3.OperatorsOperators分为两类,Transformations和Actions。转换运算符:通过函数方法将数据从一种形式转换为另一种形式。Actions算子:收集计算结果,或将数据物化到磁盘。重点:mapPartitions与map有类似的功能,但是mapPartitions算子是在数据分区的粒度上初始化共享对象,比如数据库连接对象,S3文件句柄等。结合以上两类算子,Spark的操作分为两种链接:不同数据形式之间的转换,计算流图(DAG)的构建。通过Actions类算子,以回溯的方式触发这个计算流图的执行。顺便说一句,Java中也引入了回溯,比如Stream流就是类似的机制。一个流程可能会引入很多算子,但不会立即执行。只有当开发者调用Actions算子时,才会执行之前调用的转换算子。这也称为延迟计算。延迟计算是Spark分布式运行机制的一大亮点。执行引擎可用于从全局角度优化执行过程。4、分布式计算在Spark应用中,程序入口是SparkSession的主要功能。SparkSession提供Spark运行时的上下文,如调度系统、存储系统、内存管理、RPC通信等),为开发者提供创建、转换、计算分布式数据集的开发API。运行这个SparkSession的main函数的JVM进程称为Driver。Driver职责:解析用户代码,构建DAG图,然后将计算流图转化为分布式任务,分发给集群的Executor执行。定期与各个Executor沟通,及时获取任务进度,从而协调整体执行进度。Executors职责:调用内部线程池,结合预先分配的数据片段,并发执行任务代码。每个Executor负责处理RDD的数据分片的一个子集。分布式计算的核心是任务调度,主要是Driver和Executors之间的交互。Driver的任务调度依赖于DAGScheduler、TaskScheduler和SchedulerBackend。计算流程:Driver通过Action操作符foreach触发计算流程图的执行。上图是从左到右执行的,以shuffle为界创建和分发分布式任务。textFile、flatMap和mapoperators合并为一个任务并分发给每个Executor。Executor收到任务后,解析任务,将任务拆解为textFile、flatMap、map三个步骤,然后分别处理自己负责的数据分片。每个Executor在执行后得到一个中间结果,然后向Driver报告任务的进展情况。然后Driver进行后续的聚合计算,由于数据分散在多个shard上,会触发shuffle操作。shuffle机制是将多个Executor中的计算结果重新路由分发到同一个Executor中,然后重新处理汇总后的数据。集群内跨进程和节点的数据交换。可能存在需要特别注意的网络性能瓶颈。不同的Executor完成数据交换后,Driver分发下一阶段的任务,统计字数。同一个key的数据已经分发给同一个Executor,每个Executor独立完成计数统计。最后Executors将最终的计算结果统一返回给Driver。重点:DAG到Stages的拆分过程,以Actions算子为触发起点,从后往前回溯DAG,以Shuffle为界划分Stages。收集结果:收集的结果根据收集路径的不同主要分为两类:将各个Executor的计算结果收集到Driver端。计算结果通过Executors直接持久化到文件系统。如:HDFS或S3分布式文件系统。五、调度系统1、DAGScheduler根据用户代码构造DAG,以Shuffle为界切割Stages。每个Stage根据RDD中Partition的个数确定Task个数,然后构建TaskSets,再将TaskSets提交给TaskScheduler进行调度。2、TaskScheduler根据任务的局部倾向,在TaskSet中选择适合调度的Task,然后将Task分配给Executor执行。3.SchedulerBackend使用ExecutorDataMap数据结构记录各个计算节点中Executors的资源状态,如RPC地址、主机地址、可用CPU核数、满配CPU核数等。4、Task是运行在Executor上的工作单元。5、JobSparkContext提交的具体Action操作往往对应于Action。6.Stage的每一个Job都会被分成很多组tasks(任务),每组tasks称为Stage,也称为TaskSet。调度系统的核心思想:数据不动,代码动。6.内存管理Spark的内存分为4个区域,ReservedMemory、UserMemory、ExecutionMemory和StorageMemory。ReservedMemory:固定为300MB,Spark预留的内存区域,用于存放各种Spark内部对象。UserMemory:存放开发者定义的数据结构,例如RDD算子中引用的数组、列表、映射等。执行内存:执行分布式任务。分布式任务的计算主要包括数据转换、过滤、映射、排序、聚合、合并等。StorageMemory:缓存分布式数据集,如RDDCache、广播变量等。整个内存区,ExecutionMemory和StorageMemory是最重要的。1.6版本之后,Spark引入了统一的内存管理模式,两者可以相互转化。7.共享变量Spark提供了两种共享变量,广播变量和累加器。1.广播变量vallist:List[String]=List("Tom","Spark")//sc是SparkContext实例valbc=sc.broadcast(list)广播变量的用法很简单,通过调用SparkContextbroadcast完成广播变量的创建。如果要读取封装后的共享数据内容,调用其bc.value函数即可。好奇的宝宝会问,既然list可以获取字符串列表,为什么还要封装广播变量呢?Answer:Driver端公共共享变量的分配是基于Task粒度的。系统中有多少任务?在网络中分布了多少次,内存资源就存在着巨大的浪费。使用广播变量后,共享变量分布的粒度以Executor为单位,同一个Executor中的多个不同Task只需要访问同一份数据即可。也就是说,变量在网络中分布和存储的数量从RDD的分区数量减少到集群中的Executor数量。2.累加器累加器也是在Driver端定义的。累加过程就是调用RDD算子中的add函数对累加器进行计数,从而更新累加器的状态。应用程序执行后,开发者在Driver端调用累加器的值函数获取全局计数结果。Spark提供了三种累加器longAccumulator、doubleAccumulator和collectionAccumulator,以满足不同的业务场景。
