当前位置: 首页 > 科技观察

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

时间:2023-03-13 00:41:34 科技观察

使用hbase-rdd二次开发谈如何在SparkCore之上扩展自己的模块分享在SparkCore之上扩展自己的模块的经验。正文来了~~~hbase-rdd是基于SparkContext搭建的第三方开源模块,用于对Hbase进行增删改查。最新版本是0.7.1。目前rdd操作hbase时,默认会调用隐式方法。implicitdefstringToBytes(s:String):Array[Byte]={Bytes.toBytes(s)}将RDD的key转换为byteb,然后调用Hbase的put(b)方法保存rowkey,然后将RDD的每一行存入数据库。在轨迹图绘制项目的数据计算中,我们考虑了hbase的rowkey的设计,尽量减少rowkey存储的开销。虽然hbase-rdd最后的rowkey默认是字节数组,但是我们希望在这个地方按照自己的方式组装rowkey。使用MD5(imei)+dateTime组成的byte数组作为rowkey。所以默认的hbase-rdd提供的方法不符合我们的存储需求,需要修改源码。在toHbase方法中,有一个convert方法,将RDD中的每一行数据进行转换,并使用RDD中的key生成一个Put(Bytes.toBytes(key))对象,提供存储Hbase的rowkey之后。在convert函数中,其实现已被修改。默认情况下,hbase-rdd使用stringToBytes隐式函数将RDD的String类型key转换为字节数组。这里我们需要修改一下。没有使用stringToBytes隐式方法,而是直接生成字节数据。protecteddefconvert(id:String,values:Map[String,Map[String,A]],put:PutAdder[A])={valstrs=id.split(",")valimei=strs{0}valdateTime=strs{1}valb1=MD5Utils.computeMD5Hash(imei.getBytes())valb2=Bytes.toBytes(dateTime.toLong)valkey=b1.++(b2)valp=newPut(key)//改造varempty=truefor{(family,content)<-values(key,value)<-content}{empty=falseif(StrUtils.isNotEmpty(family)&&StrUtils.isNotEmpty(key)){put(p,family,key,value)}}if(empty)NoneelseSome(newImmutableBytesWritable,p)}这样我们就可以用自己的方式来构建rowkey了。当然,基于这个思路,我们可以使用任何方式构建rowkey。在使用hbase-rdd插件的过程中,我一直在想默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包后RDD上还有toHbase方法呢?查看源码后,发现hbase-rdd包中提供了两个隐式方法:implicitdeftoHBaseRDDSimple[A](rdd:RDD[(String,Map[String,A])])(implicitwriter:Writes[A]):HBaseWriteRDDSimple[A]=newHBaseWriteRDDSimple(rdd,pa[A])implicitdeftoHBaseRDDSimpleTS[A](rdd:RDD[(String,Map[String,(A,Long)])])(implicitwriter:Writes[A]):HBaseWriteRDDSimple[(A,Long)]=newHBaseWriteRDDSimple(rdd,pa[A])这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,并尝试从隐式定义中寻找解决方案。经过尝试,发现有隐式方法定义了toHBaseRDDSimple,于是调用隐式方法新建一个HBaseWriteRDDSimple类返回hBaseWriteRDDSimple,而hBaseWriteRDDSimple对象中有一个toHbase方法,所以引入hbase-rdd后,可以发现在没有toHbase方法的RDD上有一个toHbase方法。这一切都归功于Scala强大的隐式转换功能。然后明白了原理,我们能不能基于RDD写自己的模块,就这么干吧!第一步:新建一个TraittraitHaha{implicitdefgaga[A](rdd:RDD[String]):Hehe=newHehe(rdd)}第二步:新建一个hehe类finalclassHehe(rdd:RDD[String]){defwow(tableName:String,family:String):Unit={println("--------------------------------------------")println("tableName:"+tableName+"-family:"+family)println("size:"+rdd.count())rdd.collect().foreach(data=>println(数据))println("--------------------------------------------")}}第三步:新建一个包对象packageobjectteststextendsHaha第四步:新建一个测试类objectTest{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("Test")valsc=newSparkContext(sparkConf)sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T")}}项目结构图:运行效果图:I希望对大家以后的开发有所帮助,同时借鉴这个案例在SparkCore之上搭建自己的小模块。高校4.20IT充电节(19日至20日2天,免费100节视频课程,视频课程会员40折,非会员30折,套餐20折,微职立减2000元)活动链接:http://edu.51cto.com/activity/lists/id-47.html?wenzhang相关视频教程:【大数据Spark2.x流式数据处理】精通Spark流式数据处理(续)