1。序言进入正文。下面是文章目录,也对应了本文的结论。小伙伴们可以先看结论,快速了解这篇文章能给你带来什么帮助:背景及应用场景介绍:Join是离线数仓中最常见的场景。仓库里也不可能缺少它。flinksql提供的丰富的join方式(总结6种:regularjoin、维表join、temporaljoin、intervaljoin、arrayflattening、tablefunction函数)为我们提供了强大的方式来满足我们的需求。下面从一个实际案例开始:以一个曝光日志leftjoinclicklog为案例介绍flinksqljoin的解决方案flinksqljoin的解决方案及存在问题的介绍:主要介绍上面regularjoin的运行结果cases并分析源码机制,虽然简单,但是leftjoin,rightjoin,fulljoin都会有retract的问题,所以在使用之前,应该充分了解它的运行机制,避免重数据传输的问题和过多的数据传输。本文主要介绍regularjoinretract的问题,下一节介绍如何使用intervaljoin来避免这个retract问题,满足第2点的实战案例需求。2.背景及应用场景介绍在我们的日常场景中,一个最广泛使用的操作必须有连接的地方。比如计算曝光数据和点击数据的CTR,需要通过唯一idjoin,关联事实数据和关联维度数据。维度,然后计算维度指标以上场景在离线数仓中应用比较广泛。那么,如何操作实时流之间的关联呢?FlinkSQL为我们提供了四种强大的关联方法,帮助我们在流式场景下实现流式关联的目的。如下官网截图所示:joinregularjoin:leftjoin,rightjoin,fulljoin,innerjoin维表查找连接:维表关联temporaljoin:快照表连接intervaljoin:一段时间内的两个流join数组爆炸:columntransfertablefunctionjoin:join是在实时数仓中通过tablefunction自定义函数实现的(类似columntransfer的效果,或者类似维表join的效果),regularjoin和intervaljoin,and两种join的组合是最常用的。所以本文主要介绍这两种(你可能不想看太长的文章,所以后面的文章会以简明扼要为目标)。3、我们先举个实际案例,看看在特定的输入值场景下,输出值应该是什么样子。场景:普通曝光日志流(show_log)通过log_id关联点击日志流(click_log),下发数据关联结果。下面是一波输入数据:曝光数据:log_idtimestampshow_params12021-11-0100:01:03show_params22021-11-0100:03:00show_params232021-11-0100:05:00show_params3点击数据:log_idtimestampclick_params12021-11-0100:01:53click_params22021-11-0100:02:01click_params2预期输出数据如下:log_idtimestampshow_paramsclick_params12021-11-0100:01_clickparamshow_params22021-11-0100:01:00show_params2click_params232021-11-0100:02:00show_params3null熟悉离线hivesql的同学可能10s就写完上面这个sql了,如下hivesqlINSERTINTOsink_tableSELECTshow_log.log_idaslog_id,show_log.timestampastimestamp,show_log.show_paramsasshow_params,click_log.click_paramsasclick_paramsFROMshow_logLEFTJOINclick_logONshow_log.log_id=click_log.log_id;那我们看看如何用flinksql来实现上面的需求?虽然flinksql没有提供leftjoin能力,但是在实际使用中可能会出现意想不到的问题,下一节会详细介绍。4.flinksqljoin4.1.flinksql还是上面的情况,先实际运行一下看看结果:flinkwebui结果如下:+[1|2021-11-0100:01:03|show_params|null]-[1|2021-11-0100:01:03|show_params|null]+[1|2021-11-0100:01:03|show_params|click_params]+[2|2021-11-0100:03:00|show_params|click_params]+[3|2021-11-0100:05:00|show_params|null]从结果看,它的输出数据有+,-,说明输出数据是retractstream数据。分析的原因是,由于第一个show_log先于click_log到达,+[1|2021-11-0100:01:03|显示参数|null]先下发,click_log到达后,将之前未关联的show_log撤回,再关联的+[1|2021-11-0100:01:03|显示参数|click_params]将被传送。但是retractingstream会导致更多的数据写入到Kafka,这是不可接受的。我们期望的结果应该是附加流。为什么leftjoin会出现这种问题呢?先从leftjoin的原理说起。定位到具体的实现源码。让我们先看看转换。Transformations可以看到leftjoin的具体算子是org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator。它的核心逻辑集中在processElement方法上。并且源码中对processElement的处理逻辑有详细的注释,如下图所示。StreamingJoinOperator#processElement注释似乎逻辑复杂。下面我们就按照leftjoin、innerjoin、rightjoin和fulljoin的分类来给大家讲解一下。4.2.leftjoin首先是leftjoin,以上面的show_log(左表)leftjoinclick_log(右表)为例:首先,如果joinxxxon中的条件相等,说明在同一个key下进行join,joinkey是show_log.log_id,click_log.log_id,相同key的数据会被送到一个并发进程处理。如果joinxxxon中条件不等,则两个流的源算子按照全局分区策略向join算子发送数据,join算子并发度会被设置为1,所有数据都会发送到这个并发加工。同一个key下,当一条数据来自show_log时,如果click_log中有数据:则show_log遍历click_log中的所有数据并关联一次输出[+(show_log,click_log)]数据,将show_log保存到左表的状态(以用于后续连接的形式)。同一个key下,当show_log来一条数据时,如果click_log:中没有数据,那么show_log不会等待,直接输出[+(show_log,null)]数据,将show_log保存到左表的state中(供后续加入使用)。同一个key下,当click_log来了一条数据,如果show_log有数据:那么click_log遍历并关联一次show_log中的所有数据。输出数据前会判断,如果关联的show_log之前没有和click_log关联过(即下发过[+(show_log,null)]),则发送一个[-(show_log,null)]first,Senda[+(show_log,click_log)]later,意思是撤回之前show_log与click_log数据不相关的中间结果,发送当前相关的最新结果,click_log保存到状态右表(用于后续的左表关联)。这也解释了为什么输出流是retract流。同一个key下,当click_log中有一条数据时,如果show_log中没有数据:将click_log保存到右表的状态(为后续关联左表)。4.3.Innerjoin以上面的show_log(左表)innerjoinclick_log(右表)为例:首先,如果joinxxxon中的条件相等,则表示在同一个key下进行join,key加入的是show_log。log_id,click_log.log_id,key相同的数据会被送到并发进程处理。如果joinxxxon中条件不等,则两个流的源算子按照全局分区策略向join算子发送数据,join算子并发度会被设置为1,所有数据都会发送到这个并发加工。同一个key下,当一条数据来自show_log时,如果click_log中有数据:则show_log遍历click_log中的所有数据并关联一次输出[+(show_log,click_log)]数据,将show_log保存到左表的状态(以用于后续连接的形式)。同一个key下,当show_log有一条数据过来时,如果click_log:中没有数据,那么show_log不会输出数据,会把show_log保存到左表的state中(供后续join使用)。同一个key下,当click_log自带一条数据时,如果show_log有数据:则click_log遍历show_log中的所有数据并关联一次输出[+(show_log,click_log)]数据,click_log保存为state右表(以用于后续连接的形式)。同一个key下,当click_log有一条数据时,如果show_log:中没有数据,那么click_log不会输出数据,会将click_log保存到右表的status中(供后续join使用)。4.4.rightjoinrightjoin和leftjoin一样,只是顺序相反,这里不再赘述。4.5.fulljoin以上面的show_log(左表)fulljoinclick_log(右表)为例:首先,如果joinxxxon中的条件相等,则表示在同一个key下进行join,key加入的是show_log。log_id,click_log.log_id,key相同的数据会被送到并发进程处理。如果joinxxxon中条件不等,则两个流的源算子按照全局分区策略向join算子发送数据,join算子并发度会被设置为1,所有数据都会发送到这个并发加工。同一个key下,当一条数据来自show_log时,如果click_log有数据:则show_log遍历并关联一次click_log中的所有数据。输出数据前会判断,如果关联的click_log之前没有关联过show_log(即下发过[+(null,click_log)]),则发送一个[-(null,click_log)]第一的。later发送一个[+(show_log,click_log)],表示撤回之前与show_log数据不相关的click_log中间结果,发送当前相关的最新结果,将show_log保存到左表状态(为了后面的join)在同一个key下,show_log来了一条数据,如果click_log中没有数据:那么show_log就不会等待,直接输出[+(show_log,null)]数据,把show_log保存到lefttableIn状态(用于后续连接)。同一个key下,当click_log来了一条数据,如果show_log有数据:那么click_log遍历并关联一次show_log中的所有数据。输出数据前会判断,如果关联的show_log之前没有和click_log关联过(即下发过[+(show_log,null)]),则发送一个[-(show_log,null)]first,Senda[+(show_log,click_log)]later,意思是撤回之前show_log与click_log数据不相关的中间结果,发送当前相关的最新结果,click_log保存到状态右表中(供后续join使用)同一个key下,当click_log来一条数据,如果show_log:中没有数据,则click_log不等待,直接输出[+(null,click_log)]数据,并保存click_log到右表In状态(用于后续连接)。4.6.regularjoin总结总的来说,以上四种join可以分为以下几种。innerjoin会互相等待,直到有数据再发送。左连接、右连接、全连接不会互相等待。只要有数据来,它就会尝试关联。如果可以关联,则传递的字段是完整的。如果无法建立关联,则另一端的字段将为空。后续数据到达后,如果发现之前已经发送过的数据没有与之关联,则撤回并发送关联结果。4.7.如何解决retract导致数据重复发送到kafka的问题?由于flinksql在leftjoin、rightjoin、fulljoin实现中的原理都是这种retract方式实现的,所以这种方式满足不了业务。让我们换个思路。上面join的特点是不会互相等待。有什么可以互相等待的join吗?以leftjoin的思想为例。当左表无法关联到右表时,可以选择等待一段时间。如果不能等超过这个时间,可以发送(show_log,null),如果等不及,可以发送(show_log,click_log)。间隔连接就在这里。关于intervaljoin如何实现上述场景及其原理实现,本文(下)将详细介绍,敬请期待。五、总结与展望源码公众号后台回复1.13.2sqljoin解析的妙招。本文主要介绍flinksqlregular在满足join场景的问题,并通过分析其实现来说明运行原理。主要包括以下两部分:背景及应用场景介绍:join是离线数仓中最常见的场景。实时数仓中也不可能缺少它。flinksql提供的丰富的join方式(总结了4种:regularjoin、维表join、temporaljoin、intervaljoin)为我们满足需求提供了强大的后盾。先来一个实战案例:以一个曝光日志leftjoinclicklog为例,介绍flinksqljoin的解决方案。flinksqljoin的解决方案及存在问题的介绍:主要介绍上面几个案例中regularjoin的运行结果,分析源码机制。虽然简单,但是leftjoin、rightjoin、fulljoin都会有retract的问题,所以在使用之前,应该充分了解它的运行机制,避免数据传输量大、传输量过大的问题。本文主要介绍regularjoinretract的问题,下一节介绍如何使用intervaljoin来避免这个retract问题,满足第2点的实战案例需求。
