当前位置: 首页 > 科技观察

FlinkstateserializationJavaenumactuallysplit

时间:2023-03-22 14:10:25 科技观察

1.Preface-firsttheconclusion本文主要记录博主在针对javaenumserde时在生产环境踩过的flink坑。结论:在flink程序中,如果state中存有javaenum,在enum中增加或删除一个枚举值可能会导致状态恢复异常。这里的异常在恢复过程中可能并没有真正抛出。exception,反而有可能是enumA的值恢复到enumB。我从后面的章节来解释和解决这个问题,希望能对大家有所启发,给大家带来一些启发。踩坑场景——这个坑是什么问题?对任务进行简单的过滤条件修改是什么感觉?task再次上线后,从flinkwebui确认成功从savepoint重启,但实际最终输出的数据似乎并没有从savepoint重启。逻辑是计算子维度中当天的累计pv。代码很简单,稍后贴出。如下图所示:00:04重启时,当天的累计pv出现从零开始累计。但预期的正常曲线应该是这样的。使用DataStream编写任务(基于flink1.13.1)。publicclassSenerioTest{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.addSource(newSourceFunction(){privatevolatilebooleanisCancel=false;@Overridepublicvoidrun(SourceContextctx)throwsException{//数据源}@Overridepublicvoidcancel(){this.isCancel=true;}}).keyBy(newKeySelector(){@OverridepublicLonggetKey(SourceModelvalue)throwsException{returnvalue.getUserId()%1000;}}).timeWindow(Time.minutes(1)).aggregate(newAggregateFunction,Long>,Map,Long>>(){@OverridepublicMap,Long>createAccumulator(){returnnewHashMap<>();}@OverridepublicMap,Long>add(SourceModelvalue,Map,Long>累加器){Lists.newArrayList(Tuple2.of(DimNameEnum.province,value.getProvince()),Tuple2.of(DimNameEnum.age,value.getAge()),Tuple2.of(DimNameEnum.sex,value.getSex())).forEach(t->{Longl=accumulator.get(t);if(null==l){累加器。put(t,1L);}else{accumulator.put(t,l+1);}});returnaccumulator;}@OverridepublicMap,Long>getResult(Map,Long>累加器){returnaccumulator;}@OverridepublicMap,Long>merge(Map,Long>a,Map,Long>b){returnnull;}},newProcessWindowFunction,Long>,SinkModel,Long,TimeWindow>(){privatetransientValueState,Long>>todayPv;@Overridepublicvoidopen(Configurationparameters)throwsException{super.open(parameters);this.todayPv=getRuntimeContext().getState(newValueStateDescriptor,Long>>("todayPv",TypeInformation.of(newTypeHint,Long>>(){})));}@Overridepublicvoidprocess(LongaLong,Contextcontext,Iterable,Long>>elements,Collectorout)throwsException{//将元素数据合并到todayPv//每天零点清空状态重新累加//然后取出#收集出来}});env.execute();}@Data@BuilderprivatestaticclassSourceModel{privatelonguserId;privateStringprovince;privateStringage;privateStringsex;privatelongtimestamp;}@Data@BuilderprivatestaticclassSinkModel{privateStringdimName;privateStringdimValue;privatelongtimestamp;}enumDimNameEnum{province,age,sex,;}}flinkwebui和tm日志都显示savepoint已经正常恢复。我也想知道是不是flinkwebui显示的内容和实际执行不一致。但是发现任务的ck大小是正常的,符合预期。3.2.老老实实打日志。既然能从savepoint正常恢复,那就用log打status值看看到底发生了什么。如以下代码,在ProcessWindowFunction中添加log日志。this.todayPv.value().forEach(newBiConsumer,Long>(){@Overridepublicvoidaccept(Tuple2k,Longv){log.info("keyvalue:{},value值:{}",k.toString(),v);}});查找到的结果如下:...keyvalue:(uv_type,male),value:1000...查找到的state中存储了DimNameEnum.province,DimNameEnum.age的数据是正确的,但是缺少DimNameEnum.sex,而且(uv_type,male)数据比较多,于是查看代码,发现之前多了一个枚举类型DimNameEnum.uv_type。代码如下:enumDimNameEnum{province,age,uv_type,sex,;}所以我怀疑flink对枚举值的serde不是按照枚举值的名字匹配的,而是按照枚举值的下标匹配的.所以就有了DimNameEnum.uv_type占据了DimNameEnum.sex位置的情况。4、问题原理分析——导致问题的机制是什么?让我们看一下源代码。测试代码如下:publicclassEnumsStateTest{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.IntTypeTimeTypesType;Processingof(StateTestEnums.class));EnumSerializere=(EnumSerializer)t.createSerializer(env.getConfig());DataOutputSerializerd=newDataOutputSerializer(10000);e.serialize(StateTestEnums.A,d);env.execute();}enumStateTestEnums{A,B,C;}}调试结果如下:首先查看对应的TypeInformation和TypeSerializer,发现enum类型的serializer为EnumSerializer,再查看EnumSerializer的serde实现,如图图中:最关键的两个变量:序列化时使用valueToOrdinal,反序列化时使用values,从而印证了上面的说法。flinkenum在序列化时,enumeration值下标用于serde,所以一旦枚举值的顺序改变,或者一个枚举值的增减,其他枚举值的下标就会错位。导致数据错误。5、避坑——如何避免这类问题5.1。枚举方案上面的场景,如果要增加新的枚举值,需要正常恢复状态,正常输出数据。然后就可以在最后添加新的枚举值了,比如下面这样。enumDimNameEnum{province,age,sex,uv_type,//在最后添加;}5.2。还有一种解决非枚举的方法,比如题目,就是不用枚举值,直接用string来vans。6.总结本文主要介绍flink枚举值serde中的坑。在enum中增删枚举值时,可能会造成状态分裂。然后给出了这种情况的原因是enumserde的实现导致的,最后给出了解决方案。本文转载自微信公众号“大数据羊说”,可通过以下二维码关注。转载本文请联系大数据杨烁公众号。