当前位置: 首页 > 科技观察

使用SparkStreaming转换不同的JSONpayload

时间:2023-03-13 04:30:40 科技观察

【.com速译】SparkStreaming是基于SparkCore的大数据实时计算底层框架,能够以流的方式从源头读取数据。我们只需要从数据源创建一个读取流,然后我们就可以创建一个写入流来将数据加载到目标数据源中。对于以下演示,我们将假设我们有不同的JSON有效负载进入一个kafka主题,我们需要将其转换并写入另一个kafka主题。创建一个ReadStream以持续接收JSON负载作为消息。我们需要先读取消息并使用spark的readstream创建一个数据框。Spark中提供了readStream函数,我们可以利用这个函数基本创建一个readStream。这将从kafka主题中读取流式有效负载。valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","host1:port1,host2:port2").option("subscribe","topic1").load()我们可以创建一个案例类(例如CustomerUnion),它将包含JSON有效负载的所有可能字段。这样,我们就可以在数据框上运行选择查询而不会失败。valrawDfValue=rawData.selectExpr("CAST(valueASSTRING)").as[String]valschema=ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]valextractedDFWithSchema=rawDfValue.select(from_json(col("值"),schema).as("data")).select("data.*")extractedDFWithSchema.createOrReplaceTempView("tempView")这将为我们提供一个数据帧提取的DFWithSchema,其中列作为有效负载字段。示例输入有效载荷这里有两个示例输入有效载荷,但可以有更多的有效载荷,有些字段不存在(变量)。{"id":1234,"firstName":"Jon","lastName":"Butler","City":"Newyork","Email":abc@gmail.com,"Phone":"2323123"}{“firstName”:”Jon”,“lastName”:”Butler”,“City”:”Newyork”,“Email”:abc@gmail.com,“Phone”:”2323123”}根据id字段输出payload示例,我们将决定输出有效载荷。如果存在id字段,我们将其视为用户更新案例,并且仅在输出负载中发送“Email”和“Phone”。我们可以根据特定条件配置任何字段。这只是一个例子。如果id不存在,我们将发送所有字段。以下是输出负载的两个示例:{"userid":1234,"Email":abc@gmail.com,"Phone":"2323123"}{"fullname":"JonButler","City":"Newyork","Email":abc@gmail.com,"Phone":"2323123"}启动WriteStreams一旦我们有了数据框,我们就可以运行任意数量的sql查询,并根据所需的负载主题写入kafka。所以我们可以创建一个包含所有sql查询的列表并遍历该列表并调用writeStream函数。让我们假设,我们有一个名为queryList的列表,它只包含字符串(即sql查询)。下面是为写流定义的函数:defstartWriteStream(query:String):Unit={valtransformedDf=spark.sql(query)transformedDf.selectExpr("CAST(keyASSTRING)","CAST(valueASSTRING)").writeStream.format("kafka").option("kafka.bootstrap.servers","host1:port1,host2:port2").option("topic","topic1").start()}这个将为列表中的每个查询启动写入流。queryList.foreach(startWriteStream)spark.streams.awaitAnyTermination()如果我们知道输入负载的所有可能字段,即使某些字段不存在,我们的sql查询也不会失败。我们已将有效负载的模式指定为案例类,它将创建一个数据帧,为缺失的字段指定NULL。通过这种方式,我们可以使用spark-streaming在所需的转换/过滤器??之后将多个有效负载从同一主题写入不同的主题。【翻译稿件,合作网站转载请注明原译者和出处.com】