当前位置: 首页 > 科技观察

六张图告诉你Kafka是如何进行数据采集和统计的!

时间:2023-03-21 12:10:44 科技观察

大家好,我是君哥,在讲解Kafka的副本同步限流机制三部曲(源码)PartII(原理)之前,先讲解一下Kafka中的数据采集和统计机制。理解了这个机制,就会更容易理解限流。机制图好奇kafka监控中那些数据是怎么计算的吗?比如下图中LogiKM监控图中的指标,都是通过Jmx获取的kafka监控指标。今天我们来讨论一下。这些指标都是怎么计算出来的?在开始分析之前,我们可以自己想一想,如果让你统计前一分钟的流量,你会怎么统计才能让数字更准确?相信大家心里一定有一个词:kafka的数据采样和统计也用到了滑动窗口。该方法用于通过多个样本进行采样并组合统计。当然,这个过程对于滑动窗口的影子收集和统计是必不可少的。统计类图我们先来看一下整个Kafka数据采集和统计机制的类图。统计接口。这个接口有专门用来计算需要统计的值的方法/***measurethisquantity并返回结果为double*Parameters:*config-这个指标的配置*now-POSIXtimetomeasure(以毫秒为单位)*返回:*测量值*/doublemeasure(MetricConfigconfig,longnow);比如将近一分钟的returnbytesInStat:记录数据,上面是统计,但是统计需要数据来支持,这个接口是用来记录的,这个接口有一个方法/***recordgivenvalue*Parameters:*config-用于此指标的配置*value-要记录的值*timeMs-此值出现时的POSIX时间(以毫秒为单位)*/voidrecord(MetricConfigconfig,doublevalue,longtimeMs);有了这两个接口,基本上就可以记录数据和统计了。当然,这两个接口都有一个MetricConfig对象MetricConfig,它是一个统计配置类,主要定义了采样的样本数,单个样本的时间窗口大小,单个样本的事件窗口大小,以及限流机制。通过这样的配置,可以自由定义时间窗口的大小和采样的样本数。影响最终数据精度的类变量。这里我需要重点关注两个参数的单个样本的时间窗口大小:当前记录时间-当前样本的开始时间>=这个值,需要使用下一个样本。单个样本的事件窗口大小:当前样本窗口出现的次数>=这个值,整个统计需要用到下一个样本,不一定按照时间窗口,也可以按照事件窗口,可以根据不同的需求来选择配置完成后,大家心中已经有了最基本的概念。接下来,我们将使用Kafka内部经常使用的SampledStat记录和统计的一个抽象类来深入分析和理解。SampledStat样本记录统计抽象类该记录统计抽象类以采样的形式进行计算。它使用一个或多个样本进行采样和统计List样本;currentlyusedsamples:当前样本初始化值:initialValueSampledStat:实现了MeasurableStat的抽象类,表示可以采集和记录数据,也可以对数据进行统计分析当然,它自己也定义了两个抽象方法/**更新值特定样本的(单个样本)**/protectedabstractvoidupdate(Samplesample,MetricConfigconfig,doublevalue,longtimeMs);/**CombineallsamplesThedatatocountthedesireddata**/publicabstractdoublecombine(List样本,MetricConfig配置,现在很长);SampledStat的图形显示如上图所示,是SampledStat的图形显示,定义了SeveralsamplesSamplerecorddata@Overridepublicvoidrecord(MetricConfigconfig,doublevalue,longtimeMs){Samplesample=current(timeMs);如果(sample.isComplete(timeMs,config))sample=advance(config,timeMs);更新(示例、配置、值、时间);样本.eventCount+=1;}获取当前Sample编号,如果没有则创建一个新Sample,创建时设置初始化值和Sample开始时间(当前时间),并保存在样本列表中判断Sample是否完成(超出窗口期),判断逻辑为当前时间-当前Sample的开始时间>=配置的时间窗口值或事件总数>=配置的事件窗口值/**currentTime-当前Sample的开始时间>=配置的时间窗口值或事件总数>=配置的事件窗口值**/publicbooleanisComplete(longtimeMs,MetricConfigconfig){returntimeMs-lastWindowMs>=config.timeWindowMs()||eventCount>=config.eventWindow();}3.如果这个Sample已经完成(超出窗口期),则开始选择下一个窗口,如果下一个还没有创建,则创建一个新的,如果下一个已经存在,则重新设置这个Sample4。得到最终要用到的Sample之后,记录数据这个Sample中如何记录,需要具体的实现类来实现,因为最终的统计数据可以不一样。比如你只想记录Sample中的最大值,那么更新的时候判断是否比之前的值大,如果大就更新。如果要统计平均值,那就让单个Sample中的所有值累加(最后除以Sample个数求平均值)5.记录事件个数+1。显示记录数据的图表统计/**测量统计**/@Overridepublicdoublemeasure(MetricConfigconfig,longnow){//重置过期样本purgeObsoleteSamples(config,now);//合并所有样本数据,并展示最终的统计数据,实现方法returncombine(this.samples,config,now);}首先重置过期样本,过期样本的意思是:当前时间-每个样本的开始事件>样本数*每个样本的窗口时间;是滑动窗口的概念,只统计这个滑动窗口的样本数据,过期的样本数据会被重置(过期的数据不被接受),如下图,滑动窗口重置所有过期的数据组合对不同维度的数据进行采样和统计,并返回值。因为在不同的场景下你要获取的数据是不一样的,这只是一个抽象的方法。你需要实现一个类来实现这个计算逻辑。比如,如果是计算平均值Avg,它的计算逻辑是将所有的样本数据值累加起来,除以累加的次数。我们来看看不同的统计实现类Avg计算平均值。一个简单的SampledStat实现类,统计所有样本的最终平均值。每次都会累积每个样本。记录数值,最后叠加所有样本数据/记录总数这里插入图片描述Max计算最大值每个样本保存本次样本的最大值,然后比较所有样本值的最大值这里插入图片说明WindowedSum所有样本窗口的总值累加每个样本的记录值,计数时将所有样本的累加值累加返回这里。插入一张图片说明RateRate样本记录统计计算RateRate也实现了MeasurableStat接口,说明它也有记录记录和统计度量的方法。其实这个类是一个组合类,结合了SampledStat和TimeUnit单元。这不是很明显吗?SampledStat负责记录和统计,将获取到的数据与TimeUnit进行处理,得到率例如SampledStat的实现类AVG可以计算出统计评价值,但是如果除以一个时间维度,是否可以得到平均率如何计算统计的有效时间?这个有效时间的计算会影响到最终rate的结果publiclongwindowSize(MetricConfigconfig,longnow){//重置过期样本stat.purgeObsoleteSamples(config,now);//总运行时间=当前时间-最早样本的开始时间longtotalElapsedTimeMs=now-stat.oldest(now).lastWindowMs;//总时间/单次创建时间=全窗口次数intnumFullWindows=(int)(totalElapsedTimeMs/config.timeWindowMs());intminFullWindows=config.samples()-1;//如果可用窗口小于所需的最小值,则将差值添加到totalElapsedTimeif(numFullWindows,它没有方法,只是定义,当有另一个子接口是Gauge时,它不使用上面的采样形式来统计数据,它返回当前值,瞬时值它提供的方法是value(),而Measurable提供的是measure(),在Kafka中很少用到,就不详细介绍了。那么在这篇文章中,我们主要讲解了Kafka中的数据采集和统计机制。那么下一篇我们就来说说Kafka的监控机制,如何将收集到的信息保存起来提供给外界!!!