Netflix是如何做到每秒200万次数据交易,查询超过1.5万亿行数据的?在推动技术创新和升级的同时确保始终如一的良好Netflix体验绝非易事。如何保证更新不影响用户?如何确保我们的改进是可衡量的?Netflix使用来自播放设备的实时日志作为事件源来导出指标,以了解和量化用户设备浏览和播放的流畅度。一旦我们有了这些指标,我们就将它们输入数据库。每个指标都附有关于所用设备类型的匿名详细信息,例如它是智能电视、iPad还是安卓手机。这样,我们就可以对设备进行分类,从不同的角度查看数据。同样,我们只能隔离影响特定群体的问题,例如应用程序版本、特定类型的设备或特定国家/地区。此聚合数据可立即用于查询,无论是通过仪表板还是临时查询。这些指标还不断检查警告信号,例如新版本是否影响某些用户或设备的播放或浏览。这些检查用于通知负责团队,以便他们尽快处理问题。在软件更新期间,我们为一部分用户启用新版本,并使用这些实时指标来比较新版本与旧版本的性能。在Metrics中,如果有任何不合适的地方,我们可以中止更新并将那些获得新版本的用户恢复到以前的版本。由于此数据每秒处理超过200万次,因此很难将其存储在可以快速查询的数据库中。我们需要足够的数据维度来有效隔离问题,因此,我们每天生成超过1150亿行数据。在Netflix,我们利用ApacheDruid来帮助我们解决这种规模的挑战。1.DruidApacheDruid是一个高性能的实时分析数据库。它专为快速查询和摄取特别重要的工作流而设计。Druid特别适用于即时数据可视化、即席查询、运营分析和高并发处理。-druid.io因此,Druid非常适合我们的用例,事件数据摄取率高,并且具有高基数(highcardinality)和快速查询的要求。Druid不是关系型数据库,但有些概念是可以转移的。我们有数据源,而不是表格。与关系数据库一样,存在以列表示的数据逻辑分组。与关系数据库不同,没有连接的概念。因此,我们需要确保每个数据源中都包含所需的过滤或分组列。数据源中的列主要分为三种类型:时间、维度和度量。Druid中的一切都带有时间戳记。每个数据源都有一个时间戳列,这是主要的分区机制。维度是可用于过滤、查询或分组的值。度量是可以聚合的值,并且几乎总是数字。通过去掉执行join的能力,并且假设数据都是有时间戳的,Druid可以在存储、分发和查询数据方面做一些优化,这样我们就可以将数据源扩展到万亿行,并且仍然可以在查询响应时间以内10毫秒。为了实现这种级别的可扩展性,Druid将存储的数据划分为时间块。时间块的长度是可配置的。可以根据数据和用例选择适当的间隔。对于数据和用例,我们使用1小时的时间块。时间块中的数据存储在一个或多个段中。每个段包含属于该时间块的所有数据行,由其时间戳列决定。段大小可以配置为最大行数或段文件的总大小。在查询数据时,Druid将查询发送到集群中所有具有时间块在查询范围内的段的节点。在将中间结果发送回查询代理节点之前,每个节点都针对它所持有的数据并行处理查询。代理将在将结果集发送回客户端之前执行最终的合并和聚合。2.数据插入这个数据库是实时的,而不是将单个记录插入数据源,事件是从Kafka流中读取的(这是我们的指标)。每个数据源使用一个主题。在Druid中,我们使用Kafka索引任务,它创建多个分布在活动节点(中间管理器)上的索引工作者。这些索引器都订阅主题并从流中读取它们的事件。索引器根据摄取规范从事件消息中提取值,并将创建的行累积到内存中。一旦创建了一行,就可以对其进行查询。索引器正在填充的段的时间块的查询将由索引器本身提供服务。由于索引任务本质上执行两项工作,即摄取和处理查询,因此及时将数据发送到历史节点以便以更优化的方式将查询工作卸载给它们非常重要。Druid可以在摄取数据时聚合数据,以最大限度地减少需要存储的原始数据量。Rollup是汇总或预聚合的一种形式。在某些情况下,汇总数据可以大大减少需要存储的数据的大小,行数可能减少几个数量级。然而,这种存储减少是有代价的:我们失去了查询单个事件的能力,只能以预定义的查询粒度进行查询。对于我们的用例,我们选择了1分钟的查询粒度。在摄取期间,如果任何行具有相同的维度并且它们的时间戳在同一分钟内(我们的查询粒度),它们就会被聚合。这意味着通过将所有指标值相加到合并的行中并递增计数器,我们知道有多少事件促成了该行的值。这种形式的Rollup可以显着减少数据库中的行数,从而加快查询速度。一旦累积的行数达到某个阈值,或者段打开时间过长,这些行就会写入段文件并卸载到深度存储中。然后索引器通知协调器片段已准备好,以便协调器可以告诉一个或多个历史节点加载它。一旦一个段成功加载到历史节点中,它就会从索引器中卸载,并且针对该数据的任何查询现在都将由历史节点提供服务。3.数据管理可以想象,随着维度基数的增加,同一事件在同一分钟内发生的概率会降低。管理基数(用于聚合)是实现良好查询性能的有力手段。为了达到我们需要的摄取速度,可以运行许多索引器实例。即使索引任务使用Rollup合并相同的行,在索引任务的相同实例中获得这些相同行的机会也非常低。为了解决这个问题并实现最好的Rollup,我们在给定时间块的所有段都已传送到历史节点后运行一个任务。计划的压缩任务从深度存储中获取时间块的所有片段,并运行map/reduce作业以重新创建片段并实现完美的汇总。然后由历史节点加载和发布新的段,替换和取代原始的、汇总不足的段。在我们的例子中,通过使用这个额外的压缩任务,行数减少到1/2。知道何时接收到给定时间块的所有事件并非易事。Kafka上的数据可能会延迟到达,或者索引器可能需要一些时间才能将片段传递到历史节点。为了解决这个问题,我们在运行压缩之前执行一些限制和检查。首先,我们丢弃任何迟到的数据。我们认为这些数据在我们的实时系统中已经过时。这设定了数据延迟的界限。其次,压缩任务是延迟调度的,这允许有足够的时间将段卸载到正常流程中的历史节点。最后,当给定时间块的计划压缩任务开始时,它会查询段元数据以检查是否仍有相关段写入或交付。如果是这样,它将等待几分钟,然后重试。这将确保所有数据都由压缩作业处理。如果没有这些措施,我们发现数据有时会丢失。压缩开始时仍在写入的段将被新压缩的段覆盖,这些段具有更高的版本,因此优先。这有效地删除了那些尚未完成传输的段中包含的数据。4.QueryDruid支持两种查询语言:DruidSQL和nativequery。在底层,DruidSQL查询被转换为本地查询。本机查询以JSON格式提交给REST端点,这是我们使用的主要机制。大多数对我们集群的查询都是由自定义内部工具生成的,例如仪表板和预警系统。这些系统最初设计用于与我们内部开发的开源时间序列数据库Atlas配合使用。因此,这些工具使用AtlasStack查询语言。为了加速Druid对查询的采用并实现现有工具的重用,我们添加了一个转换层来接收Atlas查询,将它们重写为Druid查询,发送查询并将结果重新格式化为Atlas结果。这个抽象层允许现有工具按原样使用,用户无需额外学习即可访问我们Druid数据存储中的数据。5.调整在调整集群节点的配置时,我们会高速运行一系列可重复和可预测的查询,以获得每个给定配置的响应时间和查询吞吐量的基线。这些查询旨在隔离集群的各个部分,以检查查询性能的改进或回归。比如我们针对最近的数据做针对性的查询,只查询MiddleManager。同样,对于较长时间段但较旧的数据,我们仅查询历史节点以测试缓存配置。此外,使用按高基数维度分组的查询检查结果合并是如何受到影响的。我们将继续调整和运行这些基准测试,直到我们对查询性能感到满意为止。在这些测试中,我们发现调整缓冲区的大小、线程数、查询队列的长度以及分配给查询缓存的内存对查询性能有真正的影响。然而,压缩作业的引入对查询性能有更显着的影响,重新压缩汇总不足的段以实现完美汇总。我们还发现,在历史节点上启用缓存非常有益,而在代理节点上启用缓存则没有那么明显。因此,我们不在代理上使用缓存。这可能是由我们的用例引起的,但几乎每个查询都错过了代理上的缓存,可能是因为查询通常包含最新数据,而这些数据不在任何缓存中,因为数据不断到达。6.总结对于我们的用例和数据速率,经过多次优化和调整,Druid已经证明具有我们最初希望的能力。我们已经能够获得一个功能齐全且可用的系统,但还有更多工作要做。随着查询数量和复杂性的增加,我们的摄取量和速度也在不断增加。随着越来越多的团队意识到这些详细数据的价值,我们通常需要添加更多的度量和维度,这会给系统带来负担。我们必须继续监视和调整以控制查询性能。我们目前正在以每秒200万次的速度处理事件,查询超过1.5万亿行数据以获取有关用户体验服务的详细信息。所有这些都有助于我们保持高质量的Netflix体验,同时能够继续创新。
