本文转载自微信公众号《大数据左右手》,作者左手右手。转载本文请联系大数据右手公众号。理解背压什么是背压在一个流处理系统中,如果下游消费的速度跟不上上游生产数据的速度,这种现象称为背压(backpressure)。本文主要使用Flink作为流式计算框架来简化背压机制。为了更好的理解,只做简单的分享。产生背压的原因是下游消费的速度跟不上上游生产数据的速度。可能的原因有:(1)节点存在性能瓶颈。可能是节点所在机器出现网络、磁盘等故障,以及机器网络Latency和磁盘不足、频繁GC、数据热点等原因。(2)数据源产生数据速度过快,计算框架没有及时处理。比如消息中间件kafka,producer生产数据速度太快,下游flink消费计算不及时。(3)flink算子之间并行度不同,下游算子相对于上游算子太小。背压的影响首先,背压不会直接导致系统崩溃,而是处于一种不健康的运行状态。(1)背压会导致流处理作业的数据延迟增加。(2)Checkpoint受到影响,导致失败,导致状态数据无法保存。如果上游是Kafka数据源,在一致性要求下,可以不提交offset。原理:由于Flink的Checkpoint机制需要Barrier对齐,如果此时某个Task存在反压,Barrier的流动速度会变慢,导致整体Checkpoint时间变长。如果背压严重,也可能导致CheckpointTimeoutfailed。(3)state的大小受到影响,或者因为checkpointbarrier对齐要求。导致状态变大。原理:接收到较快的输入管道的屏障后,其后面的数据将被缓存但不进行处理,直到较慢的输入管道的屏障也到达。这些缓存的数据会被放到状态中,导致状态变大。如何查找和定位背压(1)在网页上发现fink的checkpoint生成超时失败。(2)查看jobmanager日志2021-10-1719:43:19,235org.apache.flink.runtime.checkpoint.CheckpointCoordinator-CheckpointCoordinator-Checkpoint236663ofjobd521558603f6ef25dfd053c665d6afbeexpiredbeforecompleting(3)直接在BackPressure界面可以看到。背压状态可以大致锁定可能存在背压的算子。但具体的背压是当前任务本身处理速度慢还是下游任务处理慢导致的,还需要通过metric监控进一步判断。原理:BackPressure接口会周期性采样Task线程栈信息,通过请求Buffer中线程被阻塞的频率来判断节点是否处于背压状态。计算缓冲区阻塞线程数占线程总数的比率。其中rate<0.1为OK,0.1<=rate<=0.5为LOW,rate>0.5为HIGH。(4)指标监控背压。buffer中的数据无法处理,barrier流转慢,导致checkpoint生成时间长,出现超时现象。输入和输出缓冲区都已满。outPoolUsage和inPoolUsage指标说明outPoolUsage发送端Buffer使用情况inPoolUsage接收端Buffer使用情况该指标可能出现在以下几种情况:(1)outPoolUsage和inPoolUsage都很低,说明当前Subtask正常。(2)outPoolUsage和inPoolUsage都很高,代表当前Subtask的下游背压。(3)outPoolUsage高通常受下游Task的影响。(4)如果inPoolUsage高,可能是背压的来源。因为通常背压会传递到它的上游,导致一些上游的Subtasks的outPoolUsage偏高。inputFloatingBuffersUsage和inputExclusiveBuffersUsage指标说明inputFloatingBuffersUsage每个Operator实例对应一个FloatingBuffers,inputFloatingBuffersUsage表示该Operator对应的FloatingBuffers的使用情况。inputExclusiveBuffersUsage每个Operator实例的每个远程输入通道(RemoteInputChannel)都有自己的一套独占缓冲区(ExclusiveBuffer),inputExclusiveBuffersUsage表示ExclusiveBuffer的使用率。可能会出现以下指标:(1)floatingBuffersUsage高,说明正在向上游传输背压。(2)如果floatingBuffersUsage高,exclusiveBuffersUsage低,说明背压可能倾斜。背压原理是基于Credit-basedFlowControl的背压机制Credit的反馈策略,保证每次上游发送的数据都是下游InputChannel能够承受的数据量。具体原理如下:(1)上游SubTask向下游SubTask发送数据时,会将Buffer中待发送的数据和上游ResultSubPartition累计的数据量(Backlogsize)发送给下游。下游从上游收到Backlog大小后,会将当前的Credit值反馈给上游。Credit值表示下游可以从上游接收Buffer的数量。1个缓冲区相当于1个学分。上游收到下游反馈的Credit值后,下次上游只会向下游发送最多的Credit数据,保证Socket层不会有数据积压。(2)当下游SubTask背压严重时,可能会向上游反馈ChannelCredit=0。此时上游知道下游对应的InputChannel没有可用空间,所以不会向下游发送数据。(3)上游会周期性的向下游发送检测信号,检测下游返回的Credit是否大于0,当下游返回的Credit大于0时,说明下游有可用的Buffer空间,而上游可以开始向下游发送数据。atlas进程的上述流程(1)上游的SubTaska发送完数据后,还有4个Buffer积压,那么发送的数据和Backlogsize=4会被发送给下游的SubTaskb。(2)下游收到数据后,知道上游积压了4个Buffer,于是向BufferPool申请Buffers。由于容量有限,下游InputChannel目前只有2个Buffer。(3)SubTaskb将ChannelCredit=2反馈给上游的SubTaska。那么上游下次最多只向下游发送2个Buffer的数据,这样上游每次发送的数据就是下游InputChannel的Buffer能够承受的数据量。建议参考官网[https://flink.apache.org/2019/07/23/flink-network-stack-2.html]自行了解老版本基于TCP的反压机制,此处不再赘述。解决背压Flink不需要专门的机制来处理背压,因为Flink中的数据传输已经提供了处理背压的机制。所以只能从代码和资源上做一些调整。(1)造成背压的部分原因可能是数据倾斜,我们可以通过WebUI中各个SubTask的索引值来确认。Checkpointdetail中不同SubTasks的Statesize也是分析数据倾斜的有用指标。解决方案是预先聚合数据组的key,消除数据倾斜。(2)代码执行效率问题,阻塞或性能问题。(3)TaskManager的内存大小导致背压。
