www.ydisp.cn/oss/202207/13/36b9a7296e02af41e9e4033d1c403e01416444.png"alt="image"title="image"style="width:1080px;能见度:可见;height:568px;"data-type="inline">那么对于一个带有Count聚合的Tumble(5s)窗口,如何处理上面的情况使得window2=4和window3=2呢?就是我们开头描述的问题ApacheFlink的时间类型是TimeWindow中非常常见的数据乱序问题,是相对于事件产生的时间和到达ApacheFlink实际处理算子的顺序而言的。关于时间,有ApacheFlink中的三种时间,如下图所示:,ProcessingTime是不确定的,同一个数据流的多次运行可能会产生不同的计算结果。IngestionTimeIngestionTime是在数据入口SourceOperator中设置的ApacheFlink框架的时间。与ProcessingTi相比me,它可以提供更可预测的结果,因为IngestionTime的时间戳是比较稳定的(在源头只记录一次),同样的数据在流过不同的windowoperations时会使用相同的时间戳进行处理,但是对于ProcessingTime,同样的数据在流经不同窗口算子时会有不同的处理时间戳。EventTimeEventTime在设备上产生事件时携带。在进入ApacheFlink框架之前,EventTime通常嵌入在记录中,也可以从记录中提取EventTime。在网购订单等实际业务场景中,EventTime多用于数据计算。开头描述的问题和本文要介绍的Watermark涉及的时间类型都是指EventTime类型。什么是WatermarkWatermark是ApacheFlink提出的一种处理EventTime窗口计算的机制。本质上也是一个时间戳。它是由ApacheFlinkSource或自定义Watermark生成器根据Punctuated或Periodic的要求生成的系统Event。,像正常的数据流事件一样传递给对应的下游算子,接收到WatermarkEvent的算子不断调整自己管理的EventTime时钟。ApacheFlink框架保证Watermark单调递增。当操作员收到Watermark时,框架知道不会再有时间戳小于Watermark的数据元素。因此,Watermark可以看作是告诉ApacheFlink框架,数据流已经处理完毕。什么位置(时间维度)方式。Watermark的生成以及ApacheFlink的内部处理逻辑如下图所示:Watermark的生成方式目前ApacheFlink有两种Watermark的生成方式,如下:Punctuated数据流中的每个增量EventTime都会生成一个Watermark。在实际生产中,Punctuated方式在TPS较高的场景下会产生大量Watermark,一定程度上会对下游算子造成压力。因此,只有在实时性要求非常高的场景下,才会使用Punctuated方式来生成Watermarks。.Periodic周期性地生成一个Watermark(以一定的时间间隔或达到一定数量的记录时)。在实际生产中,Periodic方法必须结合时间和累积次数两个维度,持续周期性生成Watermarks,否则在极端情况下会出现较大的延迟。因此,需要根据不同的业务场景来选择Watermark的生成方式。Watermark的接口定义对应ApacheFlinkWatermark两种不同的生成方式,我们了解了下对应的接口定义,如下:PeriodicWatermarks-AssignerWithPeriodicWatermarks/***返回当前水印。*系统定期调用此方法以检索当前水印。该方法可能会返回{@codenull}以*指示没有新的Watermark可用。**
仅当返回的水印非空且其时间戳*大于先前发出的水印的时间戳时才会发出(以保留升序水印的合同)。如果当前水印仍然*与前一个相同,则自上次调用此方法以来EventTime没有发生任何进展。如果返回空值,或者返回的水印的时间戳*小于最后一个发出的水印的时间戳*则不会生成*新水印。**
调用此方法和生成Watermarks的时间间隔*取决于{@linkExecutionConfig#getAutoWatermarkInterval()}。**@seeorg.Apache.flink.streaming.api.watermark.Watermark*@seeExecutionConfig#getAutoWatermarkInterval()**@return{@codeNull},如果不应发出水印,或下一个要发出的水印。*/@NullableWatermarkgetCurrentWatermark();PunctuatedWatermarks-AssignerWithPunctuatedWatermarkspublicinterfaceAssignerWithPunctuatedWatermarks 只有当返回的水印为非空且其时间戳*大于先前发出的水印的时间戳时才会发出(以保留升序水印的合同)。如果返回一个空值,或者返回的*水印的时间戳小于最后一个发出的水印的时间戳,则不会生成新的水印*特德。** 有关如何使用此方法的示例,请参阅*{@linkAssignerWithPunctuatedWatermarks此类}的文档。**@return{@codeNull},如果不应该发出水印,或者下一个要发出的水印。*/@NullableWatermarkcheckAndGetNextWatermark(TlastElement,longextractedTimestamp);}AssignerWithPunctuatedWatermarks继承了TimestampAssigner接口-TimestampAssignerpublicinterfaceTimestampAssigner 该方法被传递给元素先前分配的时间戳。*之前的时间戳可能是从之前的分配者那里分配的,*按摄取时间。如果元素之前没有携带时间戳,则此值为*{@codeLong.MIN_VALUE}。**@paramelement将分配时间戳的元素。*@parampreviousElementTimestamp之前的内部时间戳元素的时间戳,*或负值,如果尚未分配时间戳。*@return新的时间戳。*/longextractTimestamp(Telement,longpreviousElementTimestamp);}从接口定义可以看出,Watermark可以在Event中使用ExtractEventTimefrom(Element),然后定义一定的计算逻辑生成WatermarktimestampWatermark解决了上面的问题问题从上面Watermark的生成接口和ApacheFlink内部实现PeriodicWatermark可以看出,Watermark的时间戳可以和Event中的EventTime保持一致,也可以自己定义任何合理的逻辑,让Watermark的时间戳不等于Event中的EventTime事件。Event中的EventTime从产生的那一刻起就不能改变。不受ApacheFlink框架控制,Watermark的生成是在ApacheFlink的源节点或者实现的Watermarkgenerator计算生成的(上面ApacheFlink内置的PeriodicWatermark实现),而ApacheFlink对单流或多流场景有统一的Watermark处理。回头看看Watermark机制是如何解决上述问题的。上面的问题就是如何正确处理迟到的EventTimebit11元素。要解决这个问题,我们需要了解EventTime窗口是如何触发的?EventTime窗口的计算条件是当Window计算出的Timer时间戳小于等于当前系统的Watermak时间戳时计算。当Watermark的时间戳等于Event中携带的EventTime时,上述场景(Watermark=EventTime)的计算结果如下:上面对应的DDL定义如下:createtablet1(tstimestamp(3),otherbigint,WATERMARKFORtsASts)with('connector'='xx')如果想正确处理latedata,可以定义Watermark生成策略为Watermark=EventTime-5s,如下:对应上面的DDL定义如下:createtablet1(tstimestamp(3),otherbigint,WATERMARKFORtsASts-interval'5'SECOND)with('connector'='xx')上述正确处理的根本原因在于我们采用延迟触发窗口计算的方法来正确处理LateEvent。同时,我们发现窗口延迟触发计算也导致下游LATENCY变大。在这个例子中,下游得到的窗口的结果延迟了5s。Multi-streamWatermarkprocessing在实际的流计算中,往往在一个job中处理多个stream。对源数据进行GroupBy分组,然后将不同来源的相同key值shuffle到同一个处理节点,并带有各自的水印。ApacheFlink必须保证watermarks保持单调递增,多个source的watermarks聚集在一起时,不一定是单调递增的。ApacheFlink内部是如何处理这种情况的?如下图所示:ApacheFlink内部实现了每一侧只能有一个增量Watermark。当多个流一起携带Eventtime(Join或Union)时,ApacheFlink会选择最小的min(stream1,stream2...streamN)流向下游。从而保证watermark的单调递增,保证数据的完整性。如下图所示:总结本节通过一个流计算中常见的乱序问题来介绍ApacheFlink是如何使用Watermark机制来处理乱序问题的。这篇文章的内容也在一定程度上反映了EventTimeWindow中的Trigger机制依赖于Watermark(后续的Window章节会介绍)。Watermark机制是流计算中处理乱序和正确处理LateEvents的核心手段。更多详情,敬请关注《Apache Flink 知其然,知其所以然》系列视频课程!作者介绍孙金城,社区编辑,ApacheFlinkPMC成员,ApacheBeamCommitter,ApacheIoTTDBPMC成员,ALCBeijing成员,Apache神鱼导师,Apache软件基金会成员。专注于技术领域的流计算和时序数据存储。