当前位置: 首页 > 科技观察

如果Flink不能直接将聚合结果写入Kafka怎么办?

时间:2023-03-22 13:59:49 科技观察

投疑?[Flink1.10]-有一种情况是Kafka作为所有系统或者应用之间的桥梁,而这个时候恰恰是upstream需要进行Unbound聚合统计。来自@PyFlink企业用户。示例代码:INSERTINTOkafkaSinkSELECTid,SUM(cnt)FROMcsvSourceGROUPBYid执行这??条SQL,在【Flink1.10】版本会抛出如下异常:又一村重现![Flink-1.10]这个问题是Flink内部的retract机制导致的。在考虑对changelog的全链路支持之前,不可能为像Kafka这样的append-only消息队列添加对retract/upsert的支持。这是为语义完整性做出的决定。但真实的业务场景总是有这样或那样的实际业务需求。业务不关心你语义好不好,业务关心我不改我原来的技术选型。在此基础上,只要你告诉我Sink到Kafka的行为,我会根据你输出的行为适配业务,所以这个时候,就是实用了,不管什么语义都不是语义。。。。.,那这个时候怎么办呢?我们的做法是将Kafka的sink从原来的AppendStreamTableSink改为UpsertStreamTableSink或者RetractStreamTableSink。但是出于性能考虑,我们改成了UpsertStreamTableSink。这个改动不大,但是对于新手来说,还是不愿意自己动手??改代码,所以这里给大家抄一份:KafkaTableSinkBase.javahttps://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.javaKafkaTableSourceSinkFactoryBase.javahttps://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java在你的项目中创建org.apache.flink.streaming.connectors.kafka包并将以上两个类放入包中,用于覆盖官方KafkaConnector中的实现。特别强调:这样的改动会导致写入Kafka的结果不是每个GroupKey只有一个结果,而是可能每个Key有多个结果。这个大家可以自行测试一下:packagecdcimportorg.apache.flink.api.scala._importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimportorg.apache.flink.table.api.scala._/***使用upsert模式测试将数据接收到Kafka。*/objectUpsertKafka{defmain(args:Array[String]):Unit={valsourceData="file:///Users/jincheng.sunjc/work/know_how_know_why/QA/upsertKafka/src/main/scala/cdc/id_cnt_data.csv"valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaltEnv=StreamTableEnvironment.create(env)valsourceDDL="CREATETABLEcsvSource("+"idVARCHAR,"+"cntINT"+")WITH("+"'connector.type'='filesystem',"+"'connector.path'='"+sourceData+"',"+"'format.type'='csv'"+")"valsinkDDL="CREATETABLEkafkaSink("+"idVARCHAR,"+"cntINT"+")WITH("+"'connector.type'='kafka',"+"'connector.version'='0.10',"+"'connector.topic'='test',"+"'connector.properties.zookeeper.connect'='localhost:2181',"+"'connector.properties.bootstrap.servers'='localhost:9092',"+"'connector.properties.group.id'='data_Group',"+"'format.type'='json')"tEnv.sqlUpdate(sourceDDL)tEnv.sqlUpdate(sinkDDL)valsql="INSERTINTOkafkaSink"+"SELECTid,SUM(cnt)FROMcsvSourceGROUPBYid"tEnv.sqlUpdate(sql)env.execute("RetractKafka")}}当然你也可以clone我的git代码【https://github.com/sunjincheng121/know_how_know_why/tree/master/QA/upsertKafka]直观体验由于本系列文章只着重于解决问题,不讨论细节和原理,我将在我的视频课程《Apache 知其然,知其所以然》中介绍原理知识。Flink的锅?。..看到上面的问题,可能有朋友会问,既然知道问题,知道有实际业务需求,为什么Flink不对这种情况进行改进和支持呢?好问题,就这个问题而言,Flink是总计据了解,Flink已经在努力支持这种场景。预计您将在Flink-1.12版本中体验到完整的CDC(变更数据捕获)支持。大家拾柴火焰高,期待你的典型问题……我会无所不知……话够多了……我在友谊村等你……作者介绍孙金城,社区编辑,ApacheFlinkPMC成员,ApacheBeamCommitter,ApacheIoTDBPMC会员,ALC北京会员,Apache神鱼导师,Apache软件基金会会员。专注于技术领域的流计算和时序数据存储。