译者|陈军点评|孙淑娟众所周知,作为一个事件流平台,Kafka可以松散地驻留在面向消息的中间件(Message-orientedMiddleware,MoM)空间。Akka全称为Actor模型,是一种基于响应、容错、消息传递的同步计算过程。下面,我将和大家一起探讨一下分布式编程工具AkkaStreams、KafkaStreams和SparkStreaming的主要特点和优缺点,以及如何在一个简单的字数统计应用中使用它们。在这篇文章中,我主要使用Scala来写代码,涉及到的框架都有JavaAPI。1.KafkaStreamsKafkaSteams是一个可以处理数据的客户端库。这里的客户端库意味着我们正在编写一个应用程序,该应用程序使用另一个基础设施(在本例中为Kafka集群)提供的服务。因此,我们需要与集群交互来处理连续的数据流。数据需要表示为键值记录以便于识别,并以主题的形式组织成持久化的事件日志。它们本质上是被复制并写入磁盘的持久数据队列。在这种架构中,生产者(producer)应用程序将记录推送到主题中(例如,一家电子商务公司需要跟踪订单的每一步);而多个消费者(consumer)应用程序需要以各种方式读取主题。不同时间点的数据。这类数据结构的架构不仅具有高度的分布式和可扩展性,而且具有一定的容错能力。由于嵌入了exact-once消息语义,Kafka可以保证每条发送的记录都能到达集群,并且只写一次,不会重复。正是因为在一般的分布式系统中实现起来极其困难,所以Kafka的这个特性显得非常重要。从Kafka的组织方式来看,其API允许Java或Scala应用程序与Kafka集群交互,同时与其他应用程序并行且独立使用。这种独立性使分布式和可扩展的服务能够在大型应用程序中独立使用微服务。KafkaSteamsScalaobjectWordCountApplication扩展App{importSerdes._valprops:Properties={valp=newProperties()p.put(StreamsConfig.APPLICATION_ID_CONFIG,"myFabulousWordCount")p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"my-kafka-broker-url:9092")p}valbuilder:StreamsBuilder=newStreamsBuildervaltextLines:KStream[String,String]=builder.stream[String,String]("TextLinesTopic")valwordCounts:KTable[String,Long]=textLines.flatMapValues(textLine=>textLine.toLowerCase.split("\\W+")).groupBy((_,word)=>word).count()(Materialized.as("word-counts-table"))wordCounts.toStream.to("WordsWithCountsTopic")valstreams:KafkaStreams=newKafkaStreams(builder.build(),props)streams.start()sys.ShutdownHookThread{streams.close(10,TimeUnit.SECONDS)}}上面的代码是字数统计应用程序的KafkaSteam表示。显然这段代码比较“重”,我试着分解了一下。ScalaimportSerdes._Kafka为了性能进行二进制记录存储,也就是我们常说的序列化和反序列化。通过上面的语句,我们就可以在Scala中实现序列化和反序列化(并行转换器)的自动导入。Scalaval道具:Properties={valp=newProperties().p.put(StreamsConfig.APPLICATION_ID_CONFIG,"myFabulousWordCount")p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"my-kafka-broker-url:9092")p}在上面应用代码的第一部分需要配置要连接的Kafka集群的详细信息。下面是我用Scala编写的API。Scalaval构建器:StreamsBuilder=newStreamsBuildervaltextLines:KStream[String,String]=。builder.stream[String,String]("TextLinesTopic")接下来,我将使用构建器模式(builderpattern),从所需的主题中读取键值对的一条记录。ScalavalwordCounts:KTable[String,Long]=textLines.flatMapValues(textLine=>textLine.toLowerCase.split("\\W+"))。.groupBy((_,word)=>word).count()(Materialized.as("word-counts-table"))然后,我们将操作流中的一些函数运算符聚集到一个表中。基于Kafka的流表二元性,我们可以对KafkaSteams进行数据表级的聚合和处理转换。Scala1wordCounts.toStream.to("WordsWithCountsTopic")在转换时,我们需要将这张数据表转换成数据流,以提供其他应用程序可能感兴趣的主题。Scalavalstreams:KafkaStreams=newKafkaStreams(builder.build(),props)streams.start().sys.ShutdownHookThread{streams.close(10,TimeUnit.SECONDS)}最后,我们需要设置数据的开始和停止流,否则静态流不会主动做任何事情。KafkaSteams的优点和缺点KafkaSteams的主要优点是:Kafka集群将为您提供高速、高容错和高可扩展性。同时,Kafka还提供了exactly-once消息发送语义。这对于分布式系统来说意义重大,毕竟很多框架无法提供这样的保障,会出现数据重复或丢失的情况。同时,Kafka鼓励使用同一个消息总线来实现微服务通信,让用户有权通过Kafka来控制和建立自己的微服务间通信协议。当然,Kafka并非没有缺点。首先,Kafka强制使用Java风格的API,这让Scala程序员感到不舒服。其次,如果你想在自己的架构中使用Kafka,那么你需要搭建一个单独的Kafka集群进行管理(即使你不一定需要分配专门的主机)。同时,考虑到Kafka是高度可配置的,你需要提前知道如何配置它。最后,Kafka只支持生产者-消费者架构类型。2.AkkaStreamsAkkaStreams是一个由Scala编写并为JVM构建的高性能代码库。它实现了ReactiveStreams规范(ReactiveManifesto)——响应式、弹性、容错和消息驱动的语义。有了它,您可以处理具有无限数据量的单个记录和具有100%控制流的拓扑配置。AkkaStreams提供了Actor模型的并发性,流组件构建在异步独立组件之上。AkkaStreams的主要优点是高可扩展性和容错性。它提供了一个通用且简洁的流式API,即基于Scala的DSL。您可以简单地通过“插入”组件来启动它们。同时,AkkaStreams还提供了底层的GraphStageAPI,让你可以控制各个具体组件的逻辑。如上所述,在Kafka中,您的应用程序通过使用消息总线成为Kafka集群的客户端API。而AkkaStreams是应用程序逻辑上不可分割的一部分。你可以把AkkaStreams看作是应用程序的循环系统,而Kafka只是外部组织的“血库”。AkkaStreams的表示Scalavalsource1=Source(List("Akka","is","awesome"))valsource2=Source(List("learning","Akka","Streams"))valsink=Sink.foreach[(String,Int)](println)valgraph=GraphDSL.create(){implicitbuilder=>importGraphDSL.Implicits._?valwordCounter=Flow[String].fold[Map[String,Int]](地图()){(map,record)=>map+(record->(map.getOrElse(record,0)+1))}.flatMapConcat(m=>Source(m.toList))valmerge=builder.add(Merge[String](2))valcounter=builder.add(wordCounter)source1~>merge~>counter~>sinksource2~>merge?ClosedShape}RunnableGraph.fromGraph(graph).run()上面的代码是wordcount一个应用程序的AkkaStreams表示。Scala看起来比较简单,我们来分解一下它的主要部分代码:Scalavalsource1=Source(List("Akka","is","awesome"))valsource2=Source(List("learning","Akka","Streams"))valsink=Sink.foreach[(String,Int)](println)前3行代码构建初始数据源并发送异步元素(在本例中为字符串)。ScalavalwordCounter=Flow[String].fold[Map[String,Int]](Map()){(map,record)=>map+(record->(map.getOrElse(record,0)+1))}.flatMapConcat(m=>Source(m.toList))上面的代码是计算字数的主要部分,目的是在一个简单的字符串中生成一个列表。Scalavalmerge=builder.add(Merge[String](2))valcounter=builder.add(wordCounter)source1~>merge~>counter~>sinksource2~>merge以上代码实现了AkkaStreams自己的逻辑,使用到不同流量组。其流程逻辑图如下所示。在Stream工作流下面,我们来看这段代码:Scalasource1~>merge~>counter~>sinksource2~>merge注意上面代码中有一个非常相似的结构来表示流拓扑。只需两行代码,我们就可以轻松构建任意流式布局,而且是全异步、高速、容错的。AkkaStreams的优点和缺点由于AkkaStreams是ReactiveStreams的一个实现,它的API提供了极快的速度和高可扩展性。同时,AkkaStreams提供了底层的GraphStageAPI,让你可以控制自定义流的逻辑,比如:批量数据、手动中断、数据流的重定向等,一切皆有可能。此外,AkkaStreams还可以使用AlpakkaKafka连接器无缝连接到Kafka。AkkaStreams是作为应用程序开发库构建的,因此您不必像Kafka那样编写客户端API,而只需像其他任何库一样使用它来构建分布式应用程序。AkkaStreams的缺点是它类似于流式C++,并且学习曲线陡峭。同时,如果你使用整套集群,你会发现AkkaStreams并不容易扩展。事实上,正因为AkkaStreams成为您应用程序不可或缺的一部分,您需要像任何“构建”库一样采用某种思维方式。3.SparkStreaming是大规模Spark分布式计算引擎的自然流扩展。SparkStreaming的目的是处理连续的大规模数据。目前,您有两个API级别可供选择:具有离散流(DStreams)的低级别高度可控API和通用DataFrameAPI。也称为结构化流,它为常规“静态”大数据提供类似的API。Spark通过原生的可扩展性和容错性提供了两种输出模式和功能:微批量模式,Spark可以间隔和批量收集所有数据。连续模式,一种仍处于实验阶段的低延迟模式。Spark的主要优势体现在大数据的处理能力上。其提供的DataFrame、SQLAPI和丰富的SparkUI可以方便您监控和跟踪负载的实时性能。值得注意的是,由于Spark需要专用的计算集群,在生产环境中相对资源密集。当然,Spark是可配置的,如果您知道如何正确调整它,可以大大提高它的性能。SparkStreamingScalavalspark=SparkSession.builder().appName("Wordcount")的表达形式。.master("local[*]").getOrCreate()valstreamingDF=spark.readStream.format("kafka").option("kafka.bootstrap.servers","your-kafka-broker:9092").option("subscribe","myTopic").load()valwordCount=streamingDF.selectExpr("cast(valueasstring)asword").groupBy("word").count()wordCount.writeStream.format("控制台").outputMode("追加")。.start().awaitTermination()上面的代码是字数统计应用SparkStreaming的表示。在这里,我们使用高级结构化流API,使代码干净和分离。接下来我们进一步分析:Scalavalspark=SparkSession.builder().appName("Wordcount").master("local[*]").getOrCreate()上面的代码只需要启用一个样板——SparkSession。ScalavalstreamingDF=spark.readStream.format("kafka").option("kafka.bootstrap.servers","your-kafka-broker:9092").option("subscribe","myTopic").load()通过从上面的代码可以看出,可以通过指定数据源来读取数据。同时,SparkStreaming也可以原生支持开箱即用的Kafka。ScalavalwordCount=streamingDF.selectExpr("cast(valueasstring)asword").groupBy("word").count()上面代码的逻辑也比较简单。在SQL中,我们只需要用“groupby”来统计即可。由于Kafka以二进制形式存储数据,因此我们必须添加以下标头。ScalawordCount.writeStream.format("console")..outputMode("append").start().awaitTermination()最后,你只需要将数据流指向outputsink(这里我们又用到了Kafka)就可以开始查询数据流了。SparkStreaming的优点和缺点Spark具有基于事件时间和水印的数据后处理能力。这在实际场景中非常实用。同时,高度可配置的Spark可以通过其内置的连接器连接到Kafka作为数据输入或输出,实现性能调优。当然,Spark也有出色的文档和广泛的社区支持。此外,Spark还可以在本地进行加速,用于更小的数据处理。与其他框架一样,Spark并不完美。除了一般的DataFrame和SQLAPI之外,它在编译时会失去一些类型安全性。将Dataset导入lambda后,其性能也会下降。如上所示,SparkStreaming在大数据和微批处理方面表现不错,但其持续模式有待改进。最后,由于Spark需要运行专用的集群,所以也会占用一部分算力。四、如何选择可以看出,上面讨论的每一个框架都是为某些特定的需求而构建的。那么,我们该如何选择呢?AkkaStreams最适合高性能系统。它提供了一个非常强大的API,但是需要时间来掌握它。由于Kafka最适合作为外部高性能应用程序的消息总线,因此如果您希望微服务从公共事件中读取和写入,最好使用Kafka。当然,其Java风格的API对于干净的代码来说可能过于繁琐。SparkStreaming无疑是为大数据计算而生。但是,据记载它对真实的应用程序逻辑和低延迟需求并不友好。您只能将SparkStreaming用作数据聚合器以从数据中获取洞察力。译者介绍51CTO社区编辑JulianChen。他在实施IT项目方面拥有超过十年的经验。善于控制内外部资源和风险。他专注于传播网络和信息安全方面的知识和经验。翻译等形式分享前沿技术和新知识;经常在线上和线下开展信息安全培训和讲座。原标题:ComparingAkkaStreams,KafkaStreamsandSparkStreaming,byDanielCiocirlan
