先知其然,再知其所以然是将用户编写的业务逻辑代码和内置组件组合成一个完整的分布式计算程序,并发运行在Hadoop集群上。MapReduce的优点MapReduce易于编程:只需实现一些接口就可以实现一个分布式程序,并将这个分布式程序分发到大量廉价的PC机上执行。扩展性好:增加机器可以提高计算能力容错性高:所谓容错性是指当系统中的一台机器发生故障时,有一种机制将任务分配给新的机器继续运行。此过程不需要人工干预适用于PB级数据的离线处理:大数据的稳定处理。MapReduce不擅长实时计算:MapReduce不能像Mysql那样在毫秒级或秒级返回结果。不擅长流式计算:流式计算的输入数据是动态的。连续的,但是MR处理的数据一定是静态的,这是设计决定的。它不擅长DAG计算:多个任务之间存在依赖关系,后者的输入依赖于前者的输出。这种活动MR不擅长,读写磁盘太多多次性能下降MapReduce统计字处理默认按照128M分片数据。上图中一共有三类进程:APPMaster:负责整个程序的进程调度和状态协调MapTask:负责整个Map阶段的数据处理流程ReduceTask:负责Reduce的整个数据处理流程阶段MapReduce编程例程我们写的部分基本上分为三个部分:Mapper,Reducer和DriverMap阶段(1)用户自定义父类要被Mapper继承(2)Mapper的输入数据格式为KV对(3)的Mapper中的业务逻辑写在map()方法中【每个KV对调用一次map()】(4)Mapper的输出数据格式也是KV对Reducer阶段(1)自定义Reducer继承其parentclass(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV(3)reduce业务逻辑写在reduce()方法中[每对KV调用一次reduce()]的Driver阶段相当于YARN集群的client等,程序写完后,需要将整个程序提交到YARN集群。HELLOWORLD案例需要统计到统计文件中单词的词频Mapper代码中。公共类WordcountMapper扩展Mapper{Textk=newText();IntWritablev=newIntWritable(1);@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//1获取一行Stringline=value.toString();//2切割String[]words=line.split("");//3输出for(Stringword:words){k.set(word);context.write(k,v);}}}Reducer阶段publicclassWordcountReducerextendsReducer{intsum;IntWritablev=newIntWritable();@Overrideprotectedvoidreduce(Textkey,Iterablevalues,Contextcontext)throwsIOException,InterruptedException{//1累加求和sum=0;for(IntWritablecount:values){sum+=count.get();}//2输出v.set(sum);context.write(key,v);}}Driver驱动类publicclassWordcountDriver{publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterinterruptedException{//输入输出路径需要根据自己电脑上的实际输入输出路径来设置//注意这里是在win下运行,如果放在集群上需要更改路径args=newString[]{"e:/input/inputword","e:/output1"};//1获取配置信息并封装任务Configurationconfiguration=newConfiguration();工作job=Job.getInstance(configuration);//2设置jar加载路径job.setJarByClass(WordcountDriver.class);//3设置map和reduce类job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);//4设置地图输出job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5设置减少输出job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//6设置输入和输出路径FileInputFormat.setInputPaths(job,newPath(args[0]));FileOutputFormat.setOutputPath(作业,新路径(args[1]));//7提交booleanresult=job.waitForCompletion(true);系统.exit(结果?0:1);}}