hadoop概述Hadoop历史Hadoop最早起源于Nutch。Nutch的设计目标是一个网络爬虫引擎,但是随着需要爬取的网页数据量的增加,Nutch遇到了严重的性能扩展问题。在2003年和2004年,Google发表了两篇论文来提供解决这个问题的方法。一种是HDFS的前身GFS,用于海量网页的存储;另一个是分布式计算框架MAPERDUCE。Nutch的创始人根据论文的指导,花了两年时间实现了HDFS和MapReduce代码。并且从Nutch中剥离出来,成为一个独立的项目Hadoop。Hadoop在2008年成为Apache的顶级项目,早期的Hadoop并不是现在大家熟悉的Hadoop分布式开源软件,而是一个指大数据的生态系统,包括很多其他的软件。2010年前后,Hbase、HIVE、Zookeeper等相继离开Hadoop项目,成为Apache的顶级项目。2011年,Hadoop发布2.0,在架构上进行了重大更新,引入了Yarn框架,专注于资源管理,简化了MapReduce的职责。同时,Yarn框架作为一个通用的资源调度和管理模块,支持其他多种编程模型,比如最著名的Spark。由于HADOOP的版本管理复杂,复杂集群的部署、安装和配置需要编写大量的配置文件,然后分发到各个节点,容易出错,效率低下。所以很多公司会在基础的Hadoop商业化后重新分发。目前Hadoop发行版有很多,包括华为发行版、英特尔发行版、Cloudera发行版(CDH)等。Hadoop(2.0)HDFS的组成HDFSNameNode的组成:是整个HDFS集群的管理者,管理HDFS的命名空间管理,管理复制策略,管理数据块在DataNode中的位置映射信息,与client处理来自客户端的读写操作。DataNode:实际文件的存储。存储实际数据块,执行数据块读写操作。DataNode启动后向NameNode注册,每6小时向NameNode报告一次所有块信息。DataNode的心跳每3秒一次,心跳将NameNode的命令返回结果给DataNode,例如:拷贝数据到另一台机器;删除一个数据块。如果NameNode超过10分钟没有收到来自DataNode的心跳,则认为该节点不可用。客户端HDFS提供的工具包是面向开发者的,封装了对HDFS的操作调用。负责文件切分:当一个文件上传到HDFS时,Client负责与NameNode和DataNode交互,将文件切块进行上传。SecondaryNameNode:辅助NameNode来分担它的工作量。定期合并Fsimage和Edits并推送到NameNode。辅助恢复NameNode。不是NameNode的热备。当NameNode挂掉后,不能立即替换NameNode为HDFS读写进程提供服务。HDFS上传文件客户端向NameNode请求上传文件,NameNode进行合规检测并创建相应的目录元数据。并返回是否可以上传。客户端分文件,再次询问NameNode,第一个Block需要上传到哪个DataNode服务器。NameNode返回3个DataNode节点,分别是dn1、dn2、dn3。客户端上传第一个块数据给dn1,dn1收到请求后会继续调用dn2,然后dn2调用dn3完成通信管道的建立。客户端开始上传第一个block到dn1(先从磁盘读取数据,放到本地内存缓存中)。Packet中,dn1收到一个Packet,然后传给dn2,dn2传给dn3。一个block传输完成后,client再次请求NameNode上传第二个block,然后重复步骤1到4。距离待上传数据最近的DataNode接收数据。节点距离:两个节点到它们最近的共同祖先的距离之和。如果Client和HADOOP不在同一个集群中,NameNode会随机选择一个rack上的节点。第二个副本位于另一个机架中的随机节点上,第三个副本位于与第二个副本相同机架中的随机节点上。HDFS读进程客户端向NameNode请求下载文件,NameNode返回文件块所在的DataNode地址。客户端根据节点距离选择一个最近的DataNode服务器,然后开始读取数据。DataNode使用Packet来传输数据。客户端收到一个数据包后,进行校验。验证通过后,将数据包写入目标文件,然后请求第二个数据包。整个过程是一个串行过程。(因为IO本身就是最慢的进程)在读取数据的过程中,如果client端与dn数据节点通信出错,则尝试连接下一个包含该数据块的dn数据节点。失败的dn数据节点会被client记录下来,以后不会再连接了。SecondaryNameNodeNameNode是机器的文件管理,容易造成单点读写性能问题和数据存储安全问题。SecondaryNameNode和Name辅助解决读写性能问题:NameNode数据同时存储在内存和磁盘上Fsimage文件:HDFS文件系统元数据的永久检查点,包含HDFS文件系统序列化信息的所有目录和文件inode。Edits文件:存放HDFS文件系统所有更新操作的路径。文件系统客户端执行的所有写操作都会首先记录在Edits文件中。编辑文件只进行追加操作,效率很高。每当更新元数据或添加元数据时,内存中的元数据都会被修改并附加到Edits。NameNode在启动时,会把Fsimage文件读入内存,并在Edits中加载更新操作,保证内存中的元数据信息是最新的和同步的。长时间向Edits添加数据会导致文件数据过大而降低效率,而且一旦断电,恢复元数据的时间会过长。所以2NN专门用于FsImage和Edits的合并。SecondaryNameNode工作机制NameNode启动后,会创建FsImage和Edits文件。如果不是第一次启动,直接加载FsImage和Edits文件。fsimage_0000000000000000002文件最新的FsImage。edits_inprogress_0000000000000000003正在进行编辑。seen_txid是一个txt文本文件,记录的是最新的edits_inprogress文件末尾的编号。SecondaryNameNode工作于2NN并向NN请求CheckPoint。NN将正在进行的edits文件和最新的fsimage文件复制到2NN并更新seen_txid中的数字,然后重新生成edits文件。2NN将editlog和image文件加载到内存中,合并生成新的image文件fsimage.chkpoint。然后将其复制回NN。NN将fsimage.chkpoint重命名为fsImage以完成翻转。HDFS优点总结1.容错性高:数据自动保存多份,通过增加副本提高容错性。副本丢失后,可以自动恢复。2、适合处理大数据:可处理的数据规模达到GB、TB甚至PB级,文件数可达百万级以上规模。3、可以在廉价机器上构建:通过多副本机制,提高了可靠性。缺点1.不适合低延迟的数据访问。2.不能高效存储大量小文件。大量的小文件会占用大量的NameNode内存来存储文件目录和块信息。同时,小文件的寻址时间会超过读取时间,违反HDFS设计目标3。不支持并发写入,文件随机修改。仅支持数据追加。DataNode和NameNode源码指南阅读代码前的准备:HadoopRpcFrameworkGuideRpcProtocolpublicinterfaceMyInterface{ObjectversionID=1;booleandemo();}RpcproviderpublicclassMyHadoopServerimplementsMyInterface{@Overridepublicbooleandemo(){returnfalse;}publicstaticvoidmain(String[]args){Serverserver=newRPC.Builder(newConfiguration()).setBindAddress("localhost").setPort(8888).setProtocol(MyInterface.class).setInstance(newMyHadoopServer())。建造();服务器.start();}}RpcconsuemrpublicclassMyHadoopClient{publicstaticvoidmain(String[]args)throwsException{MyInterfaceclient=RPC.getProxy(MyInterface.class,MyInterface.versionID,newInetSocketAddress("localhost",8888),newConfiguration());客户端.demo();}}NameNode启动源码启动9870端口服务加载图片文件和编辑日志初始化NN的RPC服务器:用于接收DataNode的RPC请求NN启动资源检测NN的心跳超时判断(启动一个线程判断DataNode是否超时)HDFS默认的DataNode丢包容忍时间不是timeout=2heartbeat.recheck.interval+10dfs.heartbeat.interval(2*5+30)超过这个时间会被认为DataNode超时DataNode启动源码工作流源图MapReduceMapReduce实例需求:有一个300M的存储文本文件统计其总数每个字母的出现次数。要求:[a-p]一个结果文件,[q-z]一个结果文件。实际发现:publicclassWordCountMapperextendsMapper{privateTextoutK=newText();privateIntWritableoutV=newIntWritable(1);@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//1获取一行Stringline=value.toString();//2切割String[]words=line.split("");//3循环写入for(Stringword:words){//封装outkoutK.set(word);//写出context.write(outK,outV);}}}publicclassWordCountReducerextendsReducer{privateIntWritableoutV=newIntWritable();@Overrideprotectedvoidreduce(Textkey,Iterablevalues,Contextcontext)抛出IOException,InterruptedException{intsum=0;//累加for(IntWritablevalue:values){sum+=value.get();}outV.set(sum);//写出context.write(key,outV);}}切片规则确定MapTasks的数量MapReduce抽象数据读入InputFormat。常用的FileInputFormat是文件读取的具体实现。FileInputFormat默认的分片规则是:对待处理的数据文件进行逻辑分片,每128M为一个数据片。一个数据切片交给一个MapTask进行并行处理。分片数量过多,开启过多的MapTask,造成资源浪费。分片数量太少,MapTask阶段处理速度慢。ReduceTasks的数量需要手动指定。在Map阶段,需要通过分区算法为每一个输出数据计算分区。如果ReduceTask=0,表示没有Reduce阶段,输出文件数与Maps数相同。如果ReduceTask=1,则所有输出文件都是一个。ReduceTasks的个数必须大于partition的结果不同值的个数,否则无法消费数据会出现异常。如果ReduceTasks的数量大于partitions的数量,部分reduceTasks会空闲。MapReduce的详细工作流程。Map阶段读取阶段:通过RecordReader从InputFormat片段中读取数据,并将数据一一解析成key/value。map阶段:用户自定义的Mapper.map方法执行。将输入键/值转换为输出键/值。Collect阶段:接收输出的Key/Value并调用分区算法,将输出数据写入对应分区的ringmemorybuffer中。溢出阶段:当内存缓冲区的使用超过一定的阈值时,就会触发溢出写线程。该线程现在在内存中执行快速排序,然后将数据溢出到磁盘。Combine阶段:当所有数据处理完成后,MapTask将所有临时文件合并一次,保证最终只生成一个数据文件。Reduce阶段Copy阶段:ReduceTask从所有MapTask中复制同一个partition的数据,每个ReduceTask负责处理一个partition,互不影响。如果文件大小超过一定阈值,则溢出到磁盘,否则存入内存。合并阶段:ReduceTask将同一分区的所有数据合并到一个大文件中,排序阶段:对合并后的大文件进行合并排序。由于mapTask本身保证了分区中的顺序,ReduceTask只需要对所有数据进行一次归并排序即可。Reduce阶段:执行用户的reduce方法,并将结果写入HDFS。MapReduce的优缺点优点是实现简单,封装性高。扩展性强:可以快速添加机器来扩展其计算能力。容错性高:当一个节点挂掉后,会自动将任务转移到另一个节点运行,中间无需人工参与。适用于PB级以上海量数据的离线处理。缺点是不擅长实时计算。不擅长流式计算:MapReduce的输入数据集是静态的。HeavyIO:每个MapReduce作业的输出都会写入磁盘,会造成大量的磁盘IO。MapTask源码指南MapTask.run方法是MapTask的入口点。读取配置,初始化MapTask生成jobId判断使用的api,选择runNewMapper或者runOldMapper,执行。MapTask执行后,做一些清理工作。runNewMapper实例化默认的inputFormat并计算切片。根据设置的reduceTasks数量实例化输出对象。实例化输入对象。mapper.run(mapperContext)执行一个循环,确认每组kv,执行用户的map逻辑。在map方法中调用了collector.collect方法。将数据写入环形缓冲区。output.close(mapperContext)执行,最后调用MapTask的close方法调用MapTask的flush方法。sortAndSpill在内存中排序并将溢出写入文件。每个分区一个临时文件。区内有序的mergeParts进行合并排序,将多个临时文件合并为一个文件。调用MapTask的close方法,是一个空方法。ReduceTask源码指南:其入口是RececeTask的run方法。首先初始化复制、排序和归约状态机。initialize将outputformat初始化为TextOutputFormat。执行shuffleConsumerPlugin.init(shuffleContext)方法。初始化inMemoryMerger和onDiskMerger。shuffleConsumerPlugin.run();创建一个Fetcher来捕获数据。数据抓取完成后,切换状态为sortmerger.close();执行finanlMerge将内存中的数据与磁盘中的数据合并。切换状态减少。运行新减速器。用户定义的Reduce方法执行。执行用户自定义的reduce方法,调用context.write方法写入数据。最后调用TextOutputFormat的write方法。先写key,再写value,最后写换行符。YarnYarn形式ResourceManager(RM):全局资源的管理器,由两部分组成:一个是可插拔的调度Scheduler,另一个是ApplicationManagerScheduler:一个纯调度器,不负责应用监控ApplicationManager:主要负责接收作业提交请求,为应用分配第一个运行ApplicationMaster的Container,并负责监控ApplicationMaster,在接收到ResourceManager请求失败时重启运行ApplicationMaster的ContainerNodeManager(NM),分配Container到应用的某个任务并与ResourceManager交换信息以保证整个集群的顺利运行。ResourceManager通过收集各个NodeManager的报告信息来跟踪整个集群的健康状态,NodeManager负责监控自身的健康状态。管理每个Container的生命周期管理每个节点上的日志执行一些应用在Yarn上的附加服务,比如MapReduce的shuffle过程Container是Yarn框架的计算单元,具体执行应用任务是一组分配系统资源Memory、cpu、disk、network等。每个应用都是从ApplicationMaster开始的,它本身就是一个容器(0th)。一旦启动,ApplicationMaster会根据任务需求与Resourcemanager协商更多的容器。在运行过程中,它可以动态地释放和申请容器。ApplicationMaster(AM)ApplicationMaster负责与调度器协商合适的容器,跟踪应用程序的状态,监控其进度每个应用程序都有自己的ApplicationMaster,负责与ResourceManager协商资源(容器)并与NodeManager协同执行和监控Task当ApplicationMaster启动时,它会定期向资源管理器发送心跳报告,以确认其健康状况和所需资源。在Yarn执行过程中,客户端程序向ResourceManager提交应用,请求一个ApplicationMaster实例,ResourceManager在响应中给出一个applicationIDResourceManager,找到可以运行一个Container的NodeManager,并启动Container中的ApplicationMaster实例。ApplicationMaster向ResourceManager注册。注册后,客户端可以查询ResourceManager,获取其ApplicationMaster的详细信息。正常运行时,ApplicationMaster向ResourceManager发送resource-request请求,ResourceManager会根据调度策略,将容器资源尽可能最优的分配给ApplicationMaster,并作为对资源请求的响应发送给ApplicationMaster。ApplicationMaster将container-launch-specification信息发送给NodeManager,以在启动时启动Container应用程序的代码。它运行在Container中,通过应用专用协议将运行进度、状态等信息发送给ApplicationMaster。随着作业的执行,ApplicationMaster将心跳和进度信息发送给ResourceManager。在这些心跳信息中,ApplicationMaster还可以请求和释放一些容器。在应用程序运行过程中,提交应用程序的客户端主动与ApplicationMaster通信,获取应用程序的运行状态、进度更新等信息。通信协议也是一个特定于应用程序的协议。在队列中,先按照作业的优先级,再按照到达时间,为各个app分配资源。优点:简单,无需配置缺点:不适合共享集群CapacityScheduler:用于在一个集群中运行多个Application这种情况下,目标是最大化吞吐量和集群利用率CapacityScheduler允许将整个集群的资源划分到多个部分,每个组织使用其中的一部分,即每个组织都有一个专用的队列,每个组织的队列还可以进一步划分为层次结构(HierarchicalQueues),从而允许组织内不同的用户组使用.每个队列指定可以使用的资源范围。在每个队列中,应用程序以先进先出的方式进行调度。当一个队列的资源空闲时,它剩余的资源可以与其他队列共享。Yarn源码参考https://www.cnblogs.com/dan2/...https://www.cnblogs.com/dan2/...https://www.cnblogs.com/dan2/...https://www.cnblogs.com/dan2/...https://www.bilibili.com/vide...)Hadoop的组成(2.0)