当前位置: 首页 > 科技观察

Kafka分区数据倾斜导致Watermark松手怎么办?

时间:2023-03-13 01:20:19 科技观察

投疑?有一个非常...非常...普遍的痛点就是Kafka分区数据倾斜,因为某个分区的数据慢,整个作业无法进行事件驱动计算。来自@孙金城的知识星球用户,如下:比如我们有一个Kafka主题,有2个分区,如下数据:S001,1,2020-06-1309:58:00S001,1,2020-06-1309:58:01S001,2,2020-06-1309:58:02S001,3,2020-06-1309:58:03S001,4,2020-06-1309:58:04S001,5,2020-06-1309:58:05S001,6,2020-06-1309:58:06S001,7,2020-06-1309:58:07S001,8,2020-06-1309:58:08S001,9,2020-06-1309:58:09S001,10,2020-06-1309:58:10S001,11,2020-06-1309:58:11S001,12,2020-06-1309:58:12S001,13,2020-06-1309:58:13S001,14,2020-06-1309:58:14S001,15,2020-06-1309:58:15S001,16,2020-06-1309:58:16S001,17,2020-06-1309:58:17S001,18,2020-06-1309:58:18S001,19,2020-06-1309:58:19S001,20,2020-06-1309:58:20S001,21,2020-06-1309:58:21//这条数据在第一个分区,其他数据在第二个分区。S001,22,2020-06-1309:58:22S001,23,2020-06-1309:58:23S001,24,2020-06-1309:58:2458:25S001,26,2020-06-1309:58:26S001,27,2020-06-1309:58:27S001,28,2020-06-1309:58:28S001,29,2020-06-1309:58:29S001,30,2020-06-1309:58:30S001,31,2020-06-1309:58:31S001,32,2020-06-1309:58:32S001,33,2020-06-1309:58:33S001,34,2020-06-1309:58:34S001,35,2020-06-1309:58:35S001,36,2020-06-1309:58:36S001,37,2020-06-1309:58:37S001,38,2020-06-1309:58:38S001,39,2020-06-1309:58:39我们使用自定义Partitioner的方式将第21条数据转移到第一个分区,其他的在第二个分区。这时候如果业务需求是5秒的窗口。那么目前Flink-1.10默认只能触发4个窗口的计算,也就是从22条数据到39条数据都不会触发计算。使用本文提到的方案可以完成7个窗口(所有窗口)的触发。不考虑Idle情况,计算结果如下:考虑Idle情况,计算结果如下:又一个村庄再现!【Flink1.10】这又是一个你知道1秒却不知道坐以待毙的情况。问题的本质是目前生成Watermark的机制是min(partition1,partition2,...,partitionN),所以出现了桶效应,也就是用户描述的情况。我应该怎么办?修改代码....还是那句话,看这个系列的朋友都在看如何快速解决问题,话不多说,直接看解决步骤:按照下面的代码开发一个`StreamSource`,放到`org.apache.flink.streaming.api.operators`包下,和你的业务代码一起打包:https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/v110/discover-idle-sources/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java注意上面增加了一个配置`idleTimeout`的配置项。这个配置默认为`-1`,也就是不生效,所以只要你配置超过这个值,如果指定时间没有数据,Flink系统就会认为这个Partition没有数据,所以在计算Watermark时不会考虑,有数据时才计入Watermark的计算中。写作业的时候配置`source.idle.timeout.ms`参数,如下:OK,以上两步就可以解决这个问题了。如果遇到classloader问题,我说的是if,那就修改下面的默认值。【说明】以上方案适用于Flink1.10及之前版本DataStream和SQLflinkplanner的开发(我想以后也一样,因为flinkplanner逐渐被blinkplanner取代)。可以为FlinkblinkplannerSQL(1.9+)添加`table.exec.source.idle-timeout`。对于Flink1.11及之后的DataStrem,可以使用`WatermarkStrategy`来设置,最后参考1.11发布后的文档。向前迈出一小步?如果你是已经遇到过这个问题的朋友,那么按照上面两个步骤应该就可以解决问题了。如果你没有遇到过这个问题,想自己体验一下,可以clone我的git:https://github.com/sunjincheng121/know_how_know_why/tree/master/QA/v110/discover-idle-sources把这个项目拉到本地,按照README.md体验一下:https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/v110/discover-idle-sources/src/main/java/qa/README.md如果你在上面如果你在操作中还遇到困难,别着急,关注我的《Apache Flink知其然,知其所以然》视频课程,会有视频演示(本系列文章会尽量简单,只说How,不说Why)Flink的锅?……社区也在不断努力解决这个问题。感兴趣的朋友可以参考FLIP-27&FLIP-126。当然flinkplanner(old)目前只能通过本文提到的方案解决。还建议您尽快升级到blinkplanner。作者介绍孙金城,社区编辑,ApacheFlinkPMC成员,ApacheBeamCommitter,ApacheIoTDBPMC成员,ALCBeijing成员,Apache神鱼导师,Apache软件基金会成员。专注于技术领域的流计算和时序数据存储。