1.说什么JOIN算子是数据处理的核心算子。前面我们在《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded双流JOIN,在介绍了单流和UDTF的JOIN操作,在《Apache Flink 漫谈系列(11) - Temporal Table JOIN》介绍了单流和版本表的JOIN操作。本文将介绍在UnBounded数据流上按时间维度划分数据的JOIN操作——TimeInterval(Time-windowed)JOIN,后面我们称之为IntervalJOIN。2.实际问题在前面的章节中,我们介绍了Flink中对各种JOIN的支持,那么想一想之前介绍的JOIN是否可以满足下面的查询需求呢?需求描述如下:比如有一个订单表Orders(orderId,productName,orderTime)和支付表Payment(orderId,payType,payTime)。假设我们要统计一个小时内支付的订单信息。1、传统数据库解决方案在传统刘数据库中完成上述需求非常简单。查询sql如下:SELECTo.orderId,o.productName,p.payType,o.orderTime,payTimeFROMOrdersASoJOINPaymentASpONo.orderId=p.orderIdANDp.??payTime>=orderTimeANDp.??payTime=orderTimeANDp.??payTimeastimestamp中找到)。ApacheFlink的IntervalJOIN之后,可以进行Event-TimeWindowAggregate。3、IntervalJOIN为了满足上述需求,解决性能和功能扩展的问题,ApacheFlink在1.4开始开发Time-windowedJoin,也就是本文所说的IntervalJOIN。下面详细介绍IntervalJOIN的语法、语义和实现原理。3、什么是IntervalJOINIntervalJOIN是BoundedJOIN,相对于UnBounded双流JOIN。也就是说,每个流中的每条数据都会与另一个流上不同时区的数据进行JOIN。对应ApacheFlink官方文档的Time-windowedJOIN(release-1.7之前叫Time-windowedJOIN)。1.区间JOIN语法SELECT...FROMt1JOINt2ONt1.key=t2.keyANDTIMEBOUND_EXPRESSIONTIMEBOUND_EXPRESSION有两种写法,如下:L.timebetweenLowerBound(R.time)andUpperBound(R.time)R.timebetweenLowerBound(L.time)和UpperBound(L.time)具有时间属性(L.time/R.time)的比较表达式。2.IntervalJOIN语义IntervalJOIN的语义是每条数据对应一个Interval数据范围,比如有一张订单表Orders(orderId,productName,orderTime)和一张支付表Payment(orderId,payType,payTime)).假设我们要统计下单后一小时内付款的订单信息。SQL查询如下:SELECTo.orderId,o.productName,p.payType,o.orderTime,cast(payTimeastimestamp)aspayTimeFROMOrdersASoJOINPaymentASpONo.orderId=p.orderIdANDp.??payTimeBETWEENOrderTimeANDorderTime+INTERVAL'1'HOUR订单数据支付数据符合语义预期结果结果表中没有出现订单id为003的信息,因为订单时间为2018-12-2604:53:24.0,付款时间为2018-12-2605:53:30.0,这超过1小时付款。那么预期结果信息如下:这样Id为003的订单为无效订单,可以更新库存继续销售。接下来,我们将以图表的形式直观地说明IntervalJOIN的语义。我们对上面例子的要求稍微改动一下:下单可以提前付款(不管合理与否,只是为了说明语义),即下单前后1小时付款才有效。SQL语句如下:SELECT...FROMOrdersASoJOINPaymentASpONo.orderId=p.orderIdANDp.??payTimeBETWEENOrderTime-INTERVAL'1'HOURANDorderTime+INTERVAL'1'HOUR这样的查询语义图如下:上图中有几个关键点,如下:数据JOIN的区间——比如Ordertime为3的订单,会在支付时间区间[2,4]进行JOIN。WaterMark-例如图中,Order的最后一个数据时间是3,Payment的最后一个数据时间是5,那么WaterMark是根据实际最小值减去UpperBound生成的,即:Min(3,5)-1=2expiredData——出于性能和存储的考虑,应该清除过期数据,如图,当WaterMark为2时,时间2之前的数据过期,可以清除。3.IntervalJOIN的实现原理由于IntervalJOIN和双流JOIN类似左右两边存储数据,所以底层实现还是使用State来进行数据存储。流计算的特点是数据源源不断地流入,我们可以不断地进行增量计算,也就是可以对每一个流入的数据进行JOIN计算。我们还是用具体的例子和图来说明内部的计算逻辑,如下图:对每条记录的处理逻辑简单说明如下:实际内部逻辑比描述的复杂很多,你可以根据以上简要说明了解内部原理。能。4.示例代码我们依然以下单和支付为例,将完整代码分享给大家,如下(代码基于flink-1.7.0):importjava.sql.Timestampimportorg.apache.flink.api.scala._importorg。apache.flink.streaming.api.TimeCharacteristicimportorg.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimportorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimportorg.apache.flink.streaming.api.windowing.time.Timeimportorg。apache.f.table.api.TableEnvironmentimportorg.apache.flink.table.api.scala._importorg.apache.flink.types.Rowimportscala.collection.mutableobjectSimpleTimeIntervalJoin{defmain(args:Array[String]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaltEnv=TableEnvironment.getTableEnvironment(env)env.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//构造订单数据valordersData=newmutable.MutableList[(String,String,Timestamp)]ordersData.+=(("001","iphone",newTimestamp(1545800002000L)))ordersData.+=(("002","mac",newTimestamp(1545800003000L)))ordersData.+=(("003","book",newTimestamp(1545800004000L)))ordersData.+=(("004","cup",newTimestamp(1545800018000L)))//构造付款表valpaymentData=newmutable.MutableList[(String,String,Timestamp)]paymentData.+=(("001","alipay",newTimestamp(1545803501000L)))paymentData.+=(("002","card",newTimestamp(1545803602000L)))paymentData.+=(("003","card",newTimestamp(1545803610000L)))paymentData.+=(("004","alipay",newTimestamp(1545803611000L)))valorders=env.fromCollection(ordersData).assignTimestampsAndWatermarks(newTimestampExtractor[String,String]()).toTable(tEnv,'orderId,'productName,'orderTime.rowtime)valratesHistory=env.fromCollection(paymentData)).assignTimestampsAndWatermarks(newTimestampExtractor[String,String]()).toTable(tEnv,'orderId,'payType,'payTime.rowtime)tEnv.registerTable("O订单",订单)tEnv.registerTable("付款",ratesHistory)varsqlQuery="""|SELECT|o.orderId,|o.productName,|p.payType,|o.orderTime,|cast(payTimeastimestamp)aspayTime|FROM|OrdersASoJOINPaymentASpONo.orderId=p.orderIdAND|p.payTimeBETWEENOrderTimeANDorderTime+INTERVAL'1'HOUR|""".stripMargintEnv.registerTable("TemporalJoinResult",tEnv.sqlQuery(sqlQuery))valresult=tEnv.scan("TemporalJoinResult").toAppendStream[Row]result.print()env.execute()}}classTimestampExtractor[T1,T2]extendsBoundedOutOfOrdernessTimestampExtractor[(T1,T2,Timestamp)](Time.seconds(10)){overridedefextractTimestamp(元素:(T1,T2,时间戳)):Long={element._3.getTime}}运行结果如下:第5节本文从实际业务需求场景出发,介绍使用无界双流JOIN实现相同业务需求或者TimeIntervalJOIN,TimeIntervalJOIN的性能优于UnBounded的双流JOIN,并且IntervalJOIN之后,WindowAggregate操作或者可以进行计算。然后介绍了IntervalJOIN的语法、语义和实现原理。***完整的订单支付示例代码分享给大家。希望这篇文章能让大家对ApacheFlinkTimeIntervalJOIN有一个具体的了解!这一系列关于点赞和评论的文章难免有很多瑕疵和不足。提前感谢您的反馈和建议!作者:孙金城,昵称金珠,目前就职于阿里巴巴。2015年开始投入阿里巴巴基于ApacheFlink的计算平台Blink的设计和开发。【本文为专栏作家“金竹”原创稿件,转载请联系原作者】点此阅读更多该作者好文