现象采集端采集数据后,输出系统发现无法查询系统数据。检查问题后发现是访问问题和治理问题:1.访问数据如果需要记录如果是上传,需要上传到mysql处理。由于网络问题,访问系统和mysql无法直接交互。中间用一个dataMaster做转发。但是在转发的时候,接入系统拉取了一批数据之后,比如100条,除了上报拉取的数据记录。还需要通过dataMaster转发每一条数据,然后保存到mysql中。期间,部分数据写入失败。2.治理系统治理系统数据一直在消费kafka主题数据,但是发现有数据积压:如下:然后在查看flink治理程序时,有背压:如下:说明算法处理不及时,导致数据处理不来。解决方法一、接入问题1、接入问题的数据问题,因为之前接入系统是直接写入kafka的(这种方式比较稳定);现在还是需要在Channels的topic下,把每个channel的访问数据写到subject:access_中。然后通过java程序消费topic数据,然后写入mysql数据仓库,写入一条数据后通过topic提交提交数据。2、统计一下,每天采集端有多少数据?那么接入端接入了多少条数据。做一个监控检查。2.治理问题已经和数据打通了,然后发现是算法反压问题。现在根据日志分析算法调用情况,做一个算法统计。k8s服务部署日志查询进入服务后,发现我们服务所在节点为:192.168.58.48。我们进入本机,找到对应的日志目录文件。进入以下目录:/var/log/containers到对应文件llxxx-governance-news-hct-k8s-kafka*使用grep生成对应文件linuxtee命令用于读取标准输入数据并输出其内容作为文件sudogrep"timeconsuming"/var/log/containers/gtcom-governance-news-hct-k8s-kafka-taskmanager-1-4_gtcom-governance_flink-task-manager-a69dacf7d3165f57ddbfc7db2fb90c3e6b73d741f1ea6a323f4be2b90log.go5有时我们执行文件,则当前目录下不会生成任何文件。原因是我们|teealgoTimes.log是另一个文件。这个文件在当前目录下是没有权限生成的,因为sudo只是给前面的grep加了权限。所以我们应该设置输出文件目录为:sudogrep"time-consuming"/var/log/containers/gtcom-governance-news-hct-k8s-kafka-taskmanager-1-4_gtcom-governance_flink-task-manager-a69dacf7d3165f57ddbfc7db2fb90c3e6b75d741f1ea6a290flog|tee/home/${user}/algoTimes.log如下所示运行上述文件:生成以下文件:-rw-rw-r--。处理文件:algo-perf-log.py详情如下:#!/usr/bin/envpython#coding:utf-8importsysimportrefrommathimportfloor__helper='''sudogrep"time-consuming"/var/日志/容器/gtcom-governance-news-hct-k8s-kafka-taskmanager-1-4_gtcom-governance_flink-task-manager-a69dacf7d3165f57ddbfc7db2fb90c3e6b73d741f1ea6a323f4be2b27b55a904.log|head{"log":"2021-10-3010:11:05,060[LegacySourceThread-Source:ReadKafkadata-\u003eStringconversiontoJSONObject-\u003eformatconversion-\u003eAssignKeyToData(18/21)#0]INFOcom.gtcom.governance.drivers.Processor-[messageId=150986]process[NewsGovernanceProcess]Stage[FormatConversion][DataGovernanceStart][NewsBeginProcessor]处理结果:[SUCCESS],耗时:[3]ms\n","stream":"stdout","time":"2021-10-30T02:11:05.06080345Z"}{"log":"2021-10-3010:11:05,063[旧源线程-来源:读取Kafka数据-\u003e字符串转换为JSONObject-\u003e格式转换-\u003e将密钥分配给数据(20/21)#0]INFOcom.gtcom.governance.drivers.Processor-[messageId=150965]Process[消息治理流程]阶段[格式转换][数据治理开始][NewsBeginProcessor]处理结果:[SUCCESS],耗时:[6]ms\n","stream":"stdout","time":"2021-10-30T02:11:05.06388483Z"}{"log":"2021-10-3010:11:05,257[LegacySourceThread-Source:ReadKafkadata-\u003eStringconvertedtoJSONObject-\u003eFormatConversion-\u003eAssignKeyToData(18/21)#0]INFOcom.gtcom.governance.drivers.Stage-[messageId=150986]流程[新闻治理流程]阶段[格式转换]总处理时间:[200]ms\n","stream":"stdout","time":"2021-10-30T02:11:05.257418164Z"}'''pattern1=re.compile(r'\[([^\]]+)\]处理结果:\[SUCCESS\],耗时:\[(\d+)\]ms')pattern2=re.compile(r'\[AlgorithmGovernance\](总处理时间):\[(\d+)\]ms')defpercentile(data):data.sort()N=len(data)百分比s=range(1,100)return','.join([str(data[int(floor(N*p/100))])forpinpercents])defget_data():d={}forline在sys.stdin中:m=pattern1.findall(line)或pattern2.findall(line)如果m:n,t=m[0]d.setdefault(n,[]).append(int(t))返回dif__name__=='__main__':rt_data=get_data()foralgoTypeinrt_data:print('%s,%d,%s'%(algoType,len(rt_data[algoType]),percentile(rt_data[algoType])))分析:(方法:get_data)1.forlineinsys.stdin:从标准数据流中读取每条数据2.m=pattern1.findall(line)orpattern2.findall(line):根据匹配每条数据要规则。3.d.setdefault(n,[]).append(int(t)):设置d的每条数据,设置d。分析:(方法:百分位数)1.计算数据的百分比。统计结果验证1.生产数据运行我们通过python生成的文本数据。猫algoTimes.log|pythonalgo-perf-log.py如下:输出如下信息:2.Excel统计新建一个csv文件:然后打开文本:打开后,输入内容数据。双击222.csv文件,用Excel打开:选择所有文件:生成的文件如下:然后去掉并汇总第二列数据:生成如下算法结果:因此可以判断效率NewsDataQualityProcessor的算法比较差。
