当前位置: 首页 > 科技观察

MapReduceConnection-ReplicationConnection

时间:2023-03-12 05:41:27 科技观察

如图4.5所示,MapReduce复制连接的工作原理如下:使用分布式缓存(Distributedcache)将这个小数据集复制到所有运行map任务的节点。使用每个映射任务初始化方法将这个小数据集加载到哈希表中。利用大数据集中的记录,逐条遍历哈希表,逐条判断是否满足连接条件。输出满足连接条件的结果。复制连接的实现非常简单。更多详情,请参考《HadoopinAction》。附录D提供了实现复制连接的通用框架。该框架支持任何类型的InputFormat和OutputFormat数据。(我们将在下一个技术中使用这个框架。)复制连接框架根据内存占用的大小动态地决定从分布式缓存的内容和输入拆分中缓存哪些对象。如果所有输入数据集都不够小,无法放入缓存,有没有办法优化地图端连接?然后是时候看看半连接了。AppendixD.2ReplicationConnectionFramework一个replicationconnection是一个map端连接,因其实现而得名:连接中最小的数据集将被复制到所有mapmaster节点。复制连接的实现非常简单。更具体的内容可以参考ChunkLam的《HadoopinAction》。本节的目标是创建一个通用的复制连接框架,可以支持任何类型的数据集。这个框架提供了一个优化的小功能:动态监控分布式缓存内容和输入块的大小,判断哪个大。如果输入块很小,那么就需要把map的输入块放在内存缓冲区中,然后在map的cleanup方法中进行连接操作。图D.4是这个框架的类图,这里提供了连接类(GenericReplicatedJoin)的具体实现,而不仅仅是一个抽象类。在此框架之外,此类将与KeyValueTextInputFormat和TextOutputFormat协作。它的一个假设是每个数据文件的第一个标记是连接密钥。此外,还可以通过继承扩展连接类,支持任意类型的输入输出。图D.5是链接帧的算法。Map的设置方式决定了map的输入块和分布式缓存中哪个更大。如果分布式缓存的内容少,就会加载到内存缓存中。然后在Map函数中开始连接操作。如果输入块比较小,map函数将输入块的key\value对加载到内存缓存中。Map的cleanup方法会从分布式缓存中读取记录,对每条记录和内存缓存中的key\value对进行join操作。以下代码是GenericReplicatedJoin类中的设置方法。它在地图的初始化阶段被调用。此方法确定哪个更大,分布式缓存中的文件或输入块。如果文件比较小,将文件加载到HashMap中。@Overrideprotectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{distributedCacheFiles=DistributedCache.getLocalCacheFiles(context.getConfiguration());intdistCacheSizes=0;for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());distCacheSizes+=distributedCacheFile.length();}if(context.getInputSplit()instanceofFileSplit){FileSplitsplit=(FileSplit)context.getInputSplit();longinputSplitSize=split.getLength();distributedCacheIsSmaller=(distCacheSizes)reader){addToCache(p);}reader.close();}}}根据setup方法是否将分布式缓存的内容加载到内存缓存中,Map方法会有不同的行为。如果将分布式缓存中的内容加载到内存中,那么map方法会将输入块的记录与内存中的缓存进行join。如果分布式缓存中的内容还没有加载到内存中,那么map方法将输入块的记录加载到内存中,然后在cleanup方法中使用。@Overrideprotectedvoidmap(Objectkey,Objectvalue,Contextcontext)throwsIOException,InterruptedException{Pairpair=readFromInputFormat(key,value);if(distributedCacheIsSmaller){joinAndCollect(pair,context);}else{addToCache(pair);}}publicvoidjoinAndCollect(Pairp,Contextcontext)throwsIOException,InterruptedException{Listcached=cachedRecords.get(p.getKey());if(cached!=null){for(Paircp:cached){Pairresult;if(distributedCacheIsSmaller){result=join(p,cp);}else{result=join(cp,p);}if(result!=null){context.write(result.getKey(),result.getData());}}}}publicPairjoin(PairinputSplitPair,PairdistCachePair){StringBuildersb=newStringBuilder();if(inputSplitPair.getData()!=null){sb.append(inputSplitPair.getData());}sb.append("\t");if(distCachePair.getData()!=null){sb.append(distCachePair.getData());}returnnewPair(newText(inputSplitPair.getKey().toString()),newText(sb.toString()));}当所有的记录都被传给map方法后,MapReduce将调用cleanup方法。如果分布式缓存中的内容大于输入块,则连接将在清理中执行。连通对象是输入块在map函数缓存中的记录和分布式缓存中的记录。@Overrideprotectedvoidcleanup(Contextcontext)throwsIOException,InterruptedException{if(!distributedCacheIsSmaller){for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());DistributedCacheFileReaderreader=getDistributedCache.DistributedCache:Readerfor(airfile();读卡器(可迭代)reader){joinAndCollect(p,context);}reader.close();}}}***,job的驱动代码必须指定需要加载到分布式缓存中的文件。下面的代码可以处理一个文件或一个目录的MapReduce输入结果。配置conf=newConfiguration();FileSystemfs=smallFilePath.getFileSystem(conf);FileStatussmallFilePathStatus=fs.getFileStatus(smallFilePath);if(smallFilePathStatus.isDir()){for(FileStatusf:fs.listStatus(smallFilePath)){if(f.getPath().getName().startsWith("part")){DistributedCache.addCacheFile(f.getPath().toUri(),conf);}}}else{DistributedCache.addCacheFile(smallFilePath.toUri(),conf);这个框架假设分布式缓冲区的内容和输入块的内容都可以加载到内存中。它的优点是将两个数据集中较小的一个加载到内存中。在论文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,这种方法针对分布式缓存中内容较大的场景进行了比较优化。在他们的优化中,他们将分布式缓存分成N个分区,并将输入块放入N个哈希表中。那么cleanup方法中的优化就更高效了。映射端复制连接的问题是映射任务必须在启动时读取分布式缓存。上述论文中提到的另一种优化方案是重载FileInputFormat的拆分。将存在于同一主机上的输入块合并为一个块。那么就可以减少需要加载分布式缓存的map任务的数量。***请注意,Hadoop在org.apache.hadoop.mapred.join包中附带了一个映射端连接。但它需要连接数据集的有序输入文件,并要求它们分布到相同的分区中。这导致繁重的预处理工作。原文链接:http://www.cnblogs.com/datacloud/p/3579333.html