前言Hudi不仅支持Spark和Fink编写Hudi,还支持Java客户端。本文总结了HudiJavaClient的使用方法,主要是代码示例,可以读Hive表,也可以写Hudi表。当然也支持读取其他数据源,比如mysql,读取mysql的历史数据,增量数据写入Hudi。Hudi0.12.0版本支持insert/upsert/delete,目前不支持bulkInsert,目前只支持COW表,支持完整的Hudi写操作,包括回滚、clean、archive等。完整代码已经上传至GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client.其中HoodieJavaWriteClientExample是从Hudi源码中复制过来的,包括insert/upsert/delte/codeexamples。JavaClientHive2Hudi是自己写的代码示例的总结。实现kerberos认证,读取Hive表schema作为写hudi的schema,读取Hive表数据写入hudi表,同步hudi元数据到hive元数据,实现Hive元数据的自动创建。当然也支持读取其他数据源,比如mysql,实现历史和增量写入。与HoodieJavaWriteClientExample相比,JavaClientHive2Hudi增加了很多配置参数,更加贴近实际使用。例如HoodieJavaWriteClientExample的payload为HoodieAvroPayload,只能作为示例。JavaClientHive2Hudi使用DefaultHoodieRecordPayload,支持预合并和历史值比较。关于这一点,可以参考我之前写的文章:HudipreCombinedField总结(二)——源码分析,如果只需要预组合功能,可以使用OverwriteWithLatestAvroPayload,这两个是Spark的默认值SQL和SparkDF,当然如果不需要,也可以支持HoodieAvroPayload,代码这里是根据条件判断需要使用哪个payloadClassName。字符串payloadClassName=shouldOrdering?DefaultHoodieRecordPayload.class.getName():shouldCombine?OverwriteWithLatestAvroPayload.class.getName():HoodieAvroPayload.class.getName();然后使用反射构造payload。其实这里反射的逻辑就是HudiSpark源码中的逻辑。另一个更接近实际使用的原因是我们的项目将HudiJavaClient封装成一个NIFI处理器,然后使用NIFI调度,其性能和稳定性可以满足项目需求,这里的核心逻辑和实际项目的逻辑是差不多。之所以使用Java客户端是历史原因,因为我们还没有安排好Spark和Flink(之前用过NIFI)的开发工具,开发新的开发工具需要时间,所以我们选择了Java客户端,我们目前已经使用ApacheDolphinScheduler作为自己的开发调度工具,以后主要使用Spark/Flink,所以现在总结一下HudiJavaClient的使用和源码,以免遗忘,希望对大家有所帮助。初始化Hudi表JavaClient的代码比较接近源码。initTable主要是根据一些配置信息生成.hoodie元数据路径,生成hoodie.properties元数据文件,持久化保存Hudi的一些配置信息。if(!(fs.exists(path)&&fs.exists(hoodiePath))){//根据Hudi路径是否存在,判断Hudi表是否需要初始化if(Arrays.asList(INSERT_OPERATION,UPSERT_OPERATION).包含(writeOperationType)){HoodieTableMetaClient.withPropertyBuilder()。setTableType(TABLE_TYPE)。setTableName(targetTable)。setPayloadClassName(payloadClassName)。setRecordKeyFields(recordKeyFields)类.getName())。initTable(hadoopConf,tablePath);}elseif(writeOperationType.equals(DELETE_OPERATION)){//删除操作,Hudi表必须已经存在thrownewTableNotFoundException(tablePath);}}兜帽ie.properties#属性保存于2022-10-24T07:40:36.530Z#MonOct2415:40:36CST2022hoodie.table.name=test_hudi_targethoodie.archivelog.folder=archivedhoodie.table.type=COPY_ON_WRITEhoodie.table.version=5hoodie.timeline.layout.version=1hoodie.datasource.write.drop.partition.columns=falsehoodie.table.checksum=1749434190创建HoodieJavaWriteClient首先要创建HoodieWriteConfig,主要是hudi的一些配置,比如Schema,表名,payload,index,clean等一些参数可以自己理解。HoodieWriteConfigcfg=HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(writeSchema.toString()).withParallelism(2,2).withDeleteParallelism(2).forTable(targetTable).withWritePayLoad(payloadClassName).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM)//.bloomIndexPruneByRanges(false)//1000万总时间提升1分钟.bloomFilterFPP(0.0000000)//1000万总时间提升3分钟.fromProperties(indexProperties).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileLimit).approxRecordSize(recordSizeEstimate).build()).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150,200).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(maxFileSize).build()).build();writeClient=newHoodieJavaWriteClient<>EngJavatext(hadoopConf),cfg)startCommit返回commitTime,先执行rollback,然后创建.commit.request,然后将commitTime返回StringnewCommitTime=writeClient.startCommit();generateRecord主要是构造写hudi所需的数据结构,包括HoodieKey和payLoad,其中delete操作只需要HoodieKey。公共静态列表
