Spark在MapReduce的基础上进行了改进。它主要使用内存作为中间计算数据的存储,加快了计算的执行时间,在某些情况下,性能可以提高一百倍。除了速度更快之外,Spark还具有比MapReduce更简单易用的编程模型。Spark的主要编程模型是RDD,即ElasticDataset。RDD上定义了很多常用的大数据计算函数,利用这些函数可以用很少的代码完成更复杂的大数据计算。比如我们在介绍Hiveschema设计的时候,谈到了WordCount的例子。使用Scala语言写在Spark上,代码只需要三行。valtextFile=sc.textFile("hdfs://...")valcounts=textFile.flatMap(line=>line.split("")).map(word=>(word,1)).reduceByKey(_+_)counts.saveAsTextFile("hdfs://...")这个demo的代码含义就不详细介绍了。首先从HDFS中读取数据,构建一个RDDtextFile,然后对这个RDD进行三个操作:一是将输入数据的每一行文本拆分成带空格的词;另一种是对单词进行转换,如:word——>(word,1),生成的结构;三是对同一个Key进行统计,统计方式是对Value求和。最后将RDD计数写入HDFS完成结果输出。Spark编程模型RDD是Spark的核心概念,是ResilientDistributedDatasets的缩写。RDD不仅是Spark面向开发者的编程模型,也是Spark自身架构的核心元素。我们先来认识一下RDD作为Spark编程模型。我们知道,大数据计算是对大规模数据集的一系列数据计算处理。MapReduce将输入数据的计算过程分为两个阶段,Map阶段和Reduce阶段,可以理解为面向过程的大数据计算。我们在用MapReduce编程的时候,想的是如何实现Map和Reduce两个阶段的计算逻辑,map和reduce函数的输入输出是什么。这是我们在学习MapReduce编程时反复强调的。大数据训练,而Spark直接对数据进行编程,将大规模的数据集合抽象成一个RDD对象,然后对这个RDD进行各种计算得到一个新的RDD,不断计算处理,直到得到最终的结果数据。所以Spark可以理解为面向对象的大数据计算。我们在对Spark进行编程时,会思考一个RDD对象需要经过什么样的操作,才能将其转化为另一个RDD对象。我们思考的重点和落脚点都在RDD上。所以在上面WordCount的代码示例中,第二行代码实际上进行了3次RDD转换,每次转换得到一个新的RDD,因为新的RDD可以继续调用RDD转换函数,所以连续写成一行代码。其实可以分成3行valrdd1=textFile.flatMap(line=>line.split(""))valrdd2=rdd1.map(word=>(word,1))valrdd3=rdd2.reduceByKey(_+_)Spark架构的核心RDD上定义了两类函数。一个是转换函数,它返回一个RDD;另一个是动作函数,不再返回RDD。RDD定义了很多转换操作函数,比如计算map(func)、过滤filter(func)、合并数据集union(otherDataset)、根据Key聚合reduceByKey(func,[numPartitions])、连接数据集join(otherDataset,[numPartitions]),分组groupByKey([numPartitions])和十几个函数。RDD是Spark架构的核心元素。与MapReduce一样,Spark也对大数据进行分片计算。Spark分布式计算的数据分片和任务调度都是以RDD为单位进行的,每个RDD分片都分配给一个执行进程进行处理。对RDD的转换操作分为两种。一个转换操作生成的RDD不会有新的分片,比如map,filter。在当前分片中。就像你用map函数对每条数据加1一样,还是得到这样一组数据,只是数值不同而已。事实上,Spark并不会按照代码编写的顺序生成RDD。例如,rdd2=rdd1.map(func)这样的代码不会在物理上生成一个新的RDD。物理上,Spark只会在产生新的RDD分片时产生一个RDD。Spark的这个特性也叫惰性计算。另一个转换操作产生的RDD会产生新的分片,比如reduceByKey,不同分片的同一个Key必须进行聚合操作,从而产生新的RDD分片。因此,你只需要记住Spark应用代码中的RDD和Spark执行过程中产生的物理RDD并不是一一对应的。RDD在Spark中是一个非常灵活的概念,但也很重要,需要仔细理解。Spark的计算阶段与MapReduce相同,Spark也遵循移动计算比移动数据更划算的大数据计算基本原则。但是,相对于MapReduce死板的Map和Reduce分阶段计算,Spark的计算框架更具弹性和灵活性,因而具有更好的运行性能。Spark会根据程序中的转换函数生成一个计算任务执行计划,这个执行计划就是一个DAG。Spark可以在一个作业中完成非常复杂的大数据计算。所谓DAG是有向无环图,意思是不同阶段的依赖关系是有向的,计算过程只能沿着依赖关系的方向执行。在依赖阶段执行完成之前,不能执行依赖阶段。同时,这个依赖不能有循环依赖,否则会变成死循环。下图描述了一个典型的Spark运行DAG的不同阶段。上图中A、C、E是从HDFS加载的RDD,A是通过groupBy组统计转换函数计算得到的RDDB,C是通过map转换函数计算得到RDDD,D和E合并通过unionRDDF经过转换函数计算得到,B和F通过join连接函数计算得到最终合并结果RDDG。因此,可以看出Spark作业调度的核心是DAG。使用DAG,整个应用被划分成阶段,每个阶段的依赖关系也很明确。然后根据每个阶段需要处理的数据量,生成相应的任务集(TaskSet),为每个任务分配一个任务进程进行处理。Spark实现了大数据的分布式计算。具体来说,负责生成和管理Spark应用DAG的组件是DAGScheduler。DAGScheduler根据程序代码生成DAG,然后将程序分发到分布式计算集群,按照计算阶段的先后顺序调度执行。你有没有注意到上面的例子有4个转换函数,但只有3个阶段。那么Spark到底是根据什么来划分计算阶段的呢?显然,并不是RDD上的每个转换函数都会生成一个计算阶段。通过观察上面的DAG图,我们可以从图中看出计算阶段划分的规律。当RDD之间的转换连接线呈现多对多的交叉连接时,就会产生一个新的stage。一个RDD代表一个数据集。图中每个RDD包含多个小块,每个小块代表RDD的一个分片。一个数据集中的多个数据分片需要进行分区,写入到另一个数据集中的不同分片中。我们在MapReduce运行过程中也看到了这种数据分区的交叉传输操作。这就是洗牌过程。Spark还需要通过shuffle对数据进行重新组合,将相同key的数据放在一起,进行聚合、关联等操作。因此,每次shuffle都会产生一个新的计算阶段。这就是计算阶段具有依赖性的原因。它需要的数据来自于之前一个或多个计算阶段产生的数据。它必须等待前面的阶段完成,然后才能洗牌并获取数据。所以你需要记住,划分计算阶段的依据是shuffle,而不是转换函数的类型。想一想,你可能会想,为什么同样的shuffle后Spark效率更高呢?从本质上讲,Spark可以看作是MapReduce计算模型的不同实现。HadoopMapReduce简单粗暴的把大数据的计算按照shuffle的方式分为Map和Reduce两个阶段,然后就搞定了。但是,Spark更加细腻。它把前者Reduce和后者Map连接起来,以一个阶段进行连续计算,形成一个更加优雅高效的计算模型,虽然它的本质还是Map和Reduce。但这种依赖多个计算阶段执行的方案,可以有效减少对HDFS的访问,减少作业的调度执行次数,从而执行得更快。并且不同于HadoopMapReduce在shuffle过程中主要使用磁盘存储数据,Spark首先使用内存进行数据存储,包括RDD数据。除非内存不够用,否则尽量使用内存,这也是Spark比Hadoop性能更高的另一个原因。Spark支持Standalone、Yarn、Mesos、Kubernetes等多种部署方案。几种部署方案的原理是一样的,只是不同组件的角色命名不同,但核心功能和运行流程是相似的。首先,Spark应用在自己的JVM进程中启动,即Driver进程。启动后调用SparkContext初始化执行配置和输入数据。SparkContext启动DAGScheduler构造并执行DAG图,划分为最小的执行单元,即计算任务。然后Driver向ClusterManager申请计算资源,进行DAG的分布式计算。ClusterManager收到请求后,将Driver的主机地址等信息通知给集群中的所有计算节点Worker。Worker收到信息后根据Driver的主机地址与Driver通信并注册,然后根据自身空闲资源通知Driver自己可以承担的任务数量。Driver开始根据DAG图给注册的Worker分配任务。Worker收到任务后,启动Executor进程执行任务。Executor首先检查自己是否有Driver的执行代码,如果没有,则从Driver中下载执行代码,通过Java反射加载,开始执行。Spark性能调优和故障处理关于Spark性能调优,有很多值得讨论的地方。我们很快能想到的就是常规的性能调优,包括优化资源分配、RDD优化、并行度调整等,此外还有算子调优、Shuffle调优、JVM调优。至于排错,我们一般讨论解决Spark数据倾斜的问题。我们一般会聚合原始数据,过滤导致偏斜的key,提高shuffle操作时的reduce并行度。由于本文主要介绍架构设计和原理思路,限于篇幅,详细步骤不再赘述。最近刚刚收集了一份Spark性能调优与故障排除的pdf,里面详细的解释了详细的步骤。Spark生态最后,我们来看看Spark的生态吧!和我们之前介绍过的Hadoop一样,Spark也有自己的生态系统。基于Spark,有支持SQL语句的SparkSQL、支持流计算的SparkStreaming、支持机器学习的MLlib、支持图计算的GraphX。通过这些产品,Spark技术栈支持大数据分析、大数据机器学习等各种大数据应用场景。为了方便大家,下面一一介绍这些组件:SparkSQL:用于操作结构化数据的核心组件。通过SparkSQL可以直接查询Hive、HBase等各种外部数据源中的数据。SparkSQL的一个重要特点是可以统一处理关系表和RDD。在处理结构化数据时,开发者无需编写MapReduce程序,直接使用SQL命令即可完成更复杂的数据查询操作。SparkStreaming:Spark提供的流计算框架,支持高吞吐、容错的实时流数据处理。它的核心原理是将流式数据分解成一系列的短批作业,每个短批作业都可以使用SparkCore进行快速处理。SparkStreaming支持多种数据源,例如Kafka和TCP套接字。MLlib:Spark提供了机器学习功能的算法库,包括分类、回归、聚类、协同过滤等算法,还提供了模型评估、数据导入等附加功能。开发者只需要了解一定的机器学习算法知识就可以实现机器学习的开发,降低学习成本。GraphX:Spark提供的分布式图处理框架,具有图计算和图挖掘算法的API接口以及丰富的函数和算子,极大地方便了分布式图的处理需求,可以在海量数据算法上运行复杂的图。Spark生态系统的各个组成部分紧密相关,可以相互调用。这种设计具有以下显着优点。(1)Spark生态系统中包含的所有库和高级组件都可以受益于Spark核心引擎的改进。(2)无需运行多套独立的软件系统,可以大大降低运行整个系统的资源成本。(3)可以无缝集成各种系统,构建不同处理模型的应用。总结起来,Spark主要有三个特点:RDD的编程模型更简单,DAG切分的多阶段计算过程更快,使用内存存储中间计算结果更高效。这三个特点使得Spark比HadoopMapReduce具有更快的执行速度和更简单的编程实现。此外,从Spark生态系统可以看出,Spark框架支持大数据从内存计算、实时处理到交互式查询,进而发展到图计算和机器学习模块。Spark生态系统广泛的技术层面,一方面向在大数据领域占据最大市场份额的Hadoop发起挑战,另一方面也准备迎接Flink、Kafka等计算框架的挑战,后起之秀,让Spark在大数据领域更好的发展!