当前位置: 首页 > 科技观察

ApacheFlink漫谈系列(十)——JOINLATERAL

时间:2023-03-13 05:34:54 科技观察

一、聊什么在上一篇《Apache Flink 漫谈系列 - JOIN算子》我们对最常见的JOIN进行了详细的分析。本文介绍一种特殊的JOIN,即JOINLATERAL。为什么JOINLATERAL很特别?直观上,因为JOIN的右边并不是一个实际的物理表,而是一个VIEW或者Table-valuedFunciton。本文将首先介绍传统数据库对LATERALJOIN的支持,然后介绍目前ApacheFlink对LATERALJOIN的支持。2.实际问题假设我们有两张表,一张是Customers表(消费者id,城市),一张是Orders表(订单id,消费者id)。两张表的DDL(SQLServer)如下:insertintoCustomersvalues('C003','北京');insertintoCustomersvalues('C003','北京');insertintoCustomersvalues('C003','北京');C004','杭州');查看数据:OrdersCREATETABLEOrders(orderidchar(5)NOTNULL,customeridchar(5)NULL)insertintoOrdersvalues('O001','C001');insertintoOrdersvalues('O002','C001');insertintoOrdersvalues('O003','C003');insertintoOrdersvalues('O004','C001');查看数据:1.问题示例假设我们要查询所有Customer的客户ID、位置和订单信息,我们要的信息是:(1)用INNERJOIN解决如果你看了《Apache Flink 漫谈系列 - JOIN算子》,我想你就会想到INNERJOIN来解决这个查询需求。SQL如下:SELECTc.customerid,c.city,o.orderidFROMCustomerscJOINOrdersoONo.customerid=c.customerid查询结果如下:但是如果真的用上面的方法解决,就没有内容可以介绍了在这篇文章中,让我们改变一下写作方式。2、使用Correlated子查询来解决Correlated子查询就是在子查询中使用关联表的字段,子查询可以在FROM子句中,也可以在WHERE子句中。WHERE子句使用WHERE子句实现上述查询需求,SQL如下:SELECTc.customerid,c.cityFROMCustomerscWHEREc.customeridIN(SELECTo.customerid,o.orderidFROMOrdersoWHEREo.customerid=c.customerid)执行:上面的问题用在WHERE子查询的查询列必须对应要比较的列,否则无法投影o.orderid,为什么我在上面的查询中加了一个o.orderid,因为查询要求需要o.orderid,而o.orderid查询可以去掉成功了,但是结果不是我们想要的,如下:SELECTc.customerid,c.cityFROMCustomerscWHEREc.customeridIN(SELECto.customeridFROMOrdersoWHEREo.customerid=c.customerid)查询结果:可以看出上面查询结果缺少o.orderid,无法完成我们的查询。FROM子句使用WHERE子句实现上述查询需求,SQL如下:SELECTc.customerid,c.city,o.orderidFROMCustomersc,(SELECto.orderid,o.customeridFROMOrdersoWHEREo.customerid=c.customerid)所以我们会得到出现以下错误:错误信息提示我们无法识别c.customerid。在ANSI-SQL中,FROMClause中的子查询无法引用左表中的信息,所以单纯使用FROMClause中的子查询是无法解决上述问题的,那么除了INNERJOIN之外,如何解决上述查询需求呢?3.JOINLATERAL我们分析上面的需求,本质上是根据左表Customers的customerid查询右表的Orders信息,就像一个For循环,外层遍历了左表Customers的所有数据,而内层是根据左表Customers表中的每个Customerid到右表Orders中进行遍历查询,然后join符合条件的左右表数据。这种根据左表中的数据一张一张动态生成右表进行JOIN的语义,LATERAL键是SQL标准Word中提出的,也叫横向驱动表。1.CROSSAPPLY和LATERAL在上面的例子中,我们使用SQLServer进行测试。我再提一下SQLServer是如何支持LATERAL的?SQLServer使用自己的方言CROSSAPPLY来支持它。那么为什么要使用CROSSAPPLY而不是ANSI-SQL的LATERAL呢?可能的原因是当时引入了SQLServer来解决TVF问题。2005年的开发是2000年进行的,可能会有时间上的滞后。LATERAL出来的时候,CROSSAPPLY已经在SQLServer中开发出来了。因此,SQLServer出于各种原因采用了CROSSAPPLY,但CROSSAPPLY的语义与LATERAL完全相同。同时支持LATERAL的Oracle12和PostgreSQL94同时支持LATERAL和CROSSAPPLY。2.问题求解那么我们回到上面的问题。我们使用SQLServer的CROSSAPPLY来解决上面的问题。SQL如下:上面得到的结果完全满足查询要求。4、JOINLATERAL和INNERJOIN的关系上面的查询需求没有体现JOINLATERAL和INNERJOIN的区别。下面我们看一下SQLServer中的两个查询执行计划:上面我们发现SQLServer优化器优化后的两个查询两个执行计划是完全一样的,那为什么还要创建一个LATERAL呢?1、在性能上,我们对上面的查询需求稍做改动,我们查询所有Customers和Customers的第一条订单信息。LATERAL的写法是SELECTc.customerid,c.city,o.orderidFROMCustomerscCROSSAPPLY(SELECT***)o.orderid,o.customeridFROMOrdersoWHEREo.customerid=c.customeridORDERBYo.customerid,o.orderid)aso查询结果:我们发现C001的Customer虽然有3个订单,但是我们查询到的是最新的信息。JOIN写法SELECTc.customerid,c.city,o.orderidFROMCustomerscJOIN(SELECto2.*,ROW_NUMBER()OVER(PARTITIONBYcustomeridORDERBYorderid)ASrnFROMOrderso2)oONc.customerid=o.customeridANDo.rn=1查询结果:我们完成了上面的查询需求,我们我们来看一下计划的执行情况,如下:我们直观的发现完成同样的功能,使用CROSSAPPLY进行查询,计划的执行要简单很多。2.函数在函数上,INNERJOIN本身不允许在ANSI-SQL中有JOIN函数,这也是SQLServer当时引入CROSSAPPLY的根本原因。我们以SQLServer中的DMV(相当于TVF)查询为例:SELECTname,log_backup_timeFROMsys.databasesASsCROSSAPPLYsys.dm_db_log_stats(s.database_id);查询结果:5.ApacheFlink对LATERAL的支持我花了很多篇章以SQLServer为例介绍ANSI-SQL和传统数据库是如何支持LATERAL的。接下来我们看看ApacheFlink对LATERAL的支持。1.CalciteApacheFlink使用Calcite来分析和优化SQL。目前,Calcite完全支持LATERAL语法。示例如下:SELECTe.NAME,e.DEPTNO,d.NAMEFROMEMMPSe,LATERAL(SELECT*FORMDEPTSdWHEREe.DEPTNO=d.DEPTNO)asd;查询结果:我用的是Calcite官方的测试数据。2.从Flink-1.6.2开始,ApacheFlink中使用LATERAL的场景有两种,如下:UDTF(TVF)-User-definedTableFuncitonTemporalTable-相关内容将在后续章节单独介绍。在本文中,我们以TVF(UDTF)为例,说明如何在ApacheFink中支持LATERAL。(1)UDTFUDTF-User-definedTableFunction是ApacheFlink中三大用户自定义函数(UDF、UDTF、UDAGG)之一。自定义接口如下:Baseclass/***Baseclassforalluser-definedfunctionssuchasscalarfunctions,tablefunctions,*oraggregationfunctions.*/abstractclassUserDefinedFunctionextendsSerializable{//关键是FunctionContext提供了几个高级属性(会在UDX章节详细介绍)defopen(context:FunctionContext):Unit={}defclose():Unit={}}TableFunction/***用户定义表函数(UDTF)的基类。用户定义表函数以*零、一或多个标量值作为输入并返回多个行作为输出。**行为[[TableFunction]]可以通过实施自定义评估*方法来定义.评估方法必须公开声明,而不是静态的,并命名为“eval”。*评估方法也可以通过实现多个名为“eval”的方法来重载。**用户定义的函数必须具有默认构造函数,并且必须在运行时实例化。**默认情况下,评估方法的结果类型由Flink的类型提取*设施确定。这对于基本的POJ类型来说已经足够了htbewrongform更多*复杂、自定义或复合类型s.Inthesecases[[TypeInformation]]ofthersulttype*canbemanuallydefinedbyoverriding[[getResultType()]].*/abstractclassTableFunction[T]extendsUserDefinedFunction{//对于泛型T,如果是基本类型,那么Flink框架可以自动识别,//对于用户自己定义的复杂对象,需要用户覆盖这个实现defgetResultType:TypeInformation[T]=null}上面定义的核心是要求用户实现eval方法。写一个具体的例子吧。例//定义一个简单的UDTF返回类型,对应接口上的TcaseclassSimpleUser(name:String,age:Int)//继承TableFunction,并实现evale方法//核心功能是解析#classSplitTVFextendsTableFunction[分隔的字符串SimpleUser]{//makesureinputelement'sformatis"#"defeval(user:String):Unit={if(user.contains("#")){valsplits=user.split("#")collect(SimpleUser(splits(0),splits(1).toInt))}}}(2)示例(完整的ITCase):测试数据我们构造一张用户表,只包含一个数据字段,用户表数据如下:查询需求query需求是将数据字段压平成一张表,有name和age两个字段,期望得到:查询示例我们使用ITCase来完成上面的查询需求。完整代码如下:@TestdeftestLateralTVF():Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaltEnv=TableEnvironment。getTableEnvironment(env)env.setStateBackend(getStateBackend)StreamITCase.clearvaluserData=newmutable.MutableList[(String)]userData.+=(("Sunny#8"))userData.+=(("Kevin#36"))userData。+=(("Panpan#36"))valSQLQuery="SELECTdata,name,ageFROMUserTab,LATERALTABLE(splitTVF(data))AST(name,age)"valusers=env.fromCollection(userData).toTable(tEnv,'data)valtvf=newSplitTVF()tEnv.registerTable("userTab",users)tEnv.registerFunction("splitTVF",tvf)valresult=tEnv.SQLQuery(SQLQuery).toAppendStream[Row]result.addSink(newStreamITCase.StringSink[Row])env.execute()StreamITCase.testResults.foreach(println(_))}运行结果:上面核心语句为:valSQLQuery="SELECTdata,name,ageFROMUserTab,LATERALTABLE(splitTVF(data))AST(name,age)》如果想运行上面的例子,请参考中的源码搭建测试环境6.总结本文重点介绍一种新的JOIN类型-加入横向。并介绍了SQLServer对LATERAL的支持方式,详细分析了JOINLATERAL和INNERJOIN的区别和联系,最后切入ApacheFlink,以UDTF示例说明了ApacheFlink对JOINLATERAL的支持,后续章节会另行介绍在ApacheFlink中使用LATERAL的场景,即TemporalJION。TemporalJION也是一种新型的JOIN。下篇文章见!关于点赞和评论本系列文章难免存在诸多缺陷和不足。衷心希望读者对富有成果的章节给予表扬和鼓励,对不足的章节给予反馈和建议。先感谢您!作者:孙金城,昵称金珠,目前就职于阿里巴巴。ApacheFlink的阿里巴巴计算平台Blink的设计与开发。【本文为专栏作家“金竹”原创稿件,转载请联系原作者】点此阅读更多该作者好文