当前位置: 首页 > 科技观察

Spark和Storm须知:ApacheApex已经诞生

时间:2023-03-22 00:33:41 科技观察

作为一种新的开源数据流分析解决方案,Apex脱胎于DataTorrent的RTS平台,可以带来出色的速度性能并简化编程需求。说起数据流分析任务,我们首先想到的就是Spark。虽然Spark在2.0版本已经集成了非结构化和结构化数据的分析能力,但是Storm的1.0版本已经解决了使用困难的问题。诞生于2015年6月的ApacheApex是凭空诞生的。它还源自DataTorrent及其令人印象深刻的RTS平台,其中包括一组核心处理引擎、仪表板、诊断和监控工具套件以及专门的数据科学家。用户图形流编程系统dtAssemble。Apex作为RTS平台的核心处理引擎,可以说是DataTorrent送给Apache的又一礼物。Apex旨在运行您现有的Hadoop生态系统,利用YARN实现按需扩展,利用HDFS实现容错。虽然它不像RTS平台那样功能齐全,但Apex足以提供人们期望从数据处理平台获得的大部分主要功能。Apex应用实例下面我们来看一组基本的Apex流程实例,其中会涉及到几个核心概念。本例中,我们将读取Kafka中的日志条目,统计日志记录类型并写入控制台。实际会列出相关的代码片段,您也可以点击此处在GitHub上获取完整的应用程序。Apex的核心概念是算子,它属于Java类,负责接收输入信息和产生输出信息。(如果你熟悉Storm,它的功能和bolt、spout基本类似。)另外,每个算子还定义了一组端口用于数据输入或输出。该方法的实际作用是从InputPort读取输入信息,或者通过OutportPort向下游发送数据。通过运算符的数据流将通过将数据流拆分为基于时间的数据窗口来建模——但与Spark的微浴不同,Apex中的输入数据处理可以在不等待窗口结束的情况下开始。DataTorrent下面的例子中,我们需要3个算子,每个算子对应Apex支持的三种算子类型中的一种:输入算子负责读取Kafka的信息条目,通用算子负责统计日志类型,输出算子然后将其写入控制台。对于第一种和第三种,我们可以直接使用Apex的Malhar库,但是对于第二种我们需要使用自定义的业务逻辑来统计查看的不同日志类型。再看看我们的LogCounterOperator代码内容:publicclassLogCounterOperatorextendsBaseOperator{privateHashMapcounter;publictransientDefaultInputPortinput=newDefaultInputPort(){@Overridepublicvoidprocess(Stringtext){Stringtype=text.substring(0,text.indexOf(''));IntegercurrentCounter=counter.getOrDefault(type,0);计数器.put(type,currentCounter+1);}};publictransientDefaultOutputPort>output=newDefaultOutputPort<>();@OverridepublicvoidendWindow(){output.emit(counter);}@Overridepublicvoidsetup(OperatorContextcontext){counter=newHashMap();}}这里我们使用了一个简单的HashMap来进行日志类型的统计,定义了2个端口来实现数据通过这个算子流式处理:一个负责输入,一个负责输出。不兼容的运算符将在输入期间导致编译时失败。需要注意的是,虽然我这里只定义了1个输入端口和1个输出端口,但是你也可以根据需要定义多个端口。通用运算符的生命周期非常简单。Apex将首先调用setup()进行任何必要的初始化;在上面的示例中,setup()负责创建HashMap。然后它调用beginWindow()来声明一个新的输入处理窗口/批次正在启动,随后是对每个数据条目的调用。如果当前窗口的剩余时间达到零,Apex调用endWindow()。我们不需要单个窗口的任何逻辑,因此将BaseOperator中的beginWindow()定义留空。然而,在每个窗口结束时,我们需要发送当前计数结果,从而通过输出端口发送HashMap。同时,重写的process()方法处理我们从日志行中提取第一个单词并更新计数器的业务逻辑。***,我们调用teardown()方法来确保Apex进程在必要时被清理——这个例子实际上不需要清理,但为了演示,我们将清理HashMap。现在我们的操作员已经创建,我们需要构建流程本身。如果你熟悉Storm的拓扑结构,你应该能够很容易地理解下面的代码:kafkaInput.setIdempotentStorageManager(torIdempotentFSIdempotentStorageManager());LogCounterOperatorlogCounter=dag.addOperator("LogCounterOperator",newLogCounterOperator());ConsoleOutputOperatorconsole=dag.addOperator("Console",newConsoleOutputOperator());dag.addStream("LogLines",kafkaInput.out,logCounter.input);dag.addStream("Console",logCounter.output,console.input);}我们首先定义DAG(即operator)节点。之后,我们定义图形边界(在Apex词汇表中称为“流”)。这些流负责将一个运算符的输出端口连接到另一个运算符的输入端口。在这里,我们将Kafka连接到LogCounterOperator,并将输出端口连接到ConsoleOutputOperator。任务完成!如果我们编译并运行应用程序,我们可以在标准输出中看到HashMap:{INFO=1}{ERROR=1,INFO=1}{ERROR=1,INFO=2}{ERROR=1,INFO=2,DEBUG=1}…Malhar:丰富的实用组件Operator最大的优点是体积小且定义明确,因此可以轻松构建和测试。它的接缝类似于乐高积木——唯一的区别是乐高积木是现成的,但运营商要求我们自己创造。Malhar就像一个巨大的乐高积木桶,里面有许多标准的2x4基本块,供每个人使用。无论是读取Splunk、合并FTP站点上的文本文件信息还是将结果存储在HBase中,Malhar都可以帮助我们实现它。有了Malhar提供的丰富的算子组件,Apex就成了最吸引人的,也就是说我们只需要设计业务逻辑即可。Malhar运算符的文档有时可能很粗糙,但库中的所有内容都配备了测试机制,因此我们可以轻松了解不同组件如何协同工作。Apex还提供了其他一些出色的设计结果。除了常见的指标和报告方案外,dtCli应用程序还允许我们在运行时动态更改提交的应用程序。你是否想添加一些负责将日志条目写入HDFS的操作员,但又不想影响应用程序的整体运行?Apex可以轻松完成这个任务。开源的数据流处理引擎已经很丰富了,但要在其中脱颖而出并不容易。凭借Malhar库提供的庞大的operator选项,以及Apex本身出色的容错、低延迟和可扩展性,Apex已经成为一个理想的框架,速度极佳,可用于生产环境。在这里,我建议DataTorrent为ApacheBeam开发一套Apexrunner,以帮助开发者更容易地从现有框架移植他们的应用程序。当然,Apex目前已经相当优秀,可以成为值得大家认真考虑的数据流处理引擎。原文链接:注意,Spark和Storm,ApacheApex来了