Spark是加州大学伯克利分校AMP实验室开源的一个类似于MapReduce的通用并行计算框架,具有MapReduce所具有的分布式计算的优点。但与MapReduce不同的是,Spark使用了更多的内存计算,减少了磁盘读写,性能比MapReduce更高。同时提供更丰富的函数库,更适用于数据挖掘、机器学习等分析算法。在Hadoop生态系统中,Spark主要替代MapReduce进行分布式计算,如下图所示。同时组件SparkSQL可以替代Hive对数据仓库的处理,组件SparkStreaming可以替代Storm对流计算的处理,组件SparkML可以替代Mahout数据挖掘算法库。Spark在Hadoop生态系统中的地位01Spark的运行原理现在,我们不再需要学习MapReduce繁琐的设计和开发,而是直接学习Spark的开发。一方面是因为Spark运行效率比MapReduce高,另一方面是因为Spark函数库丰富,开发效率也比MapReduce高。首先,从运行效率来看,Spark的运行速度是Hadoop的数百倍。为什么会有这么大的差异?关键在于他们的经营原则。Hadoop总是从磁盘读取,而Spark更多的是在内存中计算,如下图所示。Hadoop的操作总是在读写磁盘之前提到的。MapReduce的主要计算过程其实就是反复执行Map和Reduce的过程。但是,在执行每个Map或Reduce过程时,都必须先读取磁盘中的数据,然后进行操作,最后将执行的结果数据写入磁盘。因此,MapReduce的执行过程实际上是一个读取数据、执行Map、写入数据、重读数据、执行Reduce、重写数据的往复过程。这样的设计虽然在海量数据下可以减少内存占用,但是频繁的读写磁盘会消耗大量时间,影响运行效率。相比之下,Spark的执行过程只需要第一次从磁盘读取数据,然后就可以进行一系列的操作。这一系列操作也类似于Map或Reduce操作。但是,每次执行之前,都是从内存中读取数据、进行运算、将执行结果数据写入内存的往复过程,直到执行完最后一个操作。入磁盘。这样,整个执行过程就是对内存的读写。虽然会占用大量的内存资源,但是运行效率会大大提高。Spark框架的运行原理如下图所示。集群部署Spark时,在NameNode节点上部署一个SparkDriver,然后在每个DataNode节点上部署一个Executor。SparkDriver是接收和调度任务的组件,而Executor是分布式执行数据处理的组件。同时,在每个数据处理任务执行之前,数据文件已经通过HDFS分布存储在各个DataNode节点上。因此,每个节点上的Executor都会先通过Reader读取本地磁盘上的数据,然后进行一系列的Transformation操作。每个Transformation操作的输入是一个数据集,在Spark中组织成一个弹性分布式数据集(RDD),从内存中读取,最后的输出也是一个RDD,写入内存。这样整个系列的Transformation操作都在内存中读写,直到最后一个操作Action,再通过Writer写入磁盘。这就是Spark的工作原理。Spark框架运行示意图同时,Spark拥有非常丰富的函数库,很多常用的操作不需要开发者自己编写,直接调用函数库即可。这大大提高了软件开发的效率,可以用更少的代码完成更复杂的处理。在这些丰富的函数库中,Spark将它们分为两种类型:转换(Transfer)和动作(Action)。Transfer的输入是RDD,输出也是RDD,所以其实就是对数据的各种转换操作,是Spark要写的主要程序。同时,RDD也分为普通RDD和名值对RDD两种。普通的RDD是由记录一条一条组成的数据集。从原始文件中读取的数据通常是这种形式。操作普通RDD的主要函数有map、flatMap、filter、distinct、union、intersection、subtract、cartesian等。名值对RDD是k-v存储的数据集,map操作是将将普通的RDD转化为名称-值对RDD。通过名值对RDD,可以对其进行reduceByKey、joinByKey等复杂操作。操作名值对RDD最重要的函数有reduceByKey、groupByKey、combineByKey、mapValues、flatMapValues、keys、values、sortByKey、subtractByKey、join、leftOuterJoin、rightOuterJoin、cogroup等。所有Transfer函数的另一个重要特点是它们在处理RDD数据时不会立即执行,而是延迟到下一个Action。这样执行的效果是,当所有的系列操作都定义好后,就执行一次,然后立即写入磁盘。这样就减少了执行时的等待时间,从而减少了内存的使用时间。Spark中的另一种函数是Action。他们输入RDD,输出一个数据结果,通常是得到数据结果后写入磁盘。根据RDD的不同,Action也分为两种:针对普通RDD的操作,包括collect、count、countByValue、take、top、reduce、fold、aggregate、foreach等;RDD的名值对操作,包括countByKey、collectAsMap、lookup等。02Spark设计与开发Spark设计与开发支持Scala、Python和Java3种语言,其中Scala是它的母语。Spark是用Scala语言实现的,它使用Scala作为它的应用框架,可以和Scala紧密集成。Scala语言是一种类似于Java的函数式编程语言。它还在运行时使用Java虚拟机,可以与Java语言无缝结合,相互调用。同时,由于Scala语言采用了当前流行的函数式编程风格,代码更加精简,编程效率更高。前面讲解的计算词频的代码如下:1valtextFile=sc.textFile("hdfs://...")2valcounts=textFile.flatMap(line=>line.split(""))3.map(word=>(word,1))4.reduceByKey(_+_)5counts.saveAsTextFile("hdfs://...")为了实现这个功能,上面讲解的MapReduce框架需要写一个Mapper类和一个Reducer类,并且它们必须通过驱动程序串联起来才能执行。而在用Scala语言编写的Spark程序中,只需要5行代码就可以实现,大大提高了编程效率。如果这段代码是用Java语言写的,需要这样写:1JavaRDDtextFile=sc.textFile("hdfs://...");2JavaRDDwords=textFile.flatMap(3newFlatMapFunction(){4publicIterablecall(Strings){5returnArrays.asList(s.split(""));}6});7JavaPairRDDpairs=words.mapToPair(8newPairFunction(){9publicTuple2call(Strings){10returnnewTuple2(s,1);}11});12JavaPairRDDcounts=pairs.reduceByKey(13newFunction2)<整数,整数,整数>(){14publicIntegercall(整数,整数){returna+b;}15});16counts.saveAsTextFile("hdfs://...");很明显,用Scala语言编写的Spark程序比Java语言更高效更精简,因而也更容易维护和变更。因此,Scala语言将成为更多大数据开发团队的选择。下图是一个完整的Spark程序,包括初始化操作,比如SparkContext的初始化,命令参数args的读取等,然后从磁盘加载数据,通过Spark函数处理数据,最后保存结果数据写入磁盘。完整的Spark程序03SparkSQL设计开发未来三到五年,整个IT行业的技术架构将发生翻天覆地的变化。数据量暴涨,原有数据库架构下的存储成本会越来越高,查询速度会越来越慢,数据扩容会越来越困难。因此,向大数据技术转型势在必行。大数据改造需要开发人员熟悉Spark/Scala的编程模型、分布式计算的设计原理、大量业务数据的分析处理,还需要开发人员熟悉SQL语句。因此,迫切需要一种技术框架,能够支持开发者使用SQL语句进行编程,然后将SQL语言转换成Spark程序进行计算。这样一来,大数据开发的技术门槛将大大降低,更多的普通Java开发人员将能够参与到大数据开发中来。这样的框架就是SparkSQL+Hive。SparkSQL+Hive的设计思路是将通过各种渠道采集的数据存储在Hadoop大数据平台的Hive数据库中。Hive数据库中的数据实际上是存储在分布式文件系统HDFS中,并将这些数据文件一个一个映射成表,通过SQL语句对数据进行操作。在对Hive数据库中的数据进行操作时,通过SparkSQL读取数据,然后通过SQL语句进行处理,最后将结果数据存储到Hive数据库中。1CREATE[EXTERNAL]TABLE[IFNOTEXISTS]table_name2[(col_namedata_type[COMMENTcol_comment],...)]3[COMMENTtable_comment]4[PARTITIONEDBY(col_namedata_type[COMMENTcol_comment],...)]5[CLUSTEREDBY(col_name,col_name,...))6[SORTEDBY(col_name[ASC|DESC],...)]INTOnum_bucketsBUCKETS]7[ROWFORMATrow_format]8[STOREDASfile_format]9[LOCATIONhdfs_path]首先通过上面的语句在Hive数据库中创建一张表,每张表都会在HDFS上通过HDFS映射成数据库文件和分布式存储。创建表后,Hive数据库表不支持逐条插入数据,也不支持更新和删除数据。数据通过数据文件一次性加载,或者通过insertintoT1select*fromT2这样的语句将查询结果加载到表中。1#从NameNode节点加载数据文件2LOADDATALOCALINPATH'./examples/files/kv1.txt'OVERWRITEINTOTABLEpokes;3#从NameNode节点加载数据文件到分区表4LOADDATALOCALINPATH'./examples/files/kv2.txt'5OVERWRITEINTOTABLEinvitesPARTITION(ds='2008-08-15');6#从HDFS加载数据文件到分区表7LOADDATAINPATH'/user/myname/kv2.txt'OVERWRITE8INTOTABLEinvitesPARTITION(ds='2008-08-15');加载数据后,就可以通过SQL语句查询分析数据:1SELECTa1,a2,a3FROMa_table2LEFTJOIN|RIGHTJOIN|INNERJOIN|SEMIJOINb_table3ONa_table.b=b_table.b4WHEREa_table.a4="xxx"注意这里的连接操作有左连接,右连接而内连接,和半连接(SEMIJOIN),其执行效果类似于in语句或exists语句。有了Hive数据库,可以通过SparkSQL读取数据,然后用SQL语句分析数据:1importorg.apache.spark.sql.{SparkSession,SaveMode}2importjava.text.SimpleDateFormat3objectUDFDemo{4defmain(args:Array[String]):Unit={5valspark=SparkSession6.builder()7.config("spark.sql.warehouse.dir","")8.enableHiveSupport()9.appName("UDFDemo")10.master("local")11.getOrCreate()1213valdateFormat=newSimpleDateFormat("yyyy")14spark.udf.register("getYear",(date:Long)=>dateFormat.format(date).toInt)15valdf=spark.sql("selectgetYear(date_key)year,*frometl_fxdj")16df.write.mode(SaveMode.Overwrite).saveAsTable("dw_dm_fx_fxdj")17}18}这段代码中首先初始化了Spark,然后定义了一个名为getYear的函数,然后使用spark.sql()查询和处理Hive表中的数据。最后通过df.write.mode().saveAsTable()将结果数据写入到另一张Hive表中。其中,在SQL语句执行时,getYear()可以作为SQL语句中的一个函数被调用。采用SparkSQL+Hive方案,在进行大数据改造时,实际上是将原来存储在数据库中的表转换为Hive数据库中的表,将之前的存储过程转换为SparkSQL程序,将存储过程转换为Hive数据库中的表。过去被转换成SparkSQL程序。该函数成为Spark自定义函数。这可以帮助企业更轻松地从传统数据库架构向大数据架构转型。本书节选自《架构真意:企业级应用架构设计方法论与实践》,经出版社授权发布。