当前位置: 首页 > 科技观察

HudiJavaClient总结-阅读Hive和编写Hudi代码示例

时间:2023-03-15 14:42:35 科技观察

前言Hudi不仅支持Spark和Fink编写Hudi,还支持Java客户端。本文总结了HudiJavaClient的使用方法,主要是代码示例,可以读Hive表,也可以写Hudi表。当然也支持读取其他数据源,比如mysql,读取mysql的历史数据,增量数据写入Hudi。Hudi0.12.0版本支持insert/upsert/delete,目前不支持bulkInsert,目前只支持COW表,支持完整的Hudi写操作,包括回滚、clean、archive等。完整代码已经上传至GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client.其中HoodieJavaWriteClientExample是从Hudi源码中复制过来的,包括insert/upsert/delte/codeexamples。JavaClientHive2Hudi是自己写的代码示例的总结。实现kerberos认证,读取Hive表schema作为写hudi的schema,读取Hive表数据写入hudi表,同步hudi元数据到hive元数据,实现Hive元数据的自动创建。当然也支持读取其他数据源,比如mysql,实现历史和增量写入。与HoodieJavaWriteClientExample相比,JavaClientHive2Hudi增加了很多配置参数,更加贴近实际使用。例如HoodieJavaWriteClientExample的payload为HoodieAvroPayload,只能作为示例。JavaClientHive2Hudi使用DefaultHoodieRecordPayload,支持预合并和历史值比较。关于这一点,可以参考我之前写的文章:HudipreCombinedField总结(二)——源码分析,如果只需要预组合功能,可以使用OverwriteWithLatestAvroPayload,这两个是Spark的默认值SQL和SparkDF,当然如果不需要,也可以支持HoodieAvroPayload,代码这里是根据条件判断需要使用哪个payloadClassName。字符串payloadClassName=shouldOrdering?DefaultHoodieRecordPayload.class.getName():shouldCombine?OverwriteWithLatestAvroPayload.class.getName():HoodieAvroPayload.class.getName();然后使用反射构造payload。其实这里反射的逻辑就是HudiSpark源码中的逻辑。另一个更接近实际使用的原因是我们的项目将HudiJavaClient封装成一个NIFI处理器,然后使用NIFI调度,其性能和稳定性可以满足项目需求,这里的核心逻辑和实际项目的逻辑是差不多。之所以使用Java客户端是历史原因,因为我们还没有安排好Spark和Flink(之前用过NIFI)的开发工具,开发新的开发工具需要时间,所以我们选择了Java客户端,我们目前已经使用ApacheDolphinScheduler作为自己的开发调度工具,以后主要使用Spark/Flink,所以现在总结一下HudiJavaClient的使用和源码,以免遗忘,希望对大家有所帮助。初始化Hudi表JavaClient的代码比较接近源码。initTable主要是根据一些配置信息生成.hoodie元数据路径,生成hoodie.properties元数据文件,持久化保存Hudi的一些配置信息。if(!(fs.exists(path)&&fs.exists(hoodiePath))){//根据Hudi路径是否存在,判断Hudi表是否需要初始化if(Arrays.asList(INSERT_OPERATION,UPSERT_OPERATION).包含(writeOperationType)){HoodieTableMetaClient.withPropertyBuilder()。setTableType(TABLE_TYPE)。setTableName(targetTable)。setPayloadClassName(payloadClassName)。setRecordKeyFields(recordKeyFields)类.getName())。initTable(hadoopConf,tablePath);}elseif(writeOperationType.equals(DELETE_OPERATION)){//删除操作,Hudi表必须已经存在thrownewTableNotFoundException(tablePath);}}兜帽ie.properties#属性保存于2022-10-24T07:40:36.530Z#MonOct2415:40:36CST2022hoodie.table.name=test_hudi_targethoodie.archivelog.folder=archivedhoodie.table.type=COPY_ON_WRITEhoodie.table.version=5hoodie.timeline.layout.version=1hoodie.datasource.write.drop.partition.columns=falsehoodie.table.checksum=1749434190创建HoodieJavaWriteClient首先要创建HoodieWriteConfig,主要是hudi的一些配置,比如Schema,表名,payload,index,clean等一些参数可以自己理解。HoodieWriteConfigcfg=HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(writeSchema.toString()).withParallelism(2,2).withDeleteParallelism(2).forTable(targetTable).withWritePayLoad(payloadClassName).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM)//.bloomIndexPruneByRanges(false)//1000万总时间提升1分钟.bloomFilterFPP(0.0000000)//1000万总时间提升3分钟.fromProperties(indexProperties).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileLimit).approxRecordSize(recordSizeEstimate).build()).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150,200).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(maxFileSize).build()).build();writeClient=newHoodieJavaWriteClient<>EngJavatext(hadoopConf),cfg)startCommit返回commitTime,先执行rollback,然后创建.commit.request,然后将commitTime返回StringnewCommitTime=writeClient.startCommit();generateRecord主要是构造写hudi所需的数据结构,包括HoodieKey和payLoad,其中delete操作只需要HoodieKey。公共静态列表>generateRecord(ResultSetrs,SchemawriteSchema,StringpayloadClassName,booleanshouldCombine)throwsIOException,SQLException{List>list=newArrayList<>();while(rs.next()){GenericRecordrec=newGenericData.Record(writeSchema);writeSchema.getFields().forEach(字段->{try{rec.put(field.name(),convertValueType(rs,field.name(),field.schema().getType()));}catch(SQLExceptione){抛出新的RuntimeException(e);}});字符串partitionPath=partitionFields==n乌尔?"":getRecordPartitionPath(rs,writeSchema);System.out.println(partitionPath);字符串rowKey=recordKeyFields==null&&writeOperationType.equals(INSERT_OPERATION)?UUID.randomUUID().toString():getRecordKey(rs,writeSchema);HoodieKeykey=newHoodieKey(rowKey,partitionPath);如果(shouldCombine){ObjectorderingVal=HoodieAvroUtils.getNestedFieldVal(rec,preCombineField,false,false);list.add(newHoodieAvroRecord<>(key,createPayload(payloadClassName,rec,(Comparable)orderingVal)));}else{list.add(newHoodieAvroRecord<>(key,createPayload(payloadClassName,rec)));}}返回列表;/insert/delete,JavaClient默认也会开启clean等操作。具体实现是HoodieJavaCopyOnWriteTable目前不支持bulkInsert等操作。如果我有能力,我会尝试提交PR支持。writeClient.upsert(记录,newCommitTime);writeClient.insert(记录,newCommitTime);writeClient.delete(记录,newCommitTime);同步Hive最后将元数据同步到Hive,实现hive建表。此步骤是可选的。这样就可以使用HiveSQL和SparkSQL来查询Hudi表了。/***使用HiveSyncTool同步Hive元数据*Spark写Hudi源码同步Hive元数据的方式**@paramproperties*@paramhiveConf*/publicstaticvoidsyncHive(TypedPropertiesproperties,HiveConfhiveConf){HiveSyncToolhiveSyncTool=新HiveSyncTool(属性,hiveConf);hiveSyncTool.syncHoodieTable();}publicstaticHiveConfgetHiveConf(StringhiveSitePath,StringcoreSitePath,StringhdfsSitePath){HiveConf配置=newHiveConf();configuration.addResource(新路径(hiveconfigSitePath));.addResource(新路径(coreSitePath));配置.addResource(新路径(hdfsSitePath));返回配置;}/***同步Hive元数据的一些属性配置*@parambasePath*@return*/publicstaticTypedPropertiesgetHiveSyncProperties(StringbasePath){TypedPropertiesproperties=newTypedProperties();properties.put(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(),嗨veSyncMode.HMS.name());properties.put(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key(),true);properties.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(),dbName);properties.put(HoodieSyncConfig.META_BLE_SYNC),targetTable);properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(),basePath);properties.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),MultiPartKeysValueExtractor.class.getName());properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(),partitionFields);if(partitionFields!=null&&!partitionFields.isEmpty()){properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(),partitionFields);}返回属性;.0版本,本文代码示例基于0.12.0,核心代码是一样的,有两个区别1、0.9.0的clean和archive的参数都在withCompactionConfig里,现在2、0.9。0HiveSyncTool的参数是HiveSyncConfig,现在是TypedProperties。综上所述,HudiJavaClient和Spark、Flink一样,可以实现Hudi完整的写逻辑,但是目前功能支持还不够完善,比如不支持MOR表,性能不如Spark和弗林克。毕竟Spark和Flink都是集群。不过HudiJavaClient可以集成到其他框架中,比如NIFI,集成起来更方便。集成到NIFI的好处是可以通过拖拽配置参数的方式将历史数据和增量数据写入Hudi。你也可以自己实现多线程来提高性能。我们目前测试的性能是Insert可以达到10000条/s,upsert可能需要重写整个表,因为需要读取索引和更新历史数据。所以,当历史数据比较大,更新率比较高的时候,单线程的性能会很差。不过基于源码改造,将Bloom索引和数据写入部分改成多线程后,性能会提升不少。当然这也要看机器的性能,跟CPU和内存有关。对于数据量不是很大的ZF数据,一般是几十亿表,性能还是可以满足要求的。