当前位置: 首页 > 科技观察

如何设计流式计算基准?

时间:2023-03-19 13:51:38 科技观察

如何选择适合自己业务的流计算引擎?除了比较它们各自的功能矩阵外,基准测试是用于评估系统性能的重要且常用的方法。但是,在流计算领域,目前还没有一个行业标准的基准。本文将讨论流计算基准设计的难点,分享如何设计一个流计算基准框架——Nexmark,以及未来的计划。一、背景随着数据的时效性对企业精细化运营越来越重要,“实时就是未来”、“实时数仓”、“数据湖”成为近年来的热词.流计算的格局在过去几年也发生了翻天覆地的变化。ApacheFlink一直在流批一体的方向上不断深耕。ApacheSpark的近实时处理有一定的受众。ApacheKafka还有ksqlDB高调进军流计算。但是ApacheStorm已经逐渐退出了历史舞台。各个引擎各有千秋,如何选择适合自己业务的流计算引擎成为了一个由来已久的话题。除了比较各个引擎提供的不同功能矩阵外,性能是一个绕不过去的评价因素。基准测试是用于评估系统性能的重要且常见的过程。2、现有流计算基准测试存在的问题目前,在流计算领域,还没有行业标准的基准测试。目前业界知名的流计算基准测试是YahooStorm团队五年前发布的YahooStreamingBenchmarks[4]。雅虎的初衷是因为业界缺乏反映真实场景的基准,模拟了一个简单的广告场景来比较各种流计算框架,后来被广泛引用。具体场景是将Redis中广告的活动信息与Kafka消费的广告点击流关联起来,然后做时间窗聚合统计。但也正是因为雅虎团队过于追求还原真实的生产环境,导致这些外部系统服务(Kafka、Redis)成为了运行的瓶颈。Ververica在这篇文章[5]中做了扩展实验,将来自Kafka的数据源替换为内置的datagen源,性能提升了37倍!可以看出,引入的Kafka组件导致无法准确反映引擎真实性能。一个更重要的问题是,YahooBenchmark只包括像“WordCount”这样一个非常简单的工作,它不能完全反映当今复杂的流计算系统和业务。试想一下,谁会用一个简单的“WordCount”来衡量和比较各种数据库之间的性能差异呢?正是这些原因阻碍了YahooBenchmark成为行业标准基准。这正是我们要解决的问题。因此,我们认为一个行业标准的基准测试应该具备以下特点:再现性再现性是基准可信赖的重要条件。许多基准测试结果难以重现。有些是因为只显示了基准测试结果图,并且没有公开用于生成这些结果的代码。有些是因为用于基准测试的硬件不容易被其他人使用。在某些情况下,测试结果不稳定,因为基准测试依赖于太多服务。能够代表和覆盖行业真实的业务场景(查询量)。例如,数据库领域著名的TPC-H和TPC-DS覆盖了大量的查询集,以捕捉查询引擎之间的细微差别。而且这些查询集都是基于真实的业务场景(商品零售行业),数据规模大,因此也受到一些大数据系统的青睐。可以调整作业负载(数据量、数据分布)在大数据领域,不同的数据规模对于引擎来说可能是完全不同的东西。例如,YahooBenchmark中只使用了100个campaignid,这使得状态非常小,可以存储在内存中。这使得同步IO和检查点等的影响可以忽略不计。然而,真实的场景往往要面对大局,面临的挑战要复杂和艰巨得多。TPC-DS等数据生成工具将提供标量因子参数来控制数据量。其次,在数据分布方面,最好接近真实世界的数据。如果存在数据倾斜,请调整倾斜比例。这样,业务场景和引擎的差异就可以全面、全面地体现出来。有统一的性能衡量指标,采集汇总工具benchmark的性能指标定义需要清晰、一致,适用于各种计算引擎。但是,流计算的性能指标比传统的批处理更难定义和收集。是流计算benchmark中最具挑战性的问题之一,下面也会介绍。我们也研究过很多其他流计算相关的基准测试,包括:StreamBench、HiBench、BigDataBench,但它们都缺乏上述基础。基准测试的行业标杆无疑是TPC发布的一系列基准,例如TPC-H和TPC-DS。然而,这些基准测试是为传统数据库和数据仓库设计的,并不适用于今天的流计算系统。例如,该基准测试没有考虑事件时间、数据乱序和窗口等流计算中的常见场景。因此,我们不得不考虑重新设计并开源一个流式计算基准测试框架——Nexmark。地址:https://github.com/nexmark/nexmark。三种Nexmark基准测试框架的设计为了提供满足上述基本原理的流计算基准测试,我们设计开发了Nexmark基准测试框架,力求使其成为流计算领域的标准基准测试。Nexmark基准框架源自NEXMark研究论文[1]和ApacheBeamNexmarkSuite[6],并在此基础上进行了扩展和改进。Nexmark基准测试框架不依赖于任何第三方服务。你只需要部署引擎和Nexmark,你可以等待并通过脚本nexmark/bin/run_query.shall获得所有查询下的基准测试结果。下面我们将探讨Nexmark基准测试的一些设计决策。1去除外部source和sink依赖上面提到,YahooBenchmark使用的是Kafka数据源,但是最终的结果并不能准确反映引擎的真实性能。另外我们还发现,在对快慢流双流JOIN场景进行benchmark时,如果使用Kafka数据源,慢流会被提前消费(快流容易被背压),导致JOIN节点的状态缓存了大量的高级数据。这其实不能反映真实场景,因为在真实场景中,慢流是无法提前消费的(数据还没有产生)。所以我们在Nexmark中使用了datagen源,数据直接在内存中生成,并不直接将数据发送到下游节点。多个事件流由单个数据生成器生成,因此在对快流进行反压时,也可以抑制慢流的生成,更能反映真实场景。同样,我们也去掉了外部sink的依赖,不再输出到Kafka/Redis,而是输出到一个空的sink,即sink会丢弃所有接收到的数据。通过这种方式,我们确保瓶颈仅在引擎本身,从而可以准确测量引擎之间的细微差异。2Metrics批处理系统benchmark的metric通常用总耗时来衡量。但是流计算系统处理的数据是连续的,查询耗时无法统计。因此,我们提出了三个主要指标:吞吐量、延迟和CPU。Nexmark测试框架会自动为我们收集和汇总指标,无需部署任何第三方指标服务。吞吐量(throughput)也常被称为TPS,描述了一个流计算系统每秒可以处理多少条数据。由于我们有多个事件流,所有的事件流都是由一个数据生成器生成的,为了统一观察角度,我们使用数据生成器的TPS,而不是单个事件流的TPS。我们将查询可以达到的最大吞吐量作为其吞吐量指标。例如,对于Flink引擎,我们通过FlinkRESTAPI暴露的.numRecordsOutPerSecond指标获取当前吞吐量。延迟描述了从数据进入流式计算系统到其结果输出的时间间隔。对于windowaggregation,YahooBenchmark使用output_system_time-window_end作为延迟指标,实际上并没有考虑window输出前数据的等待时间,计算结果会受到backpressure很大的影响,所以计算结果是不准确的。更准确的计算是output_system_time-max(ingest_time)。但是在非windowaggregation或者two-streamJOIN中,延迟的计算方式会有所不同。因此,流计算系统中延迟的定义和收集存在很多现实问题,需要根据具体查询进行分析。参考文献[2]对此进行了详细讨论。这就是为什么我们还没有在Nexmark中实施延迟指标的原因。CPU资源使用率是许多流计算基准测试中被忽略的指标。因为在真实的生产环境中,我们并没有限制流计算引擎可以使用的核数,从而赋予系统更大的灵活性。所以我们引入了CPU使用率作为辅助指标,即这个作业一共消耗了多少核。通过throughput/cores,可以计算出每个core对吞吐量的平均贡献。对于进程CPU使用率的收集,我们没有使用JVMCPU负载,而是借鉴了YARN中的实现,通过采样/proc//stat并计算得到。该方法可以获得更真实的进程CPU使用率。因此,我们的Nexmark测试框架需要在测试开始前在每台机器上部署一个CPU获取流程。3QueryandSchemaNexmark的商业模式是基于一个真实的在线拍卖系统。所有的查询都基于相同的三个数据流,这三个数据流将由一个数据生成器生成,以控制它们的比例、数据倾斜、关联关系等。这三个数据流是:用户(Person):代表一个用户谁提交拍卖或参与投标。Auction:代表拍卖物品。出价(Bid):表示对拍卖品的出价。我们总共定义了16个查询,所有查询都使用ANSISQL标准语法。基于SQL,我们可以更方便地扩展查询测试集,支持更多的引擎。但是由于Spark的流计算功能的限制,大部分的查询都无法通过StructuredStreaming来实现。因此,我们目前只支持测试FlinkSQL引擎。4作业负载的配置我们还支持作业负载的配置和调整,包括数据生成器的吞吐量和吞吐量曲线,各数据流之间的数据量比例,各数据流的平均数据大小,数据倾斜度比例等。具体请参考SourceDDL参数。4.实验结果我们在阿里云的三台机器上对Flink进行了Nexmark基准测试。每台机器都是ecs.i2g.2xlarge,配备Xeon2.5GHzCPU(8个vCore),32GBRAM,800GBSSD本地磁盘。机器之间的带宽为2Gbps。在测试了flink-1.11版本后,我们在这3台机器上部署了一个Flink独立集群,由1个JobManager和8个TaskManager(每个只有1个槽)组成,均有4GB内存。集群默认并行度为8。启用检查点和恰好一次模式,检查点间隔为3分钟。使用RocksDB状态后端。测试发现对于有状态的查询,每个checkpoint的大小都在GB级别以上,所以测试大状态的场景是有效的。Datagen源以每秒1000万条的速度不断产生数据,三个数据流的数据比例为Bid:92%,Auction:6%,Person:2%。每个查询运行3分钟预热,然后收集性能指标3分钟。全部运行nexmark/bin/run_query.sh后,打印出来的测试结果如下:五、总结我们开发设计Nexmark的初衷是推出一套标准的流计算基准测试集和测试程序。虽然目前只支持Flink引擎,但在目前也有一定的意义,比如:推动流计算基准的发展和标准化。作为Flink引擎版本迭代之间的性能测试工具,甚至日常回归工具,可以及时发现性能回归问题。在开发Flink性能优化函数时,可以用来验证性能优化的效果。一些公司可能有内部版本的Flink,可以作为内部版本和开源版本的性能对比工具。当然,我们也计划继续完善和完善Nexmark测试框架,比如支持延迟指标,支持更多的引擎,比如SparkStructuredStreaming、SparkStreaming、ksqlDB、FlinkDataStream等,也欢迎有志之士加入贡献和扩张。参考及引用[1]PeteTucker和KristinTufte。“NEXMark–数据流查询基准”。2010年6月。[2]JeyhunKarimov和TilmannRabl。“对分布式流数据处理系统进行基准测试”。arXiv:1802.08496v2[cs.DB]2019年6月[3]YangjunWang。“流处理系统基准测试:StreamBench”。2016年5月。[4]https://github.com/yahoo/streaming-benchmarks[5]https://www.ververica.com/blog/extending-the-yahoo-streaming-benchmark[6]https://beam.apache.org/documentation/sdks/java/testing/nexmark/