介绍Snowpark是Snowflake的一个新的开发库,它提供了一个API,允许用户使用像Scala(以及未来的Java和Python)这样的编程语言来代替SQL进行数据处理。数据处理。Snowpark的核心概念是DataFrame(数据框),它代表一组数据,比如一些数据库表的行,我们可以使用自己喜欢的工具通过面向对象或者函数式编程来处理。SnowparkDataFrames的概念类似于ApacheSpark中的DataFrames或者Python中的Pandas包中DataFrames的意思,是一种表格数据结构。开发者也可以创建自定义函数推送到Snowflake服务器,更方便的处理数据。Snowpark的代码执行采用惰性计算,减少了从Snowpark仓库到客户端的数据流。当前版本的Snowpark可以在Scala2.12和JDK8、9、10或11上运行。它现在处于公开预览阶段,可供所有帐户使用。架构特点从架构的角度来看,Snowpark客户端类似于ApacheSparkDriver程序。它在客户端执行用户编写的代码,并将其转换成SQL语句推送到Snowpark数据仓库。Snowpark计算服务器处理完数据后,接收DataFrame格式的返回结果。从广义上讲,Snowpark数据仓库操作可以分为两类:转换和执行。由于转换是延迟执行的,因此它们不会触发DataFrames数据的计算处理。select(查询)、filter(过滤)、sort(排序)、groupBy(分组)等操作都属于转换的范畴。执行恰恰相反,它们触发对DataFrames数据的计算。Snowpark将DataFrame数据的SQL语句发送给服务端进行计算,然后将结果返回给客户端内存。show、collect、take等都是执行操作。Snowpark执行在我们执行任何Snowpark转换和执行之前,我们需要首先连接到Snowpark数据仓库并建立会话。ScalaobjectMain{defmain(args:Array[String]):Unit={//替换下面的。valconfigs=Map("URL"->"https://.snowflakecomputing.com:443","USER"->"","PASSWORD"->"","ROLE"->"SYSADMIN","WAREHOUSE"->"SALESFORCE_ACCOUNT","DB"->"SALESFORCE_DB","SCHEMA"->"SALESFORCE")valsession=Session.builder.configs(configs).createsession.sql("showtables").show()}}从Snowpark管理页面,我们有一个SALESFORCE_DB数据库和一个包含3个表的SALESFORCE:SALESFORCE_ACCOUNT表代表来自Salesforce实例的帐户,SALESFORCE_ORDER表存储由以下人员发起的订单这些帐户,而SALESFORCE_ACCOUNT_ORDER是一个关联表,用于存储关联查询的结果(我们将在本文后面讨论这个)进一步讨论)。要检索Salesforce_Account表的前10行,我们可以简单地执行以下DataFrame方法:Scala//从“salesforce_account”表中的数据创建一个DataFrame。valdfAccount=session.table("salesforce_account")//要打印出前10行,请调用:dfAccount.show()Snowpark会将代码转换为SQL语句并交给Snowflake执行:Scala[main]INFOcom.snowflake.snowpark.internal.ServerConnection-执行查询[queryID:XXXX]SELECT*FROM(SELECT*FROM(salesforce_account))LIMIT10的输出在我们的VSCodeIDE中看起来像这样:我们还可以过滤某些行并执行转换DataFrame的(例如选择指定的列):ScalavaldfFilter=session.table("salesforce_account").filter(col("type")==="Customer-Direct")table("salesforce_account").select(col("accountname"),col("phone"))dfSelect.show()Snowpark会生成相应的SQL查询,交给Snowflake计算服务器执行:[main]INFOcom.snowflake.snowpark.internal.ServerConnection-执行查询[queryID:XXXX]SELECT*FROM(SELECT*FROM(SELECT*来自(salesforce_account))WHERE("TYPE"='Customer-Direct'))LIMIT10[main]INFOcom.snowflake.snowpark.internal.ServerConnection-执行查询[queryID:XXXX]SELECT*FROM(SELECT"ACCOUNTNAME","PHONE"FROM(SELECT*FROM(salesforce_account)))LIMIT10下面是VSCode中的输出:SnowparkDataFrameAPI也允许DataFrames数据之间的拼接和关联。在这个例子中,我们有SALESFORCE_ORDER表,它记录了Salesforce账户产生的账单数据。我们可以将这些Pull数据合并到DataFrame中并将它们与帐户记录结合起来:"accountid")).select(col("accountname"),col("phone"),col("productname"),col("amount"))dfJoin.show()Snowflake将DataFrame方法转为SQL语句,然后推送到Snowflake数据仓库进行计算。VSCode中的输出如下:如果我们想持久化计算结果,可以使用saveAsTable方法:ScaladfJoin.write.mode(SaveMode.Overwrite).saveAsTable("salesforce_account_order")生成的SQL语句如下所示:Scala[main]INFOcom.snowflake.snowpark.internal.ServerConnection-执行查询[queryID:XXXX]创建或替换表salesforce_account_orderASSELECT*FROM(SELECT"ACCOUNTNAME","PHONE","PRODUCTNAME","AMOUNT"FROM(SELECT*FROM((SELECT"ACCOUNTNAME"AS"ACCOUNTNAME","PHONE"AS"PHONE","TYPE"AS"TYPE","SFDCID"AS"SFDCID"FROM(SELECT*FROM(salesforce_account)))ASSNOWPARK_TEMP_TABLE_UKKLR6UCHN6POXLINNERJOIN(SELECT"ACCOUNTID"AS"ACCOUNTID","PRODUCTNAME"AS"PRODUCTNAME","AMOUNT"AS"AMOUNT"FROM(SELECT*FROM(salesforce_order)))ASSNOWPARK_TEMP_TABLE_36DEOZXTQJUYKLDON("SFDCID"="ACCOUNTID"))))然后,Snowpark将创建一个新表或替换现有的旧表来存储生成的数据:结论Snowpark提供了丰富的数据处理操作和工具。它允许用户创建非常复杂的高级数据处理管道操作。将用户自定义代码推送到Snowflake数仓服务器,并在数据端执行,减少不必要的数据传输,是Snowpark非常强大的功能。译者介绍卢新望,51CTO社区编辑,90后半途出家程序员。做过前端页面,写过业务接口,做过爬虫,学过JS,有幸接触到Golang,参与过微服务架构的改造。目前主要编写Java,负责公司可定制低代码平台数据引擎层的设计和开发。原标题:SnowflakeDataProcessingWithSnowparkDataFrames,作者:IstvanSzegedi