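1. Preface: the conclusion first

This post records a Flink pitfall I ran into in production involving the serialization and deserialization (serde) of Java enums.

Conclusion: if a Flink job keeps a Java enum in its state, adding or removing an enum constant can break state restoration. The failure may not even surface as an exception during recovery; instead, a value that was saved as enum constant A can be silently restored as enum constant B. The following sections explain the cause and how to fix it. I hope they prove useful.

2. The scenario: what is the pitfall?

I made a simple change to a filter condition in a job and redeployed it. The Flink web UI confirmed that the job had restarted successfully from the savepoint, yet the final output looked as if it had not been restored from the savepoint at all.

The job computes the cumulative PV for the current day in each sub-dimension. The code is simple and is posted below. As shown in the figure below, when the job restarted at 00:04, the day's cumulative PV started accumulating from zero again. The expected curve, however, should look like this.

The job is written with the DataStream API (based on Flink 1.13.1):

```java
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.HashMap;
import java.util.Map;

// SourceModel and DimNameEnum are the job's own classes; getProvince/getAge/getSex are assumed to return String.
public class SenerioTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        env.addSource(new SourceFunction<SourceModel>() {
            private volatile boolean isCancel = false;

            @Override
            public void run(SourceContext<SourceModel> ctx) throws Exception {
                // data source (omitted)
            }

            @Override
            public void cancel() {
                this.isCancel = true;
            }
        })
        // Bucket users into 1000 keys.
        .keyBy(new KeySelector<SourceModel, Long>() {
            @Override
            public Long getKey(SourceModel value) throws Exception {
                return value.getUserId() % 1000;
            }
        })
        .timeWindow(Time.minutes(1))
        // Count PV per (dimension name, dimension value) inside each 1-minute window.
        .aggregate(new AggregateFunction<SourceModel, Map<Tuple2<DimNameEnum, String>, Long>, Map<Tuple2<DimNameEnum, String>, Long>>() {
            @Override
            public Map<Tuple2<DimNameEnum, String>, Long> createAccumulator() {
                return new HashMap<>();
            }

            @Override
            public Map<Tuple2<DimNameEnum, String>, Long> add(SourceModel value, Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
                Lists.newArrayList(
                        Tuple2.of(DimNameEnum.province, value.getProvince()),
                        Tuple2.of(DimNameEnum.age, value.getAge()),
                        Tuple2.of(DimNameEnum.sex, value.getSex()))
                    .forEach(t -> {
                        Long l = accumulator.get(t);
                        if (null == l) {
                            accumulator.put(t, 1L);
                        } else {
                            accumulator.put(t, l + 1);
                        }
                    });
                return accumulator;
            }

            @Override
            public Map<Tuple2<DimNameEnum, String>, Long> getResult(Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
                return accumulator;
            }

            @Override
            public Map<Tuple2<DimNameEnum, String>, Long> merge(Map<Tuple2<DimNameEnum, String>, Long> a, Map<Tuple2<DimNameEnum, String>, Long> b) {
                // Only invoked for merging windows (e.g. session windows); merge b into a rather than returning null.
                b.forEach((k, v) -> a.merge(k, v, Long::sum));
                return a;
            }
        }, new ProcessWindowFunction // ...
```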
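To make the conclusion from the preface concrete, here is a minimal, Flink-independent sketch of how an enum stored by position can come back as a different constant without any exception. It rests on stated assumptions: the enums `DimNameV1`/`DimNameV2`, the inserted `city` constant and the helper methods are all hypothetical, and writing a bare `int` ordinal is a simplification of what a real serializer does. Whether the job's state serializer actually encodes enums by ordinal is also an assumption here; the accumulator above is a `HashMap`, which Flink generally treats as a generic type and hands to Kryo, so it is worth checking how the Kryo version bundled with your Flink encodes enums.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class EnumOrdinalDemo {

    // Hypothetical enum as it was when the savepoint was taken.
    enum DimNameV1 { province, age, sex }

    // Hypothetical enum after a new constant ("city") is inserted before "age".
    enum DimNameV2 { province, city, age, sex }

    // Ordinal-based encoding: only the constant's position is written.
    static byte[] writeOrdinal(Enum<?> value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value.ordinal());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes the stored position against the enum definition current at read time.
    static <E extends Enum<E>> E readOrdinal(byte[] data, Class<E> type) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return type.getEnumConstants()[in.readInt()];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Name-based encoding: the constant's name is written instead of its position.
    static byte[] writeName(Enum<?> value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value.name());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Looks the constant up by name, so insertions and reorderings do not shift it.
    static <E extends Enum<E>> E readName(byte[] data, Class<E> type) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return Enum.valueOf(type, in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // "Savepoint" written with the old enum, read back with the new one.
        System.out.println("ordinal-based: " + readOrdinal(writeOrdinal(DimNameV1.age), DimNameV2.class)); // ordinal-based: city
        System.out.println("name-based: " + readName(writeName(DimNameV1.age), DimNameV2.class)); // name-based: age
    }
}
```

In this sketch the ordinal path silently turns `age` into `city`, the same kind of A-becomes-B corruption described in the preface, while the name-based path is unaffected by the insertion. Removing a constant shifts the later ordinals in the same way, or throws `ArrayIndexOutOfBoundsException` here when the stored ordinal exceeds the new number of constants.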