在过去的一年里,我与许多软件公司会面,讨论如何处理他们的应用程序数据(通常以日志和指标的形式)。在这些讨论中,我经常听到不得不使用零碎的工具集随着时间的推移将这些数据整合在一起的挫败感。这些工具,例如:-运维人员用于监控和告警的工具-开发人员用来跟踪性能和定位问题的工具-一个完整的独立系统,商业智能(BI)和业务依赖它来分析用户行为.尽管这些工具使用不同的视角,适用于不同的场景,但它们都侧重于数据来源和类型。所以很多软件团队说,“如果我们有时间,我们可以构建一个更好的”,坦率地说,那里有很多很棒的开源代码,自己构建一个是否更有意义还有待商榷。在Jut,我们就是这样做的。我们使用开源大数据组件构建了一个流数据分析系统,本文描述了我们使用的各个部分以及我们如何将它们组合在一起。我们将介绍:-Dataingestion:如何引入不同类型的数据流-Indexingandsavingdata:高效存储和统一查询-Concatenation:数据在系统中流动的过程-Tuning:让整个过程真正快速,让用户真的会用我希望阅读本文能帮助您的系统以理智、可扩展的方式避免我们遇到的一些陷阱。1DataIngestion在进行业务分析和监控时,大部分相关的数据类型、格式和传输协议都是不固定的。您需要能够支持系统的不同数据源和数据发送方。例如,您的数据可能包括以下任何一项:-自定义应用程序事件。-容器级指标和日志。-statsd或收集的指标。-来自GitHub或Stripe等第三方的Webhook事件。-应用程序或服务器日志。-用户行为。尽管这些都有不同的格式和符号,但它们都需要系统内的统一格式。无论选择哪种格式,都需要转换传入的数据流。我们选择了一种简单灵活的数据格式:每条记录(“点”)是一系列键/值对,可以方便地表示为JSON对象。所有点都有一个“时间”字段,测量点也有一个数值“值”字段;其他点可以有任何“形状”。前端HTTPS服务器(运行Nginx)接收数据,进行多路分解并将其发送到本地的每个数据类型“连接器”进程(运行Node.js)。这些过程将传入数据转换为系统的内部格式,然后将它们发布到Kafka主题(可靠性),从那里它们可用于索引和/或处理。除了上述数??据类型之外,考虑使用连接器使您自己的团队能够最轻松地将输入数据集成到您的数据总线中。您可能不需要我在这里描述的太多通用性或灵活性,但在设计中具有一定的灵活性总是好的,这使您的系统能够摄取更多的数据类型,并防止您在以后有新数据时不得不重建.2索引和存储数据所有这些数据都需要存储在某个地方。***在数据库中,当您的数据需要增长时,它很容易扩展。如果数据库为分析类型提供查询模式支持,那就太好了。如果这个数据中心只是用来存储日志和事件,那你可以选择Elasticsearch。如果只是做指标,可以选择时序数据库(TSDB)。但我们都需要处理它。我们最终构建了一个具有多个本地数据存储的系统,以便我们能够最有效地处理不同类型的数据。ElasticSearch保存日志和事件我们使用Elasticsearch作为事件数据库。这些事件可以具有不同的“形状”,具体取决于它们来自哪个来源。我们使用了一些ElasticsearchAPI,效果很好,尤其是查询和聚合API。Cassandra和ElasticSearch存储指标,而指标原则上完全存储在Elasticsearch(或任何其他数据库)中,使用专用的匹配指标数据结构以及指标冗余数据数据库会更高效。最好的方法是使用现有的开源时间序列数据库(TSDB)。这就是我们最初使用它的方式——我使用开源TSDB并使用Cassandra作为后端。这种方法的挑战在于TSDB有自己的查询API,这与Elasticsearch的API不同。由于API之间的差异,很难为事件和指标提供统一的搜索和查询接口。这就是我们最终决定编写自己的TSDB以通过Casandra和Elasticsearch存储指标的原因。具体来说,我们在Cassandra中存储时间/值键值对,在Elasticsearch中存储元数据,并在顶部有一个查询和管理层。这样就可以在Elasticsearch中统一搜索查询事件和指标。流处理引擎现在我们有一个用于摄取数据和一些数据库的管道。我们准备好添加前端应用程序并使用我们的数据了吗?不!虽然Elasticsearch本身可以做一些日志和事件分析,但是我们还是需要一个处理引擎。因为:-我们需要一种统一的方式来访问事件和指标,包括实时或历史数据。-对于某些情况(监控,报警),当它发生时,我们需要实时处理这些数据。-指标!我们想做的不仅仅是找到指标并读出它们——指标是为了优化现有指标。-即使对于事件,我们也需要比ElasticsearchAPI更通用的处理能力。比如加入不同的源和数据,或者做字符串解析,或者自定义聚合。从这里开始,事情变得非常有趣。您可以花一天(或更多)时间研究其他人如何构建数据管道,并了解Lambda、Kappa等数据架构。实际上那里有很多非常好的材料。开门见山:我们实现的是一个支持实时数据流和批计算的处理引擎。对此,我们全力支持,有兴趣的可以看看这里和这里。在这里,与存储和摄取不同,我们从头开始构建自己的处理引擎-不是因为没有其他流处理引擎,而是因为我们重视查询性能,我们将在以下部分单独讨论。更具体地说,我们构建了一个实现数据流处理模型的流处理引擎,其中计算表示表示为将输入转换为输出的操作的有向图,例如聚合、窗口化、过滤或连接。这样可以很自然地将模型查询和计算结合起来,适合实时和批量,适合分布式运行。当然,除非你真的想建立一个新项目,否则我们建议你使用开源流处理引擎。我们建议您查看Riemann、SparkStreaming或ApacheFlink。3.查询和??计算我们使用流处理引擎,基于数据流模型进行计算。但是用户如何表达查询并创建这样的数据流图呢?一种方法是提供API或嵌入式DSL。该接口将需要提供一种查询和过滤数据、定义转换和其他处理操作的方法,最重要的是,提供一种将多个处理阶段组合和应用到流程图的方法。上述每个项目都有自己的API,虽然个人偏好可能有所不同,但API的一个共同挑战是SQL分析师或Excel用户不容易使用它们。此时,该问题的一个可能解决方案是允许这些用户通过构建在这些API之上的工具(例如一个简单的Web应用程序)访问系统。另一种方法是提供一种简单的查询语言。这就是我们在Jut所做的。因为目前没有现成的数据流查询语言(比如用于关系查询的SQL),我们创建了一种数据流查询语言,叫做Juttle。Juttle的流图查询语言的核心是可以用简单的语法声明处理管道,如上图所示。它具有这些原语、搜索、窗口、连接、聚合和分组,语法简单。当然,在您可以在流程图中处理数据之前,您需要获取数据-Juttle允许您定义查询以通过事件和/或指标的任意组合获取数据,实时的和/或历史的,所有这些都具有相同的语法和结构。这是一个简单的例子,遵循一个模式...query|分析|查看(请注意,链接使用管道运算符,类似于shell的语法)。```从:1天前读取:datatype='weblog'|减少-every:minute:count()bystatus_code|@timechart```4件放在一起:异常检测的示例到目前为止,我们采用了以组件为中心的观点-我们已经讨论了组件及其作用,但没有太多讨论它们如何组合在一起。现在我们切换到以数据为中心的视角,看看支持实时和历史查询需要哪些步骤。让我们用一个异常检测算法的例子来说明。这是一个很好的例子,因为我们需要查询历史数据来训练底层统计模型,实时流数据来测试异常,然后我们需要将结果写回系统并对异常发出警报。然而,在我们进行任何查询之前,我们需要序列化整个摄取过程,即传入数据如何写入索引存储。这是由导入服务完成的,它包括写入时间序列数据库、在Elasticsearch和Cassandra中存储指标和元数据。现在一个用户进来并开始异常检测工作。这就需要读取历史数据,通过任务处理引擎直接查询底层数据库。可以针对性能进一步优化不同的查询和数据(下面讨论),和/或实现到指标数据库的读取路径(在Elasticsearch中查询元数据,在Cassandra中获取指标,并组合结果以产生实际的指标点)。历史数据覆盖了过去范围内的部分数据,处理引擎将历史数据转化为流图的实时数据。为此,处理引擎将数据直接导入到导入服务的入口点。请注意,必须小心进行此切换,以免丢失或复制数据。在这一点上,我们有一个训练有素的流程图,用于在实时数据上运行异常检测。当检测到异常时,我们希望它向某个外部系统发送警报,该系统可以通过处理引擎将数据发布到外部HTTP服务。除了发送警报外,我们还希望跟踪我们的内部系统。换句话说,我们希望能够将数据流回系统。从概念上讲,这是通过处理引擎管道将数据返回到摄取管道。5调整所以我们有一个用于摄取数据和一些数据库和处理引擎的工作系统。我们准备好添加前端应用程序并分析我们的数据了吗?还没有!好吧,我们实际上可以这样做,但问题是我们的查询性能仍然会很慢。缓慢的查询意味着……没有人在使用我们的系统。那么让我们重新审视“统一处理引擎”的概念。按照我们的理解,它是同一个系统,使用相同的结构、抽象和查询来处理历史或实时数据。性能挑战来自这样一个事实,即历史数据比实时数据多得多。例如,假设我们有一个百万点/秒的输入到系统中,并且有一个足够快的过程可以在输入数据时实时查询。现在采用相同的查询语义来查询过去一天的数据。这将需要一次处理数百亿个点(或者,至少必须能够跟上从存储点读取的速度)。假设计算是分布式的,我们可以通过增加计算节点来解决,但在最好的情况下,这将是低效和昂贵的。所以这就是优化的用武之地。有很多方法可以优化数据查询。其中一些包括转换查询本身。例如,上游数据的过滤器或聚合应尽可能不改变查询语义。我们说的那种优化,就是尽可能让数据库过滤和处理数据。这需要执行以下操作:-自动识别查询中可以由数据库处理的部分-将相应部分转换为目标数据库的查询语言-运行后端查询并将结果注入数据流中的正确位置图6结论我们做到了!当然,如果不需要可视化层,我们就完成了。系统只能通过API查询。构建客户端应用程序以创建查询、流式传输和可视化数据、编写仪表板是另一个棘手的问题,因此我们将在另一天讨论。现在,让我们总结一下我们在构建这个数据中心的过程中的所见所闻:-一个摄取管道,它可以接受来自不同来源的输入数据,将其转换为统一的格式,并存储以备后用。(在Jut中,这是建立在Kafka之上的)。-事件和指标数据库。Jut中Events使用Elasticsearch,自建的metrics数据库基于Cassandra。-一个处理引擎(或两个,如果你要使用lambda-ish架构)。-在系统上运行查询的API或查询语言。哟。构建这个系统是一段漫长而有趣的旅程。即使您想构建自己的系统,也请先尝试Jut。您可能会发现它很有用。
