当前位置: 首页 > 科技观察

面试必知的Spark SQL几种Join实现

时间:2023-03-19 02:16:11 科技观察

面试中必须知道的几种SparkSQL的Join实现开发的主流,作为开发者,我们需要了解Join在Spark中是如何组织和运行的。SparkSQL整体流程介绍在讲解Join的实现之前,我们先简单介绍一下SparkSQL的整体流程。一般来说,我们有两种方式来使用SparkSQL。一种是直接写SQL语句,需要元数据库支持,比如Hive等。一种是通过Dataset/DataFrame来写Spark应用。如下图,sql语句被语法(SQLAST)解析成一个查询计划,或者我们通过Dataset/DataFrame提供的API组织成一个查询计划。查询计划分为两类:逻辑计划和物理计划。这个阶段通常称为逻辑计划优化,经过语法分析(Analyzer)和一系列查询优化(Optimizer)。最后映射成物理计划,转换成RDD执行。对于句法分析、句法分析、查询优化,本文不做详述。本文重点介绍Join的物理执行过程。Join的基本元素如下图所示。Join大致包括三个要素:Join方法、Join条件、过滤条件。过滤条件也可以通过AND语句放在Join条件中。Spark支持所有类型的Join,包括:innerjoinleftouterjoinrightouterjoinfullouterjoinleftsemijoinleftantijoin下面分别介绍这几种Join的实现。Join的基本实现流程总的来说,Join的基本实现流程如下图所示。Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter)。通常,streamIter是大表,buildIter是小表,我们不用纠结哪个表是streamIter,哪个表是buildIter。这个spark会根据join语句自动帮我们完成。在实际计算中,spark会根据streamIter进行遍历,每次在streamIter中取出一条记录rowA,根据Join条件计算出keyA,然后去buildIter中寻找所有满足Join条件的记录rowB(keyB==keyA)根据keyA,将rowBs中的每条记录与rowA进行join,得到joined记录,最后根据过滤条件得到最终的joined记录。从上面的计算过程不难发现,对于streamIter中的每条记录,都需要在buildIter中搜索匹配的记录,所以buildIter一定是搜索性能较好的数据结构。Spark提供了三种join实现:sortmergejoin、broadcastjoin和hashjoin。sortmergejoin的实现是让两条记录可以join在一起。首先,具有相同键的记录需要放在同一个分区中。所以,一般来说,需要shuffle。映射阶段根据连接条件确定每条记录的键。key用于shufflewrite,将可能会join在一起的记录分到同一个partition,这样在shuffleread阶段可以将两张表中key相同的记录拉到同一个partition处理。前面我们也提到,对于buildIter来说,需要找到一个性能更好的数据结构。通常,我们可以想到哈希表,但是对于一个大表来说,不可能把所有的记录都放在哈希表中。可以先对buildIter进行排序,查找时按顺序查找。搜索成本也是可以接受的。我们知道sparkshufflestage天生就支持排序。这很容易实现。下面是sortmergejoin的示意图。在shuffleread阶段,分别对streamIter和buildIter进行归并排序。在遍历streamIter时,对于每条记录,依次从buildIter中查找对应的记录。由于两张表是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需要从buildIter中上次查找结束的位置开始查找即可,所以buildIter中的每次查找都不会需要从头说起,总体来说,搜索性能还是比较好的。broadcastjoin的实现允许具有相同key的记录分配到同一个partition。我们通常会洗牌。如果buildIter是一个非常小的表,那么就没有必要去war做shuffle。BuildIter直接广播给各个计算节点,然后将buildIter放入哈希表中,如下图所示。从上图可以看出,不做shuffle,直接在一个map里面就可以了。通常,这种连接也称为映射连接。那么问题来了,什么时候使用broadcastjoin呢?这个我们不用管,sparksql会自动帮我们完成。当buildIter的预估大小不超过参数spark.sql.autoBroadcastJoinThreshold设置的值(默认10M)时,会自动使用broadcastjoin,否??则使用sortmergejoin。Hashjoin的实现除了上面两种join的实现方法,spark还提供了hashjoin的实现方法。在随机读取阶段不会对记录进行排序。反正两张表key相同的记录会在同一个分区,但是分区内不排序,把buildIter的记录放到hash表中,方便查找,如下图。不难发现,如果要把buildIter的记录放到哈希表中,那么每个分区的buildIter的记录不能太大,否则会存不下。默认情况下,散列连接的实现是关闭的。如果要使用hashjoin,必须满足以下四个条件:buildIter的整体预估大小超过了spark.sql.autoBroadcastJoinThreshold设置的值,即不满足broadcastjoin条件打开开关尝试使用hashjoin,spark.sql.join.preferSortMergeJoin=false每个partition的平均值大小不超过spark.sql.autoBroadcastJoinThreshold设置的值,即shuffleread阶段从buildIter对每个partition的records必须能够存储在内存中。streamIter的大小是buildIter的三倍多。所以,使用hashjoin的条件其实是非常苛刻的。是的,在大多数实际场景中,即使可以使用hashjoin,使用sortmergejoin也不会比hashjoin差多少,所以尽量使用hash。下面分别讲解不同join方式的实现过程。内连接内连接就是在左右表中查找满足连接条件的记录。我们在写SQL或者使用DataFrame的时候,不需要关心哪个是左表哪个是右表。在sparksql查询优化阶段,spark会自动将大表设置为左表,即streamIter,将小表设置为右表,即buildIter。这样对于小表的搜索相对来说比较好。其基本实现流程如下图所示。在查找阶段,如果右表中没有满足join条件的记录,则跳过。左外连接左外连接是以左表为基础,在右表中查找匹配的记录。如果搜索失败,则返回一条所有字段都为空的记录。我们在写sql语句或者使用DataFormae的时候,一般都是把大表放在左边,把小表放在右边。其基本实现流程如下图所示。右外连接右外连接以右表为基础,在左表中查找匹配的记录。如果搜索失败,则返回一条所有字段都为空的记录。所以右表是streamIter,左表是buildIter。我们在写SQL语句或者使用DataFrame的时候,一般都是让大表在右,小表在左。其基本实现流程如下图所示。fullouterjoinFullouterjoin相对来说要复杂一些。一般来说,leftouterjoin和rightouterjoin都需要做,但是不能简单的先leftouterjoin,再rightouterjoin,最后union得到最后的结果。因为最后的结果里面有两个innerjoin的结果。由于需要完成leftouterjoin和rightouterjoin,所以fullouterjoin只能通过sortmergejoin来实现,而且左右表必须同时作为streamIter和buildIter。基本实现流程如下图所示。由于左表和右表已经排序,先在左表和右表中依次取出一条记录,比较key,如果key相等,joinrowA和rowB,更新rowA和rowB为左表和右表分别如果keyAkeyB,说明左表中没有右表rowB对应的记录,则joinnullRow和rowB,然后将rowB更新为右表的下一条记录。如此循环遍历,直到处理完左右表中的所有记录。左半连接左半连接是以左表为基础,在右表中查找匹配的记录。如果查找成功,则只返回左边的记录,否则返回null。基本实现流程如下图所示。左反连接左反连接与左半连接相反。它以左表为基础,在右表中查找匹配的记录。如果查找成功则返回null,否则只返回左边的记录。基本实现流程如下图所示。总结Join是数据库查询中一个非常重要的语法特征。在数据库领域,可谓“得join者得天下”。SparkSQL作为分布式数据仓库系统,为我们提供了完善的join支持,并在内部实现。网上默默做了很多优化,了解join的实现,有助于我们更深入的了解我们应用的运行轨迹。