本文转载自微信公众号《大数据羊说》,作者反将军。转载本文请联系大数据杨烁公众号。1.前言通过本文,你可以了解到踩坑的场景——这个坑是个什么样的问题?文章-是否有任何机制可以完全避免这种情况?首先我们做个总结:在非window类型的flinksql任务中,会有retract机制,即上游会向下游发送一个“retractmessage(减法)”,**最新的resultMessage(do另外)**两条消息计算结果,保证结果的正确性。而如果我们使用上下游中间的映射类udf去改变**withdrawalmessage(做减法)的一些字段值,可能会导致“withdrawalmessage(做减法)**不被处理2.踩坑场景——这个坑是个什么样子?在介绍这个坑之前,先介绍一下我们的需求和实现方案的背景。2.1.背景在各类游戏中,会有一个场景,一个用户可以从A级升级到B级,用户可以不断升级,但是一个用户在同一时间只会处于同一个级别,需求指标是每个级别的用户数在当前分钟。2.2.预期效果22.3。获取所有当前用户的最新级别的解决方案。一个用户一次只能在一个级别,所以统计每个级别的用户。2.4.获取当前所有用户最新级别的解决方案:flinksqlrow_number()可以实现,根据数据rowtime的倒序,可以获取用户当前最新级别。统计每个级别的用户数:统计row_number()2.4.1后的详细结果。sql的具体实现如下,很简单:WITHdetail_tmpAS(SELECTlevel,id,`timestamp`FROM(SELECTlevel,id,`timestamp`,--row_number获取最新状态row_number()over(PARTITIONbyidORDERBY`timestamp`DESC)ASrnFROMsource_db.source_table)WHERErn=1)SELECTDIM.Chinesegradeasgrade,sum(part_uv)asuvFROM(SELECTgrade,count(id)aspart_uvFROMdetail_tmpGROUPBYgrade,mod(id,1024))--上游数据的grade名称为anumber,需求方需要转换成中文,所以这里添加了一个udf映射LEFTJOINLATERALTABLE(levelChinesemapping_UDF(level))ASDIM(Chineselevel)ONTRUEGROUPBYDIM。中文水平2.4.2。参数配置采用minibatch参数方式控制数据输出频率。table.exec.mini-batch.enabled:true--设置触发间隔为60stable.exec.mini-batch.allow-latency:60stable.exec.mini-batch.size:10000000000任务计划。12.5。问题场景这条SQL运行了n年没有任何问题,但是有一天操作员在配置【关卡中文映射_UDF】时,不小心把某个关卡的中文名称映射错了。虽然马上就恢复了,但是当天把实时数据和离线数据对比后发现,实时输出值比离线数据大很多!!!而且之前是一致的。3.排错篇——坑的排查过程首先我们来思考一下。这个指标算作uv。该操作错误配置了关卡的中文名称。它还应将原始级别的最终结果计为较少。怎么可能更多????然后我们重现了场景,我们看一下代码:任务代码,可以直接复制到本地运行:publicclassTest{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettingssettings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env,settings);//模拟输入DataStream>tuple3DataStream=env.fromCollection(Arrays.asList(Tuple3.of("2",1L,1627218000000L),Tuple3.of("2",101L,1627218000000L+6000L),Tuple3.of("2",201L,1627218000000L+7000L),Tuple3.of("2",301L,1627218000000L+7000L)));//桶取模udftEnv.registerFunction("mod",newMod_UDF());//中文映射udftEnv.registerFunction("status_mapper",newStatusMapper_UDF());tEnv.createTemporaryView("source_db.source_table",tuple3DataStream,"status,id,timestamp");Stringsql="WITHdetail_tmpAS(\n"+"SELECT\n"+"status,\n"+"id,\n"+"`timestamp`\n"+"FROM\n"+"(\n"+"SELECT\n"+"status,\n"+"id,\n"+"`timestamp`,\n"+"row_number()over(\n"+"PARTITIONbyid\n"+"ORDERBY\n"+"`timestamp`DESC\n"+")ASrn\n"+"FROMsource_db.source_table"+")\n"+"WHERE\n"+"rn=1\n"+")\n"+"SELECT\n"+"DIM.status_newasstatus,\n"+"sum(part_uv)asuv\n"+"FROM\n"+"(\n"+"SELECT\n"+"status,\n"+"count(distinctid)aspart_uv\n"+"FROM\n"+"detail_tmp\n"+"GROUPBY\n"+"status,\n"+"mod(id,100)\n"+")\n"+"LEFTJOINLATERALTABLE(status_mapper(status))ASDIM(status_new)ONTRUE\n"+"GROUPBY\n"+"DIM.status_new";Tableresult=tEnv.sqlQuery(sql);tEnv.toRetractStream(result,Row.class).print();env.execute();}}UDF代码:publicclassStatusMapper_UDFextendsTableFunction{publicvoideval(Stringstatus){if(status.equals("1")){collector.collect("level1");}elseif(status.equals("2")){collector.collect("Level2");}elseif(status.equals("3")){collector.collect("Level3");}}}在正确的情况下输出(模拟UDF没有任何变化):(true,level2,1)(假,2,1级)(真,2,2级)(假,2,2级)(真,2,3级)(假,2,3级)(真,2级,4)最终level2的uv数是4,结果是复合预期的?用户在模拟下修改udf配置后,UDF代码如下:publicclassStatusMapper_UDFextendsTableFunction{privateinti=0;publicvoideval(Stringstatus){if(i==5){collect("Level4");}else{if("1".equals(status)){collector.collect("level1");}elseif("2".equals(status)){collector.collect("level2");}elseif("3".equals(status)){collector.collect("Level3");}}i++;}}结果如下:(true,level2,1)(false,level2,1)(true,level2,2)(false,level2,2)(true,level2,3)(false,level2,3)(true,level2,7)最后level2的uv数是7,显然这是一个错误的结果?因此,可以确定是由于该UDF的处理逻辑转换导致结果错误。下面我们来分析一下原因。问题原理分析——导致问题的机制是什么?我们先来分析一下上面的SQL。我们可以发现整个flinksql任务是使用unbounded+minibatch实现的。当minibatch触发条件被触发时,上游算子会撤回之前的结果,并发出最新的结果。该任务的执行计划如图所示。7从算子图中的一些计算逻辑可以看出,整个任务都是基于retract机制(count_retract、sum_retract等)。udf相关的核心逻辑在Operator(ID=7)和Operator(ID=12)之间。当Operator(ID=7)GroupAggregate的结果发生变化时,会发送一条“提款消息(做减法)”,并向Operator(ID=12)GroupAggregate发送一条**最新结果消息(做加法)**.5注意事项:简单说明上面提到的“提现消息(减法)”和“最新结果消息(加法)”。举个count计算的例子:当整个task的第一条数据来的时候,之前没有数据,所以不用withdraw,结果是0(没有数据)+1(第一条数据)=1(result),当第二个results过来后,上次发送的消息1(可以理解为整个task的一个中间结果)应该被撤回,发送最新的result2。那么计算方式就是1(最后一个结果)-1(提款)+2(当前最新结果消息)=2(结果)。从算子图中可以发现【中文名映射】UDF是在两个GroupAggregate之间。也就是说Operator(ID=7)GroupAggregate发送的“撤回消息(减法)”,**最新的结果消息(做加法)”会执行这个UDF,那么就可以“撤回消息(做加法)”subtraction)”将下游GroupAggregate算子key的某个字段改成其他值,那么这条消息就不会发送到原下游GroupAggregate算子的原key上了,原key的历史结果也不会被撤消。但是如果“最新结果消息(加法)**”字段没有改变,那么这条消息仍然会发送给下游的GroupAggregate算子,结果是加法而不是减法,从而导致增加结果,如下图。我们从这个角度来分析上面的案例,对从内层向外层发送的消息一一分析。怎么看内文?其实把上面SQL中的leftjoin删掉,再运行一下就可以得到结果了。结果如下:(true,2,1级)(false,2,1级)(true,2级,2)(false,2级,2)(true,2级,3)(false,级4,3)(true,level2,4)内部消息发送后分析对应的外部消息操作:InnerOuter(true,Level2,1)(true,Level2,1)(false,Level2,1)(假,2,1级)(真,2,2级)(真,2,2级)(假,2,2级)(假,2,2级)(真,2,3级)(true,level2,3)前五个消息不会导致错误,无需详细说明。InnerOuter(true,Level2,1)(true,Level2,1)(false,Level2,1)(false,Level2,1)(true,Level2,2)(true,Level2,2)(false,level2,2)(false,level2,2)(true,level2,3)(true,level2,3)(false,level4,3)第六个消息发送后,afterudf处理后,中文名映射到[Level4],通过hash分区策略向下发送消息时,这条提现消息无法发送给原key为[Level2]的算子,无法处理此撤回消息。InnerOuter(true,Level2,1)(true,Level2,1)(false,Level2,1)(false,Level2,1)(true,Level2,2)(true,Level2,2)(假,2,2级)(假,2,2级)(真,2,3级)(真,2,3级)(假,4,3级)(真,2,4级)(假,level2,3)(true,level2,7)第7条消息(true,level2,4)发送后,外层GroupAggregate算子会先撤回上次发送的demerit,即(false,Level2,3),然后将(true,level2,4)加到当前的demerit上,即3(上次结果)+4(本次最新结果)=7(结果)。这就导致了上面的错误结果。定位到问题的原因后,我们再来看看如何避免出现上述错误。6.避免陷阱——如何避免这类问题6.1。从源头上避免udf。这个映射维度的udf在上线前尽量固定,避免后续改动导致数据错误。6.2.替换为用于映射的ScalarFunctionWITHdetail_tmpAS(SELECTstatus,id,`timestamp`FROM(SELECTstatus,id,`timestamp`,row_number()over(PARTITIONbyidORDERBY`timestamp`DESC)ASrnFROM(SELECTstatus,id,`timestamp`FROMsource_db.source_table)t1)t2WHERErn=1)SELECT--中文名称映射级别中文映射这里_UDF(status)asstatus,sum(part_uv)asuvFROM(SELECTstatus,count(distinctid)aspart_uvFROMdetail_tmpGROUPBYstatus,mod(id,100))GROUPBYstatus还是刚才的逻辑,菜谱刚刚现在,让我们先来看看结果。publicclassStatusMapper_UDFextendsScalarFunction{privateinti=0;publicStringeval(Stringstatus){if(i==5){i++;return"level4";}else{i++;if("1".equals(status)){return"level1";}elseif("2".equals(status)){return"Level2";}elseif("3".equals(status)){return"Level3";}}return"unknown";}}仍然找到撤回数据会出现(false,level4,3)等错误(这个是udf决定的,无法避免),但是我们可以发现最后的结果是(true,level2,4),结果仍然是正确的。大家分析一下,问一下这样可以解决什么问题,如方案所示。6发现映射udf算子的位置不再在两个GroupAggregrates之间,所以retract消息发送后,不会映射到错误的key,所以所有retract消息都会正常处理。7.前景——是否有任何机制可以完全避免这种情况?可以将“撤回消息(减法)”和**最新的结果消息(加法)**做成原子消息从上游到下游,下游统一进行原子处理,关联udf时,只关联组密钥一次。