当前位置: 首页 > 科技观察

Flink代码是这样写的,居然能触发窗口才奇怪!

时间:2023-03-20 20:51:33 科技观察

1。Preface-ConclusionFirst本文主要记录小萌友在使用DataStreamAPI实现事件时间窗口应用时遇到的窗口不触发问题的坑及排查过程。博主希望大家看完这篇文章一定要养成这个编程习惯:使用DataStreamAPI实现Flink任务时,WatermarkAssigner尽量靠近Source节点,尽量靠前。如果您想问为什么,请继续阅读!我将从后面的章节中对上述问题以及为什么提出这样的建议进行说明,希望能对大家有所启发,也能给大家带来一些启发。?踩坑场景——坑是什么样子的?Scenarios-这个坑是什么样的2.1.需求场景首先介绍一下这个坑对应的一个需求场景和第一版的实现代码。需求:在电商平台中,需要根据网页在线用户的心跳日志(userheartbeatlogs),计算当前一分钟在线用户停留在购物车页面(Shopping-Cart)的数量每30秒报告一次)。数据来源:每30s上报一次的用户心跳日志(user_id、page、time三个字段分别对应用户id、用户页面、日志上报时间)数据处理:先过滤掉购物车,根据timestampWindow(Tumble)聚合计算数据采集:每分钟聚合的结果数据(uv和time这两个字段对应购物车页面当前分钟的并发在线人数,当前分钟的时间戳)FlinkDataStreamAPI具体实现代码如下:publicclassWatermarkTest{publicstaticvoidmain(String[]args)throwsException{//获取Flink环境,博主自己封装的接口FlinkEnvFlinkEnvflinkEnv=FlinkEnvUtils.getStreamTableEnv(args);//设置并发flinkEnv.env().setParallelism(100);flinkEnv.env()//数据来源:上报日志。addSource(xxx)//过滤掉购物车页面(Shopping-Cart)的数据.filter(newFilterFunction(){@Overridepublicbooleanfilter(SourceModelvalue)throwsException{returnvalue.getPage().equals("Shopping-Cart");}})//分配水印.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)){@OverridepubliclongextractTimestamp(SourceModelelement){返回元素.getTime();}})//Shuffleintooneoperator进行合并计算,所以这里返回的结果固定为0.keyBy(newKeySelector(){@OverridepublicLonggetKey(SourceModelvalue)throwsException{return0L;}})//开启一分钟的滚动时间window.window(TumblingEventTimeWindows.of(Time.minutes(1)))//计算uv.process(newProcessWindowFunction的处理逻辑(){@Overridepublicvoidprocess(LongaLong,Contextcontext,Iterableelements,Collectorout)throwsException{longwindowStart=context.window().getStart();Sets=newHashSet<>();elements.forEach(newConsumer(){@Overridepublicvoidaccept(SourceModelsourceModel){s.add(sourceModel.userId);}});out.collect(SinkModel.builder().uv(s.size()).time(windowStart).build());}})//输出.addSink(xxx);}//输入数据Model@Data@BuilderprivatestaticclassSourceModel{privatelonguserId;私有字符串页面;私密时间长;}//输出数据模型@Data@BuilderprivatestaticclassSinkModel{privatelonguv;私密时间长;}}2.2。问题场景当我们将这个任务部署到集群环境运行时,发现一直没有数据输出,但是输入数据(用户心跳日志)一直收到大量数据。表现如下:?Sourceoperator:可以一直消费数据,从webui的输入输出流量来看数据量非常大?Filteroperator:Filteroperator既有输入也有输出,输入量很大很大,但是输出的数据很少(这是业务原因,从购物业务的角度来说,只有极小部分人会停留在购物车页面)?滚动窗口算子:输入数据很少,但是一直没有输出,从webuiCheckingtheWatermarkofoperator也不见了。从这里开始,问题就清楚了。至少从Flinkwebui的角度来看,window数据不会触发计算,因为windowoperator没有Watermark。这时候的第一个猜想是:windowoperator上的Watermark没有对齐!!!接下来我们来看一下这个猜想的整体验证过程:?由于我们的WatermarkAssigner是写在Filter算子之后的,所以Watermark的生成也是基于Filter算子之后的数据。因此,如果我们要定位是否是上述猜想造成的,就需要估计Filter算子输出的数据量来进行验证。?经过验证,发现对于Filter算子之后产生的数据,每分钟向下游算子输出的数据总量不到60条。也就是说,在我们的100个并发任务上,只有60个Filter算子并发每分钟都会向下游的rollingwindowoperator输出数据,其余40个concurrentoperator不会向下游的Rollingwindowoperator发送任何数据。?最后,对于下游的rollingwindowoperator,是没有办法实现Watermark对齐的!因此,无法触发窗口。问题的原因找到了。4.问题原理分析——什么机制导致了问题要理解Watermark对齐是怎么回事,首先需要看一下Flink中Watermark的传输和计算机制:Watermark传输方式:广播。这里的广播是指一个上游算子的一个并发会广播到所有可以连接到的下游算子的并发,这与并发的上下游算子之间的Shuffle机制有关。这里的广播不是指Flink提供的BroadCast编程API!!!例如:一个任务是100个并发,上下游算子之间的Shuffle策略是Forward,那么上游算子的一个并发Watermark只会连接到下游算子如果策略是Hash\Rebalance,一个并发Watermark上游运营商的Watermark将发送给所有并发的下游运营商。Watermark计算方式:下游算子并发接收到上游算子的并发Watermark后,下游算子当前的并发Watermark计算方式(这里上下游是指连接的通道),计算公式:下游算子并发Watermark=min(Watermarksent上游算子并发1,上游算子并发发送的Watermark2...)是下游算子的并发Watermark=所有上游算子并发发送给下游算子的Watermark的最小值。Watermark对齐:当下游算子的并发Watermark依赖于上游算子的并发Watermark时,存在较大差异,即Watermark没有对齐。例如:上面一个运营商的一个并发传输的Watermark是23:59,另一个并发传输的Watermark是23:00,中间查了59分钟。这种情况一般是不正常的情况,所以称为错位。反之,如果Watermark差异很小,则称为Watermark对齐。再来一张图看看Watermark的传输过程加深理解:Watermark传播回到上面的案例,一分钟只有60个上游算子并发数据,将Watermark发送给下游window算子,剩下40个没有头发。所以下游的windowoperator没有Watermark,所以不会触发window。5.避坑篇——如何避免这类问题在上面的场景中,问题的根本原因是数据经过条件过滤后(购物车页面),数据量变得很小。WatermarkAssigner从非常少量的数据中生成非常少量的水印。有40个并发数据没有生成水印,下游算子水印错位。那么解决方法也很简单,就是多生成watermark,保证filter之后的数据虽然很小,但是filter算子处理完之后,每个并发上都有足够的watermark传递给下游的windowoperator进行连续触发窗口计算和结果输出。具体解决方案:重写Source算子之后Filter算子之前的WatermarkAssigner。代码如下:publicclassWatermarkTest{publicstaticvoidmain(String[]args)throwsException{//获取Flink环境,博主自己封装的接口FlinkEnvFlinkEnvflinkEnv=FlinkEnvUtils.getStreamTableEnv(args);flinkEnv.env()。设置平行度(100);flinkEnv.env()//Datasource.addSource(xxx)//AssignWatermark,beforeFilter.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)){@OverridepubliclongextractTimestamp(SourceModelelement){返回元素.getTime();}})//过滤掉购物车页面(Shopping-Cart)的数据.filter(newFilterFunction(){@Overridepublicbooleanfilter(SourceModelvalue)throwsException{returnvalue.getPage().equals("购物ng-Cart");}})//为了混入算子进行组合计算,返回的结果key固定为0keyBy(newKeySelector(){@OverridepublicLonggetKey(SourceModelvalue)throwsException{return0L;}})//开启一分钟滚动时间window.window(TumblingEventTimeWindows.of(Time.minutes(1)))//计算uv.process(newProcessWindowFunction(){@Overridepublicvoidprocess(LongaLong,Contextcontext,Iterableelements,Collectorout)抛出异常{longwindowStart=context.window().getStart();设置s=newHashSet<>();elements.forEach(newConsumer(){@Overridepublicvoidaccept(SourceModelsourceModel){s.add(sourceModel.userId);}});out.collect(SinkModel.builder().uv(s.size()).time(windowStart).build());}})//输出.addSink(xxx);}//输入数据Model@Data@BuilderprivatestaticclassSourceModel{privatelonguserId;私有字符串页面;私密时间长;}//输出数据Model@Data@BuilderprivatestaticclassSinkModel{privatelonguv;私人的很久;}}方案原理:在上面的业务场景中,Source数据非常多,我们可以利用大量的Source数据,使得WatermarkAssigner即使通过也可以不断的生成Watermark并传递给下游Filteroperator之后,发送给下游windowoperator的数据量很少,但是Watermark不会被Filteroperator过滤掉,大量的Watermark仍然可以正常的传给windowoperator,这样Watermark对齐,从而保证windowoperator输出的连续触发和结果。虽然方案很好,但是极小概率会出现乱序丢失的问题:比如Watermark是在Source算子之后产生的。23:50:50的购物车页面日志数据有可能是23:52如果网站主页面的日志数据是在0:00之后到达,Watermark已经提升到23:51:00秒,23:50的窗口已经被触发,所以23:50:50的购物车页面数据被窗口操作员丢弃。6.总结本文主要记录小伙伴在使用DataStreamAPI时,由于WatermarkAssigner设置太晚导致watermark无法对齐,导致事件时间窗口没有触发的问题。博主建议的编程习惯:在使用DataStreamAPI实现Flink任务时,WatermarkAssigner尽量靠近Source节点,尽量靠近前端。