ApacheKafka和SparkStreaming的两种集成方式及其优缺点配置SparkStreaming,从Kafka接收数据。第一种是使用sinks和Kafka的高级API;而第二种新方法不使用接收器。这两种方法在性能特征和语义保存方面具有不同的编程模型。让我们详细探讨这两种方法。1.Receiver-based方法这种方法使用一个接收器(Receiver)来接收数据。接收器是使用Kafka的高级消费者API实现的。另外,接收到的数据会存储在Spark的各个executor中。然后数据由SparkStreaming启动的作业处理。但是这种方法的默认配置可能会在发生故障时丢失数据。因此,我们必须在SparkStreaming中额外启用write-aheadlog,以保证数据零丢失。它将所有接收到的Kafka数据同步保存到分布式文件系统中的预写日志中,以便在发生故障时可以恢复所有数据。下面,我们将讨论如何在Kafka-SparkStreaming应用程序中使用这种基于接收器的方法。1.链接现在,首先将您的KafkaStreaming应用程序与以下工件链接起来。对于Scala和Java类型的应用程序,我们将使用SBT(简单构建工具)和Maven(一种构建工具)的各种项目定义。groupId=org.apache.sparkartifactId=spark-streaming-kafka-0-8_2.11version=2.2.0对于Python类型的应用,我们在部署自己的应用时必须添加上面的库以及它们的各种依赖。2.编程随后,我们通过在流式应用程序的代码中导入KafkaUtils来创建一个DStream输入:topicnumberofKafkapartitionstoconsume])同样,通过createStream的各种变形方式,我们可以制定出不同的key/value类及其对应的解码类。3.部署通常,对于任何一个Spark应用,你都可以使用spark-submit来发布你自己的应用。当然,就具体的Scala、Java和Python应用而言,它们在细节上会略有不同。其中,由于Python应用缺少SBT和Maven的项目管理,我们可以使用--packagesspark-streaming-kafka-0-8_2.11及其依赖直接添加到spark-submit。./bin/spark-submit--packagesorg.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0...另外,我们也可以下载Maven神器的spark-streaming对应的jar包-Kafka-0-8-assembly,然后使用-jars将其添加到spark-submit。2.直接法(无接收者)在基于接收者的方法之后,一种新的无接收者的“直接”方法诞生了。这种方法提供了更强大的端到端保证。它周期性地向Kafka查询每个topic+partition(分区)中的最新offset,而不是使用receiver接收数据。同时,它还定义了每批次要处理的不同偏移量范围。特别是,当处理数据的作业启动时,一个简单的消费者API用于从Kafka中预定义的偏移范围读取数据。可以看出,这个过程类似于从文件系统中读取各种文件。注意:对于Scala和JavaAPI,Spark在其1.3版本中引入了此功能;对于PythonAPI,它也在其1.4版本中引入了此功能。下面,我们将讨论如何在Streaming应用中使用该方法,并详细了解consumerAPI的链接:1.Link当然,该方法只有Scala和Java应用支持,通过以下神器和Maven链接STB项目。groupId=org.apache.sparkartifactId=spark-streaming-kafka-0-8_2.11version=2.2.02。编程之后,我们通过在Streaming应用程序代码中导入KafkaUtils创建一个DStream输入:importorg.apache。spark.streaming.kafka._valdirectKafkaStream=KafkaUtils.createDirectStream[[keyclass],[valueclass],[keydecoderclass],[valuedecoderclass]](streamingContext,[mapofKafkaparameters],[setoftopicstoconsume])我们必须在Kafka的参数中指定元数据。broker.list或bootstrap.servers以便它可以默认从每个Kafka分区的最新偏移量开始消费。当然,如果你配置auto.offset.reset为Kafka参数中最小的,那么它会从最小的offset开始消费。此外,通过使用KafkaUtils.createDirectStream的各种变体,我们可以从任何偏移量开始消费。当然,我们也可以按如下方式在每批次中消费Kafka偏移量。//持有当前偏移范围的引用,所以下游可以使用它varoffsetRanges=Array.empty[OffsetRange]directKafkaStream.transform{rdd=>offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd}.map{...}.foreachRDD{rdd=>for(o<-offsetRanges){println(s"${o.topic}${o.partition}${o.fromOffset}${o.untilOffset}")}...}如果要使用基于Zookeeper的Kafka监控工具(https://data-flair.training/blogs/zookeeper-in-kafka/)来显示Streaming应用的进度,那你也可以自己更新到Zookeeper。3.部署这方面的部署过程与基于receiver的方法类似,这里不再赘述。3、直接法的优点在SparkStreaming和Kafka的融合方面,第二种方法比第一种方法有以下优点:1.简化并行,无需创建和合并多个输入的KafkaStreams(https://data-flair.training/blogs/kafka-streams/)。但是SparkingStreaming会创建尽可能多的RDD(ResilientDistributedDatasets,弹性分布式数据集)分区,供多个Kafka分区使用直接的方式消费。这些分区还并行地从Kafka读取数据。所以我们可以说:Kafka和RDD分区之间是一对一的映射关系,更容易理解和调整。2.效率为了实现数据零丢失,第一种方法需要将数据存储在预先写入的日志中,以便进一步的数据复制。这种方式效率其实比较低,因为数据实际上是被Kafka和write-aheadlog复制了两次。direct的方式,由于没有sink,所以不需要预写log,解决了这个问题。只要你有足够的Kafka数据保留,就可以从Kafka恢复各种消息。3.准确的语义第一种方法,我们使用Kafka的高层API将消耗的偏移量存储在Zookeeper中。但是,这种传统的从Kafka消费数据的方式虽然可以保证数据零丢失,但是在一些失败的情况下,小概率可能会消费两次数据。事实上,这种情况是由于SparkStreaming可靠接收的数据与Zookeeper跟踪的偏移量不一致造成的。所以在第二种方法中,我们不使用Zookeeper,而是使用一个简单的KafkaAPI。SparkStreaming通过其各种检查点跟踪不同的偏移量,从而消除SparkStreaming和Zookeeper之间的不一致。由此可见,即使出现故障,那些记录也会被SparkStreaming一次性有效准确的接收到。它可以保证我们的输出操作,即:在将数据保存到外部数据仓库时,各种保存结果和偏移量的幂等性和原子事务性,这也有助于实现准确的语义。但是这种方式也有一个缺点:由于不更新Zookeeper中的各种offset,那些基于Zookeeper的Kafka监控工具将无法显示进度。当然你也可以自己访问每个batch中这个方法处理过的offset,更新到Zookeeper中。结论通过上面的讨论,我们了解了Kafka和SparkStreaming集成的整体概念。同时,我们也讨论了Kafka-SparkStreaming两种不同的配置方式:sink方式和direct方式,以及direct方式的几个优点。原标题:ApacheKafka+SparkStreamingIntegration,作者:RinuGour
