1。排查思路当这个问题反馈给我时,已经有同学进行了一轮排查。根据网上搜索,会被告知可能是yarn压力过大,网络可能暂时不稳定等,可以通过增加heartbeat.timeout来缓解这个问题,但是调整后问题并没有解决。另一个语句会告诉你频繁GC的原因。建议调整内存。调整后,确实有一定的效果(减缓出现问题的时间)。这显然与代码有关。因为之前版本同步数据没有问题,所以开始找最近的代码改动。找了几次,没有发现可疑代码。突然觉得头皮有点发麻。于是我让现场的同学换成之前的版本,继续做全量,还是会出现这种现象。这时候我有点怀疑生产环境的特性——比如数据的特性,但是现场的同学告诉我,数据没有什么特别的。于是找了一个现场的HeapDump,丢到分析软件上查看,发现org.apache.flink.streaming.api.functions.sink.filesystem.Bucket对象很多。所以我看了一下Bucket对象的定义:/***Bucket是{@linkStreamingFileSink}输出的目录组织。**
对于{@codeStreamingFileSink}中的每个传入元素,查询用户指定的{@link*BucketAssigner}以查看应将此元素写入哪个存储桶。*/@InternalpublicclassBucket{好家伙。一个目录和一个对象。此时此刻,我对同学说的“数据没有什么特别的”产生了疑惑,但是为了实锤,我又按照代码来了:|--HiveTableSink\--createStreamSink|--StreamingFileSink\--initializeState|--StreamingFileSinkHelper\--constructor|--HadoopPathBasedBulkFormatBuilder\--createBuckets|--Buckets\--onElement\--getOrCreateBucketForBucketId看完代码,我有了一个想法。问了站点同步数据的时间跨度是不是特别大。经现场学员确认,时间跨度超过3年。所以建议减少时间跨度,或者减少分区时间。最终在分满batch后解决了这个问题。2.解决问题后的好奇如果每个目录都会生成一个Bucket,那么你跑一个streamjob,迟早会遇到同样的问题。社区里的大神们肯定早就想到了这么明显的问题,好奇心驱使我去寻找答案——直到看到这段代码:Entry>>activeBucketIt=activeBuckets.entrySet().iterator();LOG.info("子任务{}收到了id={}的检查点的完成通知。",subtaskIndex,checkpointId);while(activeBucketIt.hasNext()){finalBucketbucket=activeBucketIt.next().getValue();bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);if(!bucket.isActive()){//我们已经处理了所有挂起的文件,并且此存储桶的写入器//当前未打开。//因此这个桶目前是不活跃的,我们可以将它从我们的状态中移除。activeBucketIt.remove();notifyBucketInactive(桶);}}}