当前位置: 首页 > 科技观察

Hadoop生态的MapReduce和Hive简介

时间:2023-03-20 20:54:17 科技观察

1.计算框架Hadoop是一个计算框架。目前常用的大规模数据计算框架大致有五种:仅批处理框架:Apachehadoop。唯一的流处理框架:ApacheStorm、ApacheSamza。混合框架:ApacheSpark、ApacheFlink。其中以Hadoop和Spark最为著名,应用也最为广泛。虽然都被称为大数据框架,但实际层次是不同的。Hadoop是一种分布式数据基础设施,包括计算框架MapReduce、分布式文件系统HDFS、YARN等。而Spark则是专门为处理分布式存储中的大数据而设计的工具。它不存储数据,更像是MapReduce的替代品。在使用场景上,Hadoop主要用于离线数据计算,而Spark则更适合需要精准实时的场景。本文主要介绍Hadoop,不讨论Spark。本文承接了知识库(https://gitlab.aihaisi.com/docs/docs/issues/516)中Hadoop的HDFS,介绍了Hadoop的另一个重要组件MapReduce和Hive。2.MapReduce2.1什么是MapReduce,一个基于Java的并行分布式计算框架。前面提到,HDFS提供了一个基于主从结构的分布式文件系统。基于这种存储服务支持,MapReduce可以实现分发、跟踪、执行等任务,并收集结果。2.2MapReduce的组成MapReduce的主要思想是把一个大的计算拆分成Map(映射)和Reduce(化简)。说到这里,其实在引入Lambda之后,JAVA8也有了map和reduce方法。下面是Java中的一个用法:Listnums=Arrays.asList(1,2,3);ListdoubleNums=nums.stream().map(number->number*2).collect(Collectors.toList());Result:[2,4,6]Optionalsum=nums.stream().reduce(Integer::sum);Result:[6]代码很简单,map负责classification,reduce负责计算。而Hadoop中的MapReduce也有同样的目的。下面结合官方案例WordCount进行分析:publicclassWordCount{//Mapper泛型类,4个参数分别代表输入键,值,输出键,值类型publicstaticclassTokenizerMapperextendsMapper{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//字符分析StringTokenizeritr=newStringTokenizer(value.toString());while(itr.hasMoreTokens()){//nextToken():返回从当前的字符串positiontothenextdelimiterword.set(itr.nextToken());context.write(word,one);}}}//Reducer也有四个参数publicstaticclassIntSumReducerextendsReducer{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterablevalues,Contextcontext)throwsIOException,InterruptedException{intsum=0;//循环values,并记录“words”的个数for(IntWritableval:values){sum+=val.get();}result.set(sum);context.write(key,result);}}在这段代码中,不难看出程序的核心是map函数和reduce函数。MapReduce是由这两者组成的吗?继续阅读。2.3Map和Reduce2.3.1Map在WordCount的情况下,很明显map函数的输入主要是一个Context,这里暂时忽略。是Mapper类的内部抽象类,在一般计算中用不到,可以看作是“上下文”的理解。map函数的计算过程是:提取这行文本中的词,对每个词输出一个2.3.2的Reduce,然后看一下reduce。这里,输入参数Values就是上面提到的很多1组成的集合。Key是具体的“字”字。它的计算过程是:对集合中的1求和,然后用这个和(sum)组成一个词(word)。假设有两个数据块的文本数据需要进行词频统计,MapReduce的计算过程如下图所示:到这里就很容易理解了,毕竟只是一个HelloWorld的例子~,但是整个MapReduce过程中最关键的部分其实是在map和reduce之间。以上面的例子为例:统计同一个词在所有输入数据中出现的次数,一个Map只能处理一部分数据,热词很可能出现在所有的Map中,也就是说同一个词必须将统计信息合并在一起才能获得正确的结果。几乎所有的大数据计算场景都需要处理这种数据关联。如果是一个例子,当然只合并Key是可以的,但是像数据库的join操作就比较复杂了,需要处理两种(或更多)数据根据Key进行关联。这种数据关联操作在MapReduce中称为shuffle。2.4shuffleshuffle字面意思,洗牌。下面是一个完整的MR流程,看看如何shuffle。先看左半边。1、从HDFS读取数据,将数据块输入到各个地图中,地图计算完成后,将计算结果存储到本地文件系统中。当地图即将完成时,将开始洗牌过程。2、如图所示,shuffle也可以分为两种。Map端的是Mapshuffle。大致流程如下:Map任务流程会调用一个Partitioner接口,对于生成的每一个map,在这里实现Map结果的分区、排序、拆分,将同一分区的输出合并写入磁盘以获得有序分区。文档。这样无论Map在哪个服务器节点上,都会将同一个Key发送到同一个Reduce进程。Reduce过程看收到的1的右半部分。Reduceshuffle可以分为两个阶段:复制Map输出和排序合并。Copy:Reduce任务从各个Map任务中拉取数据后,通知父TaskTracker状态已经更新,TaskTracker通知JobTracker。Reduce会周期性的从JobTracker中获取Map的输出位置。一旦获取到位置,Reduce任务会将输出对应的TaskTracker的输出复制到本地,不会等到所有Map任务完成。归并排序:先将数据复制到内存缓冲区,如果缓冲区适合,则将数据写入内存,即内存到内存的归并。Reduce将数据拖到每个Map中,内存中的每个Map对应一个数据。当内存缓冲区中存储的数据达到一定程度时,启用内存中的合并,将内存中的数据合并输出到磁盘文件,即内存到磁盘合并。当属于reduce的mapoutputs全部复制完后,会在reduce上生成多个文件,进行merge操作,即磁盘到磁盘的merge。此时Map的输出数据已经有序,Merge进行归并排序。Reduce端所谓的sort过程就是merge的过程。2.上一步reduceshuffle后,reduce进行最后的计算,并将输出写入HDFS。以上就是shuffle的大致四个步骤。关键是map的混洗输出转到哪个Reduce进程。它由Partitioner实现。MapReduce框架的默认Partitioner将Reduce任务的数量与Key哈希值取模。同一个Key会落在同一个Reduce任务ID中。publicintgetPartition(K2key,V2value,intnumReduceTasks){return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks;}如果用一句话概括Shuffle:分布式计算将不同服务器的数据合并在一起进行后续计算。Shuffle是大数据计算过程中一个神奇的地方。不管是MapReduce还是Spark,只要是大数据批量计算,就一定有shuffle的过程。数据只有相互关联,才能体现其内在联系和价值。3、Hive在上一节介绍了MapReduce。接下来简单说一下Hive。我认为任何技术的出现都是为了解决某类问题。MapReduce无疑简化了大数据开发的编程难度。但其实更常用的数据计算方式可能是SQL,那么有没有办法直接运行SQL呢?3.1什么是蜂巢?基于Hadoop的数据仓库系统定义了一种类似SQL的查询语言:HiveSQL。这里有一个术语数据仓库,数据仓库是指:面向主题(SubjectOriented)、集成(Integrated)、相对稳定(Non-Volatile)、响应历史变化(TimeVariant)的数据集合,用来支持管理决策。这么说可能有点抽象,我们来分解一下:主题:数据仓库是针对某个主题组织起来的,指的是使用数据仓库进行决策时所关注的重点方面。例如,订阅分析可以看作是一个主题。集成:数据仓库需要将多个数据源的数据存储在一起,但之前数据的存储方式不同,需要抽取、清洗、转换。(即ETL)稳定性:保存的数据是一系列的历史快照,不能修改,只能分析。Time-varying:会周期性接收新的数据,以反映新的数据变化。现在我们再看一下定义:数据仓库就是将多个数据源的数据按照一定的主题进行整合,进行抽取、清洗、转换。而且,加工整合后的数据不允许随意修改,只能进行分析,需要定期更新。3.2为什么是Hive了解了Hive的基本定义后,想一想:一个依赖HDFS的数据仓库在Hadoop环境下能起到什么作用?前面提到了SQL可以直接在Hadoop平台上运行吗,这里的答案就是Hive。它可以将HiveSQL转换成MapReduce程序运行。Hive的初始版本默认为HiveonMapreduce。在启动hive之前,通常需要先启动hdfs和yarn。同时一般需要配置MySQL。Hive依赖于HDFS数据存储。类型,地址等。这些信息存储在一个表中,称为元数据,可以存储在MySQL中。现在看一些Hive创建新数据库的命令:createdatabasexxx;删除数据库:dropdatabasexxx;创建表:createtabletable_name(col_namedata_type);Hive表有两个概念:**内部表和外部表**。默认内表,简单来说就是内表数据存放在每张表对应的HDFS目录下。外部表中的数据存储在别处。删除外部表,不会删除外部表指向的数据,只会删除外部表对应的元数据。查询:select*fromt_table**where**a<100**and**b>1000;连接查询:selecta.*,b.*fromt_aajoint_bbona.name=b.name;看到这里,你可能会觉得我在写SQL,没错,对于熟悉SQL的人来说,Hive是非常好用的。3.3HIVESQL到MapReduce前面提到,HQL可以“转化”为MapReduce,我们来看看HQL是如何转化为MapReduce的Hive基础架构的:通过Client向Hive提交SQL命令。如果是DDL,Hive会通过执行引擎Driver将数据表的信息记录在Metastore元数据组件中。该组件通常用关系型数据库实现,记录了表名、字段名、字段类型、关联的HDFS文件路径等Meta信息。(元信息)。如果是DQL,Driver会将语句提交给自己的编译器进行语法分析、解析、优化等一系列操作,最终生成MapReduce执行计划。然后根据执行计划生成一个MapReduce作业,提交给Hadoop的MapReduce计算框架进行处理。例如输入aselectxxxfroma;执行顺序是:先在metastore查询-->sql分析-->查询优化-->物理计划-->执行MapReduce。小结本文大致介绍了什么是MapReduce,它的组成和基本原理,同时也介绍了Hive。其实在实践中,并不需要经常写MapReduce程序,主要的数据处理还是SQL分析,所以Hive在大数据应用中扮演着非常重要的角色。