当前位置: 首页 > 科技观察

SparkSQLFieldLineageinvivoInternet实践

时间:2023-03-13 13:27:22 科技观察

作者:vivo互联网服务器团队-郝光石1.背景Fieldlineage是在表处理过程中保存字段处理过程。为什么需要田间血缘关系?有了字段之间的血缘关系,就可以知道数据的来源和字段之间的转换关系,对数据质量和治理有很大的帮助。与Hive相比,SparkSQL的效率普遍更高,在运行时间和资源占用上都会有更大的收益。平台计划将Hive任务迁移到SparkSQL,还需要实现字段血统功能。2、前期研究我们在开发之前做了很多相关的研究,从中了解到Spark支持扩展:允许用户扩展SparkSQL的SQL分析、逻辑计划的分析检查、逻辑计划的优化、物理计划的形成等..这个方案是可行的,而且没有改动Spark的源码,成本也比较小,所以确定使用这个方案。3.SparkSQL扩展3.1Spark可扩展内容SparkSessionExtensions是一个比较重要的类,它定义了注入规则的方法,目前支持以下内容:[AnalyzerRules]逻辑计划分析规则[CheckAnalysisRules]逻辑计划检查规则[OptimizerRules.]Logicalplanningoptimizationrules[PlanningStrategies]形成物理计划的策略[CustomizedParser]Customsqlparser[(External)Cataloglistenerscatalog]Listeners可以在上面六个地方自定义,我们选择[CheckAnalysisRules]。因为校验规则在调用方法时不需要有返回值,也就是说不需要修改当前遍历的逻辑计划树,这正是我们所需要的。但是【AnalyzerRules】和【OptimizerRules】需要修改当前的逻辑计划,导致我们很难迭代整棵树,得到我们想要的结果。3.2实现自己的扩展类ExtraSparkExtensionextends(SparkSessionExtensions=>Unit){overridedefapply(spark:SparkSessionExtensions):Unit={//fieldlineagespark.injectCheckRule(FieldLineageCheckRuleV3)//sqlparserspark.injectParser{case(_,parser)=>newExtraSparkParser(parser)}}}上面是这样实现扩展的,在apply方法中将你需要的规则注入到SparkSessionExtensions中。除了以上四种可以注入的类型外,还有其他的规则。为了使ExtraSparkExtension生效,我们需要在spark-default.conf中配置spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension,使其在Spark任务启动时生效。请注意,我们还实现了一个自定义SQL解析器,它并没有做太多事情。判断语句是否包含insert时,只需将SQLText(SQL语句)设置为FIELD_LINE_AGE_SQL即可。SQLText之所以被放入FIELD_LINE_AGE_SQL。因为在DheckRule中获取不到SparkPlan,需要重新分析SQL获取SparkPlan,而FieldLineageCheckRuleV3的实现也很简单,关键是在另外一个线程实现。这里我们只关注insert语句,因为insert语句包含了某些表的输入,然后写入到某个表中。类ExtraSparkParser(delegate:ParserInterface)extendsParserInterfacewithLogging{overridedefparsePlan(sqlText:String):LogicalPlan={vallineAgeEnabled=SparkSession.getActiveSession.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBooleanlogDebug(s"SqlText:$sqlText")if(sqlText.toLowerCase().contains("insert")){if(lineAgeEnabled){if(FIELD_LINE_AGE_SQL_COULD_SET.get()){//线路本变量在这里FIELD_LINE_AGE_SQL.set(sqlText)}FIELD_LINE_AGE_SQL_COULD_SET.remove()}}delegate.parsePlan(sqlText)}//调用原始的sqlparseroverridedefparseExpression(sqlText:String):Expression={delegate.parseExpression(sqlText)}//调用原始的sqlparseroverridedefparseTableIdentifier(sqlText:String):TableIdentifier={delegate.parseTableIdentifier(sqlText)}//调用原始的sqlparseroverridedefparseFunctionIdentifier(sqlText:String):FunctionIdentifier={delegate.parseFunctionIdentifier(sqlText)}//调用原来的sqlparseroverridedefparseTableSchema(sqlText:String):StructType={delegate.parseTableSchema(sqlText)}//调用原来的sqlparseroverridedefparseDataType(sqlText:String):DataType={delegate.parseDataType(sqlText)}}3.3扩展规则类caseclassFieldLineageCheckRuleV3(sparkSession:SparkSession)extends(LogicalPlan=>Unit){valexecutor:ThreadPoolExecutor=ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)overridedefapply(plan:LogicalPlan):Unit={valsql=FIELD_LINE_AGE_SQL.getFIELD_LINE_AGE_SQL.remove()if(sql!=null){//这里我们获取sql并启动一个线程来做剩下的分析taskvaltask=newFieldLineageRunnableV3(sparkSession,sql)executor.execute(task)}}}很简单,我们只是获取了SQL,然后开启一个线程获取SparkPlan,实际逻辑在FieldLineageRunnableV33.4具体实现方法3.4.1获取SparkPlan我们在run方法中获取SparkPlan:优化器valplanner=sparkSession.execute(analyzedPlan)//获取sparkPlanvalsparkPlan=planner.plan(optimizerPlan).next()................if(targetTable!=null){vallevelProject=newArrayBuffer[ArrayBuffer[NameExpressionHolder]]()valpredicates=newArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()//投影projectionLineAge(levelProject,sparkPlan.child)//预测predicationLineAge(predicates,sparkPlan.child).........为什么要用SparkPlan?我们最初考虑的时候,物理规划在走场关系的时候更准确,链接更短更直接。这里补充一下SparkSQL的解析过程如下:经过SqlParser,会得到逻辑计划。此时表名、函数等还没有被解析,无法执行;之后Analyzer会分析一些绑定信息,比如tablevalidation,field信息,Function信息;经过Optimizer之后,逻辑计划会按照既定的规则进行优化。这里的规则是RBO。当然Spark也支持CBO优化;SparkPlanner之后,就变成了一个可执行的物理计划。下面看一个逻辑计划和物理计划对比的例子:一条SQL语句:selectitem_id,TYPE,v_value,imeifromt1unionallselectitem_id,TYPE,v_value,imeifromt2unionallselectitem_id,TYPE,v_value,imeifromt3Thelogicalplanislikethe:physicalplanisthis:显然简化了很多。得到SparkPlan后,我们可以根据不同的SparkPlan节点进行迭代。我们将字段沿袭分为两种类型:projection(选择查询字段),predication(wehre查询条件)。这两个是点对点的关系,即原表的字段和目标表的字段之间产生对应关系。想象一个查询是一棵树,那么迭代关系会从树的顶端迭代到树的叶子节点,叶子节点就是原来的表:那么我们迭代查询的结果应该是id->tab1。id,name->tab1.name,tabb2.name,age→tabb2.age。注意有这个变量vallevelProject=newArrayBuffer[ArrayBuffer[NameExpressionHolder]](),通过projecti-onLineAge迭代后,levelProject存储的是(tab1.id),(tab1.名称,tabb2.name),(tabb2.age)。当然不是简单的递归迭代,需要考虑特殊情况,比如:Join、ExpandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。示例及效果:SQL:withAas(selectid,name,agefromtab1whereid>100),Cas(selectid,name,max(age)fromAgroupbyA.id,A.name),Bas(selectid,name,agefromtabb2whereage>28)insertintotab3selectC.id,concat(C.name,B.name)asname,B.agefromB,C其中C.id=B.id效果:{"edges":[{"sources":[3],"targets":[0],"expression":"id","edgeType":"PROJECTION"},{"sources":[4,7],"targets":[1],"expression":"name","edgeType":"PROJECTION"},{"sources":[5],"targets":[2],"expression":"age","edgeType":"PROJECTION"},{"sources":[6,3],"targets":[0,1,2],"expression":"INNER","edgeType":"预测"},{"来源":[6,5],"目标":[0,1,2],"expression":"((((default.tabb2.`age`ISNOTNULL)AND(CAST(default.tabb2.`age`ASINT)>28))AND(B.`id`>100))AND(B.`id`ISNOTNULL))","edgeType":"PREDICATE"},{"sources":[3],"targets":[0,1,2],"expression":"((default.tab1.`id`ISNOTNULL)AND(default.tab1.`id`>100))","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.tab3.id"},{"id":1,"vertexType":"COLUMN","vertexId":"default.tab3.name"},{"id":2,"vertexType":"COLUMN","vertexId":"default.tab3.age"},{"id":3,"vertexType":"COLUMN","vertexId":“default.tab1.id”},{“id”:4,“vertexType”:“COLUMN”,“vertexId”:“default.tab1.name”},{“id”:5,“vertexType”:“柱子”,“vertexId”:“default.tabb2.age”},{“id”:6,“vertexType”:“COLUMN”,“vertexId”:“default.tabb2.id”},{“id”:7,“vertexType":"COLUMN","vertexId":"default.tabb2.name"}]}4.小结在SparkSQL实现字段血缘关系的时候,我们首先通过自身扩展得到插入语句,在我们自己的检查规则中获取SQL语句进去,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终得到物理计划。我们迭代物理计划,根据不同的执行计划做相应的转换,然后得到字段之间的对应关系。目前的实现比较简单。字段之间是直线对应关系,忽略中间过程。如果要实现字段转换的全过程,是没有问题的。

猜你喜欢