转载本文请联系3分钟看懂大数据公众号。前言大家好,我是涂哥。最近在公司做Flink推理任务的性能测试,需要对作业的全链路吞吐量、全链路时延、吞吐延迟指标进行监控调优,其中使用了FlinkMetrics进行指标监控。下篇干货满满。将带领读者全面了解FlinkMetrics指标监控,并通过实战案例,优化全链路吞吐量、全链路时延、吞吐时延等指标性能,彻底掌握FlinkMetrics性能调优方法和使用指标。大纲目录如下:1FlinkMetrics简介FlinkMetrics是Flink集群运行中的各项指标,包括机器系统指标,如:CPU、内存、线程、JVM、网络、IO、GC和任务运行等组件(JM、TM、Slot、job、operator)等相关指标。FlinkMetrics有两个功能:实时收集监控数据。在FlinkUI界面上,用户可以看到自己提交的任务状态、延时、监控信息等。提供对外数据采集接口。用户可以主动将整个Flink集群的监控数据上报给第三方监控系统,比如prometheus、grafana等,下面会介绍。1.1FlinkMetric类型Flink一共提供四种监控指标:Counter、Gauge、Histogram、Meter。1.Count计数器统计一个metric的总量。写过MapReduce的开发者应该对Counter不陌生。其实意思是一样的,就是累加一个计数器,也就是多条数据,多兆数据不断累加的过程。其中,Flink算子的接收记录总数(numRecordsIn)和发送记录总数(numRecordsOut)属于Counter类型。使用方法:调用counter(Stringname)创建并注册MetricGroup2。Gauge指标的瞬时值Gauge是最简单的Metrics,它反映了一个指标的瞬时值。比如查看TaskManager的JVM堆内存当前使用了多少,可以每次实时暴露一个Gauge,Gauge的当前值就是堆使用的量。使用前,先创建一个实现org.apache.flink.metrics.Gauge接口的类。对返回值的类型没有限制。您可以通过调用MetricGroup上的gauge来完成此操作。3、仪表平均值用于记录某指标在一定时间内的平均值。Flink中的指标包括Task算子中的numRecordsInPerSecond,记录了这个Task或算子每秒接收到的记录数。使用方法:通过markEvent()方法注册事件的发生。通过markEvent(longn)方法注册多个同时发生的事件。4.HistogramHistogram是用来统计一些数据的分布情况的,比如Quantile,Mean,StdDev,Max,Min等,其中最重要的就是统计算子的延迟。该指标会记录数据处理的延迟信息,对任务监控有重要作用。使用方法:调用histogram(Stringname,Histogramhistogram)注册一个MetricGroup。1.2ScopeFlink的索引体系分为树状结构,domain相当于树上的顶点分支,表示大索引的分类。每个指标都分配了一个标识符,该标识符将根据3个组成部分进行报告:用户在注册指标时提供的名称;可选的用户定义域;系统提供的域。例如,如果A.B是系统域,C.D是用户域,E是名称,则指标的标识符将为A.B.C.D.E。您可以通过在conf/flink-conf.yam字符(默认“.”)中设置metrics.scope.delimiter参数来配置标识符的分隔符。例如:以算子的指标组结构为例,默认为:.taskmanager....算子的输入记录数指标为:hlinkui.taskmanager.1234.wordcount.flatmap.0.numRecordsIn1.3Metrics运行机制在生产环境中,为了保证对Flink集群和作业运行状态的监控,Flink提供了两种集成方式:1.3.1主动方式MetricReportFlinkMetrics通过在conf/flink-conf中配置一个或多个reporter。yaml,指标暴露给外部系统。这些记者将在每个工作和任务管理器启动时实例化。1.3.2被动模式RestAPI通过提供Rest接口被动接收外部系统调用,可以返回集群、组件、作业、任务、算子的状态。RestAPI实现类是WebMonitorEndpoint2。FlinkMetrics监控系统在构建Flink的主动方式中一共提供了8种报告。我们使用PrometheusPushGatewayReporter通过prometheus+pushgateway+grafana组件搭建FlinkOnYarn可视化监控。当用户使用Flink以session方式向yarn集群提交作业时,Flink会通过PrometheusPushGatewayReporter将metrics推送到pushgateway的9091端口,然后使用外部系统prometheus从pushgateway进行pull操作,收集metrics,并通过Grafana可视化工具将它们展示出来。示意图如下:首先,我们在FlinkOnYarn集群中提交一个作业任务,让它运行起来,然后执行以下操作。2.1配置ReporterReporter下的所有工具和jar包都已经下载好了,需要的朋友可以在公众号后台回复:02,都可以获取。2.1.1导入包将flink-metrics-prometheus_2.11-1.13.2.jar包导入到flink-1.13.2/bin目录下。2.1.2配置Reporter选择PrometheusPushGatewayReporter方法。在官网查询到Flink1.13.2Metrics的配置后,在flink-conf.yaml中设置。配置如下:metrics.reporter.promgateway.class:org.apache.flink.metrics.prometheus.PrometheusPushGatewayReportermetrics.reporter.promgateway.host:192.168.244.129metrics.reporter.promgateway.port:9091metrics.reporter.promgateway.jobName:myJobmetrics.reporter.promgateway.randomJobNameSuffix:truefmetrics.reportelegateway.promgate.reporter.promgateway.groupingKey:k1=v1;k2=v2metrics.reporter.promgateway.interval:60SECONDS2.2部署pushgatewayPushgateway是独立的服务。Pushgateway位于应用发送指标和Prometheus服务器之间。Pushgateway接收指标,然后由Prometheus服务器将其作为目标拉取。将其视为代理服务,或者与黑盒导出器的行为相反,接收指标而不是探测它们。2.2.1解压pushgateway2.2.2。启动pushgateway,进入pushgateway-1.4.1目录./pushgateway&后台查看是否启动成功psaux|greppushgateway2.2.3。登录pushgatewaywebui2.3部署prometheusPrometheus(普罗米修斯)是一个初步搭建在SoundCloud上的监控系统。它自2012年以来一直是社区开源项目,拥有非常活跃的开发者和用户社区。为强调开源和独立维护,Prometheus于2016年加入云原生云计算基金会(CNCF),成为继Kubernetes之后的第二个托管项目。2.3.1解压prometheus-2.30.02.3.2写入配置文件scrape_configs:-job_name:'prometheus'static_configs:-targets:['192.168.244.129:9090']labels:instance:'prometheus'-job_name:'linux'static_configs:-targets:['192.168.244.129:9100']labels:instance:'localhost'-job_name:'pushgateway'static_configs:-targets:['192.168.244.129:9091']labels:instance:'pushgateway'2.3.3启动prometheus./prometheus--config.file=prometheus.yml&启动后可以通过ps查看端口:psaux|grepprometheus2.3.4登录prometheuswebui2.4并部署grafanaGrafana是一个跨平台的开源测量分析和可视化工具。对采集到的数据进行可视化查询和展示,并及时通知。主要有以下六大特点:展示方式:快速灵活的客户端图表,面板插件有多种不同的可视化指标和日志方式,官方库有丰富的仪表盘插件,如热图、折线图、图表等多种显示方式;数据源:Graphite、InfluxDB、OpenTSDB、Prometheus、Elasticsearch、CloudWatch、KairosDB等;通知提醒:可视化定义最重要指标的报警规则,当数据达到阈值时,Grafana会不断计算并发送通知,通过Slack、PagerDuty等方式得到通知;MixedPresentation:在同一个图表中混合不同的数据源,您可以在每个查询的基础上指定数据源,甚至可以自定义数据源;注释:使用来自不同数据源的丰富事件对图表进行注释,将鼠标悬停在事件上会显示完整的事件元数据和标签;过滤器:临时过滤器允许动态创建新的键/值过滤器,这些过滤器会自动应用于使用该数据源的所有查询。2.4.1解压grafana-8.1.52.4.2启动grafana-8.1.5./bin/grafana-serverweb&2.4.3登录grafana登录用户名和密码均为admingrafana配置中文教程:https://grafana.com/docs/grafana/latest/datasources/prometheus/2.4.4配置数据源,创建系统负载监控要访问Prometheus设置,将鼠标悬停在配置(齿轮)图标上,然后点击Datasources,然后点击Prometheusdatasources,根据操作如下图。操作完成后,单击验证。2.4.5添加Dashboard点击最左边的+号,选择DashBoard,选择新建面板。至此,Flink的指标都在Grafana中展现。flink指标对应的指标名称比较长。您可以在Legend中配置显示内容,将{{key}}中的key替换为对应的要显示的字段,如:{{job_name}},{{operator_name}}。3指标性能测试上述监控系统搭建完成后,我们就可以对性能指标进行监控了。现在介绍一个实际案例:3.1业务场景介绍金融风控场景3.1.1业务需求:FlinkSource从数据kafka主题中读取推理数据,通过SQL预处理成模型推理需要的数据格式,进行keyBy分组,然后流转进入下游connect算子,与模型连接,然后进入Co-FlatMap算子进行推理。示意图如下:3.1.2业务需求:根据模型的复杂程度,要求推理时延在20ms以内,整个链路耗时50ms以内,吞吐量达到1.2w/秒以上。3.1.3业务数据:推理数据:3000w,推理字段495个,机器学习Xgboost模型字段:495个。3.2指标分析由于性能测试要求整个链路耗时小于50ms,所以应该使用FlinkMetrics的LatencyMarker计算。3.2.1全链路时延的计算方法:全链路时延是指一段推理数据从源算子进入数据预处理算子,到最后算子输出结果的耗时,即如何处理一条数据所花费的时间,包括算子内部处理逻辑的时间、算子之间的数据传输时间、在缓冲区等待的时间。完整链路延迟是使用延迟指标计算的。延迟指标是源运营商根据当前本地时间生成的标记。它不参与各个算子的逻辑计算,只是跟随数据到下游算子。当它到达一个运算符时,它会计算当前的本地时间戳并使用源生成它。将源算子减去当前算子的时间戳。当到达sinkoperator或lastoperator时,将sourceoperator生成的时间戳减去当前本地时间戳,得到全链路延迟。示意图如下:由于使用了latencymarkers,所有的参数都需要在flink-conf.yaml中进行配置。latency.metrics.interval系统配置截图如下:3.2.2全链路吞吐量计算方法:全链路吞吐量=单位时间内处理的数据量/单位时间。3.3向FlinkonYarn集群提交任务**3.3.1直接提交作业**#-mjobmanager地址#-yjm1024指定jobmanager的内存信息#-ytm1024指定taskmanager的内存信息bin/flinkrun\-tyarn-per-job-yjm4096-ytm8800-s96\--detached-ccom.threeknowbigdata.datastream.XgboostModelPrediction\examples/batch/WordCount.jar\提交完成后,我们可以通过Flink看到job运行的任务结果WEBUI如下:因为推理模型只是一个模型,在现有状态下,所以全链路吞吐量考虑了有多少条推理数据进入源算子,从倒数第二个算子流出(最后一个算子只是总结指标)每秒。这个数字是全链路吞吐量。可以看出,在处理2000万条数据时,代码直接统计输出值和flinkwebUI的统计值基本一致,所以统计值可信。FlinkWEBUI跑的结果数据打开Prometheus在对话框输入全链路时延计算公式计算公式:avg(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{source_id="cbc357ccb763df2852fee8c4fc7d55f2",operator_id="c9c0ca46716e76f6b700eddf4366d243",quantile="0.999"})3.4优化前性能分析任务提交到集群后,通过全链路时延计算公式和吞吐量时延计算公式,最终得到优化前结果的时延指标统计图如下:吞吐量指标统计图如下:通过本次测试后,从图中可以发现:延迟指标:增加并行度,吞吐量也高,但是整个链路的延迟明显增加(从1并行到32并行,延迟从110ms增加到3287ms),与要求的结果相去甚远。3.5问题分析经过Prometheus分析,结果如下:3.5.1并行问题:背压现象:在FlinkWEB-UI上,可以看到应用存在非常严重的背压,说明:链接中存在耗时问题。运营商封锁了整个链接;数据处理比拉取数据慢:数据源消耗的速度大于下游数据处理的速度;增加计算并行度:所以在接下来的测试中,会增加推理算子的并行度,相当于提升了下游的数据处理能力。3.5.2Buffer超时问题:Flink虽然是纯流式框架,但是默认开启了缓存机制(部分数据在上游积累,然后发送给下游);缓存机制可以提高应用程序的吞吐量,但也增加了延迟;推理场景:为了得到最好的延迟指标,第二轮测试的超时时间设置为0,记录吞吐量。3.5.3buffer个数:同上,Flink的buffer个数是可配置的;缓冲区越多,可以缓存的数据就越多;推理场景:为了得到最好的延迟指标,第二轮测试:减少Flink中的Buffer数量来优化延迟指标。3.5.4调优参数配置SOURCE和COFLATMAP的并行度按照1:12配置;缓冲超时配置为0ms(默认100ms);//在代码中设置senv.setBufferTimeout(0);Buffer数量配置如下:修改flink-conf.yamlmemory.buffers-per-channel:2memory.float-buffers-per-gate:2memory.max-buffers-per-channel:2配置截图如下:3.6优化后的性能分析提交到集群后,通过全链路时延计算公式和吞吐量时延计算公式,最终得到优化后的结果。延迟指标统计图如下:吞吐量指标统计图如下:优化后LGB推理测试总结:延迟指标:当并行度增加时,延迟也会增加,但幅度为小(可接受)。其实在测试过程中是有一定背压的。如果提高SOURCE和COFLATMAP的并行度,可以进一步降低全链路延迟;吞吐量指标:随着并行度的增加,吞吐量也随之增加。当并行度增加到96时,吞吐量可以达到1.3W,此时的延迟维持在50ms左右(比较稳定)。3.7优化前后的LGB分析总结如下图所示:3.7.1吞吐量---影响因素:内存:对吞吐量和延迟没有影响,并行度与吞吐量呈正相关。增加kafka分区,增加吞吐量,增加source和维表source并行度,增加flatmap推理并行度3.7.2全链路延迟---影响因素:buffertimeout越短,数量越少,延迟越低。整条链路是否存在运营商拥堵(车道排队模型)。增加推理算子的并行度,减少延迟,增加吞吐量(即增加推理的处理能力)。
