当前位置: 首页 > 科技观察

SparkGraphx实现图最大团挖掘,伪并行算法

时间:2023-03-21 00:49:14 科技观察

####背景:####sparkgraphx没有提供maximalclique挖掘算法目前的maximalclique算法是一种序列化算法,基于Bron-Kerbosch算法####思路:####sparkgraphx提供连通图的算法。连通图和最大团是无向图中的概念。最大团是连通图的子集。使用sparkgraphx查找连通图。从每个连通图中,使用串行最大团算法进行优化以找到最大团(伪并行化)。对于具有强相关性的图,找到的连通图非常大。这时候序列化的maximalclique算法还是会耗费很长的时间。这里使用剪枝的思想来减少样本数据量,但是对于大图,优化空间有限,期待真正并行化的最大团算法####配置文件:####graph_data_path=hdfs://localhost/graph_data out_path=hdfs://localhost/clique ck_path=hdfs://localhost/checkpoint numIter=50(剪枝次数) count=3(最大团算法的顶点数) algorithm=2(最大团算法,1:个人实现 2:jgrapht) percent=90(剪枝后顶点数为上一次的百分比。如果剪枝后剩下90%的数据,剪枝效率不高。) spark.master=local spark.app.name=graph spark.serializer=org.apache.spark.serializer.KryoSerializer spark.yarn.executor.memoryOverhead=20480 spark.yarn.driver.memoryOverhead=20480 spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC spark.driver.maxResultSize=10g spark.default.parallelism=60####示例数据:####{"src":"0","dst":"1"}{"src":"0","dst":"2"}{"src":"0","dst":"3"}{"src":"1","dst":"0"}{"src":"2","dst":"1"}{"src":"3","dst":"5"}{"src":"4","dst":"6"}{"src":"5","dst":"4"}{"src":"6","dst":"5"}{"src":"3","dst":"2"}{"src":"2","dst":"3"}{"src":"6","dst":"4"}{"src":"3","dst":"4"}{"src":"4","dst":"3"}{"src":"2","dst":"6"}{"src":"6","dst":"2"}{"src":"6","dst":"7"}{"src":"7","dst":"6"}####样图:########输出:####0,1,2 0,2,3 3,4,5 4,5,6####代码实现:####import java.util import java.util.Properties import org.apache.spark.broadcast.Broadcast import org.apache.spark.graphx.{Edge,Graph} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row,SQLContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf,SparkContext} import org.jgrapht.alg.BronKerboschCliqueFinder import org.jgrapht.graph.{DefaultEdge,SimpleGraph} import scala.collection.JavaConverters._ import scala.collection.mutable object ApplicationTitan{defmain(args:Array[String]){valprop=newProperties()prop.load(getClass.getResourceAsStream("/config.properties"))valgraph_data_path=prop.getProperty("graph_dat
a_path")valout_path=prop.getProperty("out_path")valck_path=prop.getProperty("ck_path")valcount=Integer.parseInt(prop.getProperty("count"))valnumIter=Integer.parseInt(prop.getProperty("numIter"))valalgorithm=Integer.parseInt(prop.getProperty("algorithm"))valpercent=Integer.parseInt(prop.getProperty("percent"))valconf=newSparkConf()try{Runtime.getRuntime.exec("hdfs dfs -rm -r "+out_path)//Runtime.getRuntime.exec("cmd.exe /C rd /s /q "+out_path)}catch{caseex:Exception=>ex.printStackTrace(System.out)}prop.stringPropertyNames().asScala.foreach(s=>{if(s.startsWith("spark")){conf.set(s,prop.getProperty(s))}})conf.registerKryoClasses(Array(getClass))valsc=newSparkContext(conf)sc.setLogLevel("ERROR")sc.setCheckpointDir(ck_path)valsqlc=newSQLContext(sc)try{vale_df=sqlc.read//.json(graph_data_path).parquet(graph_data_path)vare_rdd=e_df.mapPartitions(it=>{it.map({caseRow(dst:String,src:String)=>valsrc_long=src.toLongvaldst_long=dst.toLongif(src_long<dst_long)(src_long,dst_long)else(dst_long,src_long)})})/*NOTE(review): the original page lost the text between "src_long" and "List(" to HTML escaping of "<"; the comparison above and the pruning-loop header below are reconstructed from context*/variter=0varbc_size=0varbc:Broadcast[scala.collection.immutable.Set[Long]]=nullwhile(iter<=numIter){valtemp=e_rdd.flatMap(x=>List((x._1,1),(x._2,1))).reduceByKey((x,y)=>x+y).filter(x=>x._2>=count-1).mapPartitions(it=>it.map(x=>x._1))valbc_value=temp.collect().toSetbc=sc.broadcast(bc_value)e_rdd=e_rdd.filter(x=>bc.value.contains(x._1)&&bc.value.contains(x._2))e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)iter+=1if(bc_size!=0&&bc_value.size>=bc_size*percent/100){println("total iter:"+iter)iter=Int.MaxValue}bc_size=bc_value.size}//构造图valedge:RDD[Edge[Long]]=e_rdd.mapPartitions(it=>it.map(x=>Edge(x._1,x._2)))valgraph=Graph.fromEdges(edge,0,StorageLevel.MEMORY_AND_DISK_SER,StorageLevel.MEMORY_AND_DISK_SER)//连通图valcc=graph.connectedComponents().verticescc.persist(StorageLevel.MEMORY_AND_DISK_SER)cc.join(e_rdd).mapPartitions(it=>it.map(x=>((math.random*10).toInt.toString.concat(x._2._1.toString),(x._1,x._2._2)))).aggregateByKey(List[(Long,Long)]())((list,v)=>list:+v,(list1,list2)=>list1::list2).mapPartitions(it=>it.map(x=>(x._1.substring(1),x._2))).aggregateByKey(List[(Long,Long)]())((list1,list2)=>list1:::list2,(list3,list4)=>list3:::list4).filter(x=>x._2.size>=count-1).fl
atMap(x=>{if(algorithm==1)find(x,count)elsefind2(x,count)}).mapPartitions(it=>{it.map({caseset=>vartemp=""set.asScala.foreach(x=>temp+=x+",")temp.substring(0,temp.length-1)case_=>})})//.coalesce(1).saveAsTextFile(out_path)}catch{caseex:Exception=>ex.printStackTrace(System.out)}sc.stop()}//自己实现的极大团算法deffind(x:(String,List[(Long,Long)]),count:Int):mutable.Set[util.Set[String]]={println(x._1+"|s|"+x._2.size)println("BKCliqueFinder---"+x._1+"---"+System.currentTimeMillis())valneighbors=newutil.HashMap[String,util.Set[String]]valfinder=newCliqueFinder(neighbors,count)x._2.foreach(r=>{valv1=r._1.toStringvalv2=r._2.toStringif(neighbors.containsKey(v1)){neighbors.get(v1).add(v2)}else{valtemp=newutil.HashSet[String]()temp.add(v2)neighbors.put(v1,temp)}if(neighbors.containsKey(v2)){neighbors.get(v2).add(v1)}else{valtemp=newutil.HashSet[String]()temp.add(v1)neighbors.put(v2,temp)}})println("BKCliqueFinder---"+x._1+"---"+System.currentTimeMillis())finder.findMaxCliques().asScala}//jgrapht中的极大团算法deffind2(x:(String,List[(Long,Long)]),count:Int):Set[util.Set[String]]={println(x._1+"|s|"+x._2.size)println("BKCliqueFinder---"+x._1+"---"+System.currentTimeMillis())valto_clique=newSimpleGraph[String,DefaultEdge](classOf[DefaultEdge])x._2.foreach(r=>{valv1=r._1.toStringvalv2=r._2.toStringto_clique.addVertex(v1)to_clique.addVertex(v2)to_clique.addEdge(v1,v2)})valfinder=newBronKerboschCliqueFinder(to_clique)vallist=finder.getAllMaximalCliques.asScalavarresult=Set[util.Set[String]]()list.foreach(x=>{if(x.size()>=count)result=result+x})println("BKCliqueFinder---"+x._1+"---"+System.currentTimeMillis())result}}//自己实现的极大团算法importjava.util.*;/***[@author](https://my.oschina.net/arthor)mopspecial@gmail.com*[@date](https://my.oschina.net/u/2504391)2017/7/31*/publicclassCliqueFinder{privateMap<String,Set<String>>neighbors;privateSet<String>nodes;privateSet<Set<String>>maxCliques=newHashSet<>();privateIntegerminSize;publicCliqueFinder(Map<String,Set<String>>neighbors,IntegerminSize){this.neighbors=neighbors;this.nodes=neighbors.keySet();this.minSize=mi
nSize;}privatevoidbk3(Set<String>clique,List<String>candidates,List<String>excluded){if(candidates.isEmpty()&&excluded.isEmpty()){if(!clique.isEmpty()&&clique.size()>=minSize){maxCliques.add(clique);}return;}for(Strings:degeneracy_order(candidates)){List<String>new_candidates=newArrayList<>(candidates);new_candidates.retainAll(neighbors.get(s));List<String>new_excluded=newArrayList<>(excluded);new_excluded.retainAll(neighbors.get(s));Set<String>nextClique=newHashSet<>(clique);nextClique.add(s);bk2(nextClique,new_candidates,new_excluded);candidates.remove(s);excluded.add(s);}}privatevoidbk2(Set<String>clique,List<String>candidates,List<String>excluded){if(candidates.isEmpty()&&excluded.isEmpty()){if(!clique.isEmpty()&&clique.size()>=minSize){maxCliques.add(clique);}return;}Stringpivot=pick_random(candidates);if(pivot==null){pivot=pick_random(excluded);}List<String>tempc=newArrayList<>(candidates);tempc.removeAll(neighbors.get(pivot));for(Strings:tempc){List<String>new_candidates=newArrayList<>(candidates);new_candidates.retainAll(neighbors.get(s));List<String>new_excluded=newArrayList<>(excluded);new_excluded.retainAll(neighbors.get(s));Set<String>nextClique=newHashSet<>(clique);nextClique.add(s);bk2(nextClique,new_candidates,new_excluded);candidates.remove(s);excluded.add(s);}}privateList<String>degeneracy_order(List<String>innerNodes){List<String>result=newArrayList<>();Map<String,Integer>deg=newHashMap<>();for(Stringnode:innerNodes){deg.put(node,neighbors.get(node).size());}while(!deg.isEmpty()){Integermin=Collections.min(deg.values());StringminKey=null;for(Stringkey:deg.keySet()){if(deg.get(key).equals(min)){minKey=key;break;}}result.add(minKey);deg.remove(minKey);for(Stringk:neighbors.get(minKey)){if(deg.containsKey(k)){deg.put(k,deg.get(k)-1);}}}returnresult;}privateStringpick_random(List<String>random){if(random!=null&&!random.isEmpty()){returnrandom.get(0);}else{returnnull;}}publicSet<Set<String>>findMaxCliques(){this.bk3(newHashSet<>(),newArrayList<>(nodes),newArrayList<>());returnmaxCliques;}publicstaticvoidmain(String[]args){Map<String,Set<String>>neighbors=newHashMap<>();neighbors.put("0",newHashSet<>(Arrays.asList("1","2","3")));neighbors.put("1",newHashSet<>(Arrays.asList("0
","2")));neighbors.put("2",newHashSet<>(Arrays.asList("0","1","3","6")));neighbors.put("3",newHashSet<>(Arrays.asList("0","2","4","5")));neighbors.put("4",newHashSet<>(Arrays.asList("3","5","6")));neighbors.put("5",newHashSet<>(Arrays.asList("3","4","6")));neighbors.put("6",newHashSet<>(Arrays.asList("2","4","5")));neighbors.put("7",newHashSet<>(Arrays.asList("6")));CliqueFinderfinder=newCliqueFinder(neighbors,3);finder.bk3(newHashSet<>(),newArrayList<>(neighbors.keySet()),newArrayList<>());System.out.println(finder.maxCliques);}}