实战问题(乱序)在介绍Watermark的相关内容之前,我们先提出一个具体的问题,在实际的流式计算中,数据到达的顺序会影响到的正确性对计算结果有至关重要的影响,例如:某个数据源中的某些数据会因为某些原因(如:网络原因、外部存储原因)有5秒的延迟,即最先生成secondoftheactualtime数据可能在数据产生后5秒到达(比如到Window处理节点)。对于一个具体的延迟元素,假设在一个5秒的Tumble窗口中(详见WindowIntroduction章节),有一个11秒的EventTime,它在第16秒到达。图中第11秒的数据到了16秒,如下图:那么对于Count聚合的一个Tumble(5s)window,如何处理上面的情况使得window2=4,window3=2?ApacheFlink的时间类型我们开头描述的问题是TimeWindow中非常普遍的数据乱序问题。乱序是相对于事件产生的时间以及事件到达ApacheFlink实际处理算子的顺序。ApacheFlink中的时间分为三种:时间类型,如下图所示:(1)ProcessingTimeProcessingTime是数据流入特定算子时对应的系统时间。ProcessingTime具有最好的性能和最好的延迟。但是在分布式计算环境中,ProcessingTime是不确定的,同一个数据流多次运行可能会产生不同的计算结果。(2)IngestionTimeIngestionTime为数据进入ApacheFlink框架的时间,在SourceOperator中设置。与ProcessingTime相比,它可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源头只记录一次),相同的数据在流过不同的窗口操作时会使用相同的时间戳,而对于ProcessingTime,相同的数据在Operators中流过通过不同的窗口会有不同的处理时间戳。(3)EventTimeEventTime是在设备产生事件时携带的。EventTime通常在进入ApacheFlink框架之前嵌入到记录中,也可以从记录中提取EventTime。在网购订单等实际业务场景中,EventTime多用于数据计算。开头描述的问题和本文要介绍的Watermark涉及的时间类型都是指EventTime类型。什么是WatermarkWatermark是ApacheFlink提出的一种处理EventTime窗口计算的机制。本质上也是一个时间戳。它是由ApacheFlinkSource或自定义Watermark生成器根据Punctuated或Periodic的要求生成的系统Event。,像正常的数据流事件一样传递给对应的下游算子,接收到WatermarkEvent的算子不断调整自己管理的EventTime时钟。ApacheFlink框架保证Watermark单调递增。当操作员收到Watermark时,框架知道不会再有时间戳小于Watermark的数据元素。因此,Watermark可以看作是告诉ApacheFlink框架,数据流已经处理完毕。什么位置(时间维度)方式。Watermark的生成以及ApacheFlink的内部处理逻辑如下图所示:Watermark的生成方式目前,ApacheFlink有两种Watermark的生成方式,如下:Punctuated-数据流中的每个增量EventTime都会生成一个Watermark。在实际生产中,Punctuated方式在TPS较高的场景下会产生大量Watermark,一定程度上会对下游算子造成压力。因此,只有在实时性要求非常高的场景下,才会使用Punctuated方式来生成Watermarks。.定期-定期生成水印(以特定时间间隔或达到特定数量的记录时)。在实际生产中,Periodic方法必须结合时间和累积次数两个维度,持续周期性生成Watermarks,否则在极端情况下会出现较大的延迟。因此,需要根据不同的业务场景来选择Watermark的生成方式。Watermark的接口定义对应ApacheFlinkWatermark两种不同的生成方式。Let'sunderstandthecorrespondinginterfacedefinitionasfollows:PeriodicWatermarks-AssignerWithPeriodicWatermarks/***Returnsthecurrentwatermark.Thismethoddisperiodicallycalledbythe*systemtoretrievethecurrentwatermark.Themethodmayreturn{@codenull}to*indicatemarkthatnonewWater**
Thereturnedwatermarkwillbeemittedonlyifitisnon-nullanditsTimestamp*islargerthanthatofthepreviouslyemittedwatermark(topreservethecontractof*ascendingwatermarks).Ifthecurrentwatermarkisstill*identicaltothepreviousone,自从*上次调用此方法以来,事件时间没有任何进展。如果返回空值,或者返回的水印的时间戳*小于上次调用的水印的时间戳,则不会生成*新水印。**
调用此方法和生成水印的间隔*取决于{@linkExecutionConfig#getAutoWatermarkInterval()}。*.@flink.watermarking.*.@flink.watermark.*.@fseeorg.watermarking.*.@fseeorg.watermarking.*.参见ExecutionConfig#getAutoWatermarkInterval()**@返回{@codeNull},ifnowatermarkshouldbeemitted,orthenextwatermarktoemit.*/@NullableWatermarkgetCurrentWatermark();PunctuatedWatermarks-AssignerWithPunctuatedWatermarkspublicinterfaceAssignerWithPunctuatedWatermarks Thereturnedwatermarkwillbeemittedonlyifitisnon-nullanditsTimestamp*islargerthanthatofthepreviouslyemittedwatermark(topreservethecontractof*ascendingwatermarks).Ifanullvalueisreturned,ortheTimestampofthereturned*watermarkissmallerthanthatofthelastemittedone,thennonewwatermarkwill*begenerated.** Foranexamplehowtousethismethod,seethedocumentationof*{@linkAssignerWithPunctuatedWatermarksthisclass}.**@return{@codeNull},ifnowatermarkshouldbeemitted,orthenextwatermarktoemit.*/@NullableWatermarkcheckAndGetNextWatermark(TlastElement,longextractedTimestamp);}AssignerWithPunctuatedWatermarks继承了TimestampAssigner接口-TimestampAssignerpublicinterfaceTimestampAssigner ThemethodispassedthepreviouslyassignedTimestampoftheelement.*ThatpreviousTimestampmayhavebeenassignedfromapreviousassigner,*byingestionTime.IftheelementdidnotcarryaTimestampbefore,thisvalueis*{@codeLong.MIN_VALUE}.**@paramelementTheelementthattheTimestampiswillbeassignedto.*@parampreviousElementTimestampThepreviousinternalTimestampoftheelement,*ornegativevalue,ifnoTimestampasbeenaassigned,yet.*@returnThenewTimestamp.*/longextractTimestamp(Telement,longpreviousElementTimestamp)levent;计算逻辑生成Watermark的时间戳Watermark解决了以上问题从上面的Watermark生成接口和ApacheFlink内部对PeriodicWatermark的实现来看,Watermark的时间戳可以和Event中的EventTime保持一致,也可以自己定义任何合理的逻辑让Watermark的时间戳不相等toEventEvent中的EventTime,Event中的EventTime从产生的那一刻起就不能改变,不受ApacheFlink框架的控制,Watermark的产生是在ApacheFlink的源节点或实现的节点上计算的Watermarkgenerator(如上文ApacheFlink内置的PeriodicWatermark实现),ApacheFlink针对单流或多流场景进行了统一的Watermark处理。回头看看Watermark机制是如何解决上述问题的。上面的问题就是如何正确处理迟到的EventTimebit11元素。要解决这个问题,我们需要了解EventTime窗口是如何触发的?EventTime窗口的计算条件是Window计算出的Timer时间戳小于等于当前系统的Watermak时间戳。当Watermark的时间戳等于Event中携带的EventTime时,上述场景(Watermark=EventTime)的计算结果如下:上面对应的DDL(ApacheFlink的阿里巴巴增强分支)定义如下:CREATETABLEsource(...,Event_timeTimeStamp,WATERMARKwk1FOREvent_timeaswithOffset(Event_time,0))with(...);如果想正确处理latedata,可以定义Watermark生成策略为Watermark=EventTime-5s,如下:上面对应的DDL(ApacheFlink的阿里巴巴增强分支)定义如下:CREATETABLEsource(...,Event_timeTimeStamp,WATERMARKwk1FOREvent_timeaswithOffset(Event_time,5000))with(...);上述正确处理的根本原因是我们采用了延迟触发窗口计算的方法来正确处理LateEvent。同时,我们发现窗口的延迟触发计算也会导致下游的LATENCY变大。在这个例子中,下游得到的窗口的结果延迟了5s。Multi-streamWatermarkprocessing在实际的流计算中,往往在一个job中处理多个源的数据,将源数据通过GroupBy进行分组,然后将不同源的相同key值shuffle到同一个处理节点,并且自带Watermark,ApacheFlink必须保证Watermark保持单调递增。当多个Source的Watermarks放在一起时,它们可能不会单调增加。ApacheFlink内部是如何处理这种情况的?如下图所示:ApacheFlinkInternal每一侧只能有一个增量Watermark。当多个流将Eventtime聚集在一起(GroupBy或Union)时,ApacheFlink将选择所有传入的Eventtime中最小的一个流向下游。这样既保证了watermark的单调递增,又保证了数据的完整性。如下图所示:总结本节通过一个流计算中常见的乱序问题来介绍ApacheFlink是如何使用Watermark机制来处理乱序问题的。这篇文章的内容也在一定程度上反映了EventTimeWindow中的Trigger机制依赖于Watermark(后续Window章节会介绍)。Watermark机制是流计算中处理乱序和正确处理LateEvents的核心手段。#关于点赞和评论本系列文章难免有很多瑕疵和不足。真诚希望读者对有收获的章节给予表扬和鼓励,对不足的章节给予反馈和建议。先感谢您!作者孙金城,昵称金珠,目前就职于阿里巴巴,从2015年开始投入阿里巴巴基于ApacheFlink的计算平台Blink的设计和开发。【本文为专栏作家“金珠”原创稿件,转载请联系原作者】点此阅读该作者更多好文