本文转载自微信公众号《KK架构师》,作者wangkai。转载请联系KKarchitects公众号。一、Flinkmetrics介绍Flink的metrics是Flink公开的一个度量体系。Metrics也可以通过在Flink配置文件conf/flink-conf.yaml中配置来暴露给外部系统。Flink已经原生支持许多报告器,例如JMX、InfluxDB、Prometheus等。我们也可以自定义指标,通过metrics收集。在实际开发中,我们经常需要查看当前程序的运行状态。Flink提供了一个UI界面,有更详细的统计信息。但是UI界面也有不完善的地方,比如想获取flink的实时吞吐量。本文将详细介绍如何通过metrics监控flink程序,自定义监控指标,以及将metrics应用到flinkUI界面。2、Metrics在UI页面的应用在flinkUI界面,我们点击任务详情,然后点击TaskMetrics,弹出如下界面。在添加指标按钮上,我们可以添加我需要的监控指标。注意:如果点击TaskMetrics没有显示Addmetrics,点击任务的DAG图就会显示出来。当我们在DAG图中点击某个算子的名称,Addmetric会显示该算子的监控指标,根据Partition显示,算子名称前的数字为分区号。3.各个指标的含义官网对各个指标的含义有详细的介绍:https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#availability4.自定义监控指标案例:计算map算子中的总输入数据,设置:DataStreamuserData=kafkaData.map(newRichMapFunction(){CountermapDataNub;@Overridepublicvoidopen(Configurationparameters)throwsException{mapDataNub=getRuntimeContext().getMetricGroup().addGroup("flink_test_metric").counter("mapDataNub");}@OverridepublicStringmap(Strings){Strings1="";try{String[]split=s.split(",");longuserID=Long.parseLong(split[0]);longitemId=Long.parseLong(split[1]);longcategoryId=Long.parseLong(split[2]);Stringbehavior=split[3];longtimestamp=Long.parseLong(split[4]);Mapmap=newHashMap();map.put("userID",userID);map.put("itemId",itemId);map.put("categoryId",categoryId);map.put("行为”,行为);map.put(“时间戳”,时间戳);s1=JSON.toJSONString(地图);mapDataNub.inc();System.out.println("data"+map.toString());}catch(NumberFormatExceptione){e.printStackTrace();}returns1;}程序启动后,可以显示在任务ui界面注意事项:搜索自定义或查看某个指标,需要在DAG图中点击相应算子名称指标的前缀0,1,2....。监控operator的分区数时,尽量不要监控operator重命名,使用默认名称,这样一套监控程序可以监控多个Flink任务,比如重命名sink,如果不同的Flink程序对sinks的命名不同,那么一套监控程序是无法监控多个Flink程序的。addSink(KafkaSink.getProducer()).name("kafka_sink");5.FlinkUI不显示接收和发送的算子数据的数量。有时候我们的Flink任务运行正常,数据也能打印出来保存到数据库中,但是UI上没有显示接收和发送的数据数量,导致无法监控指标,无法查看flink任务运行的具体情况。这是什么原因?原因:因为Flink默认开启算子链,所以当flink程序的所有算子都在一个链中,即在一个DAG(任务)中时,并没有把所有的数据都发送到下游,所以显示是0.比如下图的情况,所有指标都为0;解决方案:第一种方法:在flink程序中添加自定义metric第二种方法:使用startNewChain和disableChainin中断程序默认的operatorchain第三种方法:修改一个operator的parallelism,使其与上游一致downstreamoperatorsInconsistentsub-parallelism6.MetricReporterMetrics可以通过在flink配置文件conf/flink-conf.yaml中配置暴露给外部系统。Flink已经原生支持了很多reporter,比如JMX、InfluxDB、Prometheus等,同时也支持Customreporter。Flink自带了很多Reporter,包括JMX、InfluxDB、Prometheus等,接下来介绍InfluxDBReporter的使用。在flink配置文件conf/flink-conf.yaml中配置Influxdb相关信息即可,主要包括域名、端口号、用户密码等。flink1.10之后,使用metrics.reporter.influxdb.factory.class:org.apache.flink.metrics.influxdb.InfluxdbReporterFactorymetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.consistency:ANYmetrics.reporter.influxdb.connectTimeout:60000metrics.reporter.influxdb.writeTimeout:60000metrics.reporter.influxdb.interval:30SECONDS之前flink1.10metrics.reporters:influxdbmetrics.reporter.influxdb.class:org.阿帕奇:123注意事项:收集flinkSQL任务的监控指标,如果用户insertinto或insertoverwrite写的sql语句有单引号带换行符,写入influxdb会报错。查看influxdb收集的监控信息,发现会自动为我生成数据库和measurement。,所有的指标都存储在具体的measurement中七、flinkmetric监控方案前面介绍了flink公共监控指标以及如何自定义监控指标,那么实际开发flink任务我们需要知道这些监控指标的数据及时获取程序的密钥对于健康值和状态,我们需要使用flinkRESTAPI编写监控程序来获取这些指标。很简单,当我们知道每个指标请求的URL后,我们就可以写一个程序,通过http请求获取指标的监控数据。八、flinkRESTAPI监控程序为了获取flink任务的运行状态和吞吐量,我们需要注意两点:Flink集群模式需要知道JobManager(5004)的地址和端口对于flinkonyarn模式,你需要知道RM代理的JobManagerUI地址,比如http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx1。获取flink任务的运行状态(我们可以在浏览器中测试,输入如下连接)http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs返回的结果{jobs:[{id:"ce793f18efab10127f0626a37ff4b4d4",status:"RUNNING"}]}2.在/jobs/jobidhttp://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs/ce793f18efab10127f0626a37ff4b4d4{jid:"ce793f18efab10127f0626a3"7ff4b4"dtest,isStoppable:false,状态:“RUNNING”,开始时间:1551577191874,结束时间:-1,持续时间:295120489,现在:1551872312363,.....这里省略了n行。.....},{id:"cbc357ccb763df2852fee8c4fc7d55f2",parallelism:12,operator:"",operator_strategy:",description:"Source:CustomSource->FlatMap",optimizer_properties:{}}]}}九、更灵活的获取方式各个指标的请求连接有人可能会问,那么多指标,我是不是应该记住每个指标的请求的URL格式?今天教大家一个小技巧,一个前端技术,就是进入flink任务的UI界面,按住F12进入开发者模式,然后我们点击任意一个metric指标,马上就可以查看每个指标的请求URL。比如获取flink任务的backpressure:如下图,我们点击一??个task的status,按F12,然后看到backpressure,点击backpressure就可以获取task的backpressure。连接如下:http://127.0.0.1/proxy/application_12423523_133234/jobs/86eb310874aeccb37b58ae2892feced3/vertices/cbc357ccb763df2852fee8c4fc7d55f2/backpressure连接请求返回的json字符串如下:这很容易获得。10、案例:实时获取yarn上flink任务的运行状态我们使用flinkRESTAPI,通过http请求实时获取flink任务的状态。如果不在RUNNING状态,则调用或邮件报警,达到实时监控的效果。publicclassSendGet{publicstaticStringsendGet(Stringurl){Stringresult="";BufferedReaderin=null;try{StringurlNameString=url;URLrealUrl=newURL(urlNameString);//打开与URL的连接URLConnectionconnection=realUrl.openConnection();//设置通用连接。setRequestProperty("accept","*/*");connection.setRequestProperty("connection","Keep-Alive");connection.setRequestProperty("user-agent","Mozilla/4.0(compatible;MSIE6.0;WindowsNT5.1;SV1)");//建立实际连接connection.connect();in=newBufferedReader(newInputStreamReader(connection.getInputStream()));Stringline;while((line=in.readLine())!=null){result+=line;}}catch(Exceptione){System.out.println("发送GET请求时发生异常!"+e);e.printStackTrace();}//使用finally块关闭输入最后流{try{if(in!=null){in.close();}}catch(Exceptione2){e2.printStackTrace();}}returnresult;}publicstaticvoidmain(String[]args){Strings=sendGet(“http://127.0.0.1:5004/proxy/application_1231435364565_0350/工作");JSONObjectjsonObject=JSON.parseObject(s);Stringstring=jsonObject.getString("工作");Stringsubstring=string.substring(1,string.length()-1);JSONObjectjsonObject1=JSONObject.parseObject(substring);Stringstatus=jsonObject1.getString("status");System.out.println(status);}}结果