官网文档地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query参数read.start-commit增量查询开始时间对于流式读取,如果不指定该值,默认取最新的instantTime,即默认从最新的instantTime开始读流(包括最新的)。对于批量读取,不指定该参数,只指定read.end-commit,可以实现时间穿越的功能,可以查询历史记录。如果不指定read.end-commit增量查询的结束时间,默认读取最新的。record,这个参数一般只适用于批量读取,因为流式读取一般要求是查询所有增量数据s),默认值为60,即一分钟查询范围[BEGIN_INSTANTTIME,END_INSTANTTIME],包括开始时间和结束时间。默认值请参考上面的参数说明。建表版本:Hudi0.9.0Spark2.4.5我这里建表使用的是HudiSparkSQL0.9.0,目的是在项目上模拟用JavaClient和SparkSQL建的Hudi表,验证HudiFlinkSQL是否可用增量查询兼容老版本Hudi表(你没有这种需求,可以使用任何方法正常创建数字)查询Hudi0.13.0-SNAPSHOTFlink1.14.3(增量查询)Spark3.1.2(主要是使用CallProcedures命令查看commit信息)buildtableandcreatenumbers--SparkSQLHudi0.9.0createtablehudi.test_flink_incremental(idint,namestring,pricedouble,tslong,dtstring)usinghudipartitionedby(dt)options(primaryKey='id',preCombineField='ts',type='cow');insertintohudi.test_flink_incrementalvalues(1,'a1',10,1000,'2022-11-25');插入hudi.test_flink_incrementalvalues(2,'a2',20,2000,'2022-11-25');更新hudi.test_flink_incrementalsetname='hudi2_update'whereid=2;insertintohudi.test_flink_incrementalvalues(3,'a3',30,3000,'2022-11-26');insertintohudi.test_flink_incrementalvalues(4,'a4',40,4000,'2022-12-26');使用show_commits查看有哪些commit(这里查询使用Hudi的master,因为0.11.0版本支持show_commits,也可以使用hadoop命令查看.hoodie文件夹下的.commit文件)callshow_commits(table=>'hudi.test_flink_incremental');202221205152736202120212051527272727272272120212051527127122022120212021221222222222222022205152650FLINGbytoblesqlsqlsqlsqlsqlsqlsquritnprogentprogralprogentdobledobledobledinter(ID)('connector'='hudi','path'='hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental');建表时不指定增量查询相关参数。我们在查询的时候动态指定,更加灵活,动态指定参数的方法是在查询语句后添加如下语句/*+options('read.start-commit'='20221205152723','read.end-commit'='20221205152736')*/批量读取FlinkSQL阅读Hudi有两种模式:批量阅读和流式阅读。默认批量读取,先看批量读取增量查询验证是否包含开始时间和默认结束时间select*fromtest_flink_incremental/*+options('read.start-commit'='20221205152723'--开始时间对应totheid=3records)*/结果包括开始时间,如果不指定结束时间,则为最新的数据idnamepricetsdt4a440.04000dt=2022-12-263a330.03000dt=2022默认读取-11-26验证是否包含结束时间select*fromtest_flink_incremental/*+options('read.start-commit'='20221205152712',--开始时间对应id=2的记录read.end-commit'='20221205152723'--结束时间对应id=3的记录)*/结果包含结束时间idnamepricetsdt3a330.03000dt=2022-11-262hudi2_update20.02000dt=2022-11-25验证默认开始时间这种情况是指定结束时间,但不指定开始时间,如果不指定,则读取所有records表的最新版本。select*fromtest_flink_incremental/*+options('read.end-commit'='20221205152712'--结束时间对应id=2的更新记录)*/结果:只查询id=namepricetsdt2对应的记录idend-commithudi2_update20.02000dt=2022-11-25时间穿越(查询历史记录)验证是否可以查询到历史记录,我们更新id为2的名字,更新前的名字是a2,更新后的名字是hudi2_update,我们验证一下是否可以通过FlinkSQL查询Hudi历史记录,预期结果id=2,name=a2select*fromtest_flink_incremental/*+options('read.end-commit'='20221205152702'--结束时间对应id=2的历史记录)*/结果:可以正确查询到历史记录idnamepricetsdt2a220.02000dt=2022-11-25开启流式读取的参数:read.streaming.enabled=true流式读取不需要设置结束时间,因为一般要求读取所有增量data、我们只需要验证开始时间。验证默认开始时间select*fromtest_flink_incremental/*+options('read.streaming.enabled'='true','read.streaming.check-interval'='4')*/Result:从最新开始增量读取instantTime,即默认read.start-commit为最新的instantTimeidnamepricetsdt4a440.04000dt=2022-12-26验证指定开始时间select*fromtest_flink_incremental/*+options('read.streaming.enabled'='true','read.streaming.check-interval'='4','read.start-commit'='20221205152712')*/result:idnamepricetsdt2hudi2_update20.02000dt=2022-11-253a330.03000dt=2022-11-264a440.04000dt=2022-11-26如果想第一次查询所有历史数据,可以把start-commit设置的早点,比如设置到去年:'read.start-commit'='20211205152712'select*fromtest_flink_incremental/*+options('read.streaming.enabled'='true','read.streaming.check-interval'='4','read.start-commit'='20211205152712')*/idnamepricetsdt1a110.01000dt=2022-11-252hudi2_update20.02000dt=2022-11-253a330.03000dt=2022-11-264a440.04000dt=2022-11-26验证流读取的连续性,验证有新的增量数据进来,是否可以继续消费Hudi增量数据,验证数据的准确性和一致性。为了方便验证,我可以使用FlinkSQL增量Stream读取Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据来验证数据的准确性FlinkSQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar放在lib下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar首先在MySQL中创建一个Sink表--MySQLCREATETABLE`test_sink`(`id`int(11),`name`textDEFAULTNULL,`price`int(11),`ts`int(11),`dt`textDEFAULTNULL)ENGINE=InnoDBDEFAULTCHARSET=utf8;在Flink中创建对应的sinktablecreatetabletest_sink(idint,namestring,pricedouble,tsbigint,dtstring)with('connector'='jdbc','url'='jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8','username'='root','password'='root-123','table-name'='test_sink','sink.buffer-flush.max-rows'='1');然后流增量读取Hudi表SinkMysqlinsertintotest_sinkselect*fromtest_flink_incremental/*+options('read.streaming.enabled'='true','read.streaming.check-interval'='4','read.start-commit'='20221205152712')*/likethis会启动一个长任务,一直处于运行状态。我们可以在yarn-session接口上验证,然后验证MySQL中历史数据的准确性,然后使用SparkSQL向源表插入两条数据——SparkSQLinsertintohudi.test_flink_incrementalvalues(5,'a5',50,5000,'2022-12-07');插入hudi.test_flink_incrementalvalues(6,'a6',60,6000,'2022-12-07');我们增量读取的间隔设置为4s。成功插入数据等待4s后,校验MySQL表中的数据,发现新数据已经成功下沉到MySQL中,并且数据没有重复。最后,验证更新的增量。数据,SparkSQL更新Hudi源表--SparkSQLupdatehudi.test_flink_incrementalsetname='hudi5_update'whereid=5;继续验证结果结果是更新后的增量数据也会插入到MySQL中的sink表中,但是不会更新如果想实现对原始数据更新的效果怎么办?我们需要在MySQL和Flink的sink表中添加主键字段,这两者缺一不可,如下:--MySQLCREATETABLE`test_sink`(`id`int(11),`name`textDEFAULTNULL,`price`int(11),`ts`int(11),`dt`textDEFAULTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;--FlinkSQLcreate表测试t_sink(idintPRIMARYKEYNOTENFORCED,namestring,pricedouble,tsbigint,dtstring)with('connector'='jdbc','url'='jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8','username'='root','password'='root-123','table-name'='test_sink','sink.buffer-flush.最大行数'='1');关闭刚刚启动的长任务,重新执行刚才的insert语句,先运行历史数据,最后验证增量效果--SparkSQLupdatehudi.test_flink_incrementalsetname='hudi6_update'whereid=6;insertintohudi。test_flink_incrementalvalues(7,'a7',70,7000,'2022-12-07');可以看到达到了预期的效果,对id=6进行update操作,对id=7进行insert操作
