本文主要分享字节跳动使用FlinkState的实践经验。内容包括FlinkState相关实践和Byte内部的一些引擎优化,希望能给Flink用户的开发和调优提供一些参考。前言Flink作业需要借助State来完成聚合、Join等有状态计算任务,而State一直是作业调优的重点。目前,State和Checkpoint已经在字节跳动中得到广泛应用。在业务层面,State支持数据集成、实时数仓、特征计算、样本拼接等典型场景;作业类型支持Map-Only通道任务和ETL任务,窗口聚合计算的索引统计任务,多流Join等存储数据细节的数据拼接任务。以WordCount为例,假设我们需要统计Word在60秒窗口内出现的次数:selectword,TUMBLE_START(eventtime,INTERVAL'60'SECOND)ast,count(1)fromwords_streamgroupbyTUMBLE(eventtime,INTERVAL'60'SECOND),60swindow中的每一个word还没有被触发,每个word对应的出现次数就是FlinkState,每次window接收到新的数据,都会更新这个state,直到最终输出。为了防止作业失败和状态丢失,Flink引入了分布式快照Checkpoint的概念,周期性地将状态持久化到Hdfs中。如果jobfailover,会从上次成功的checkpoint恢复job状态(比如kafka的offset,windowstatistics等)。在不同的业务场景中,用户往往需要对State和Checkpoint机制进行调优,以保证任务执行的性能和Checkpoint的稳定性。在阅读下面的内容之前,我们可以回想一下我们在使用FlinkState时是否经常面临以下问题:当一个stateoperator出现处理瓶颈时,增加资源并不能提升性能。我不知道如何解决性能瓶颈。Checkpoint经常出现,执行效率慢,barrier对齐时间长,超时频繁。大作业的checkpoints生成的小文件太多,对在线HDFS造成压力。RocksDB参数太多。使用时,不知如何选择作业。恢复时间过长导致在线中断State和RocksDB相关概念State分类介绍由于OperatorState背后的StateBackend只有DefaultOperatorStateBackend,所以用户在使用时通常会指定FsStateBackend和RocksDBStateBackend两种。其实他们指定了KeyedState对应的StateBackend类型:FsStateBackend:DefaultOperatorStateBackend和HeapKeyedStateBackend的组合RocksDBStateBackend:DefaultOperatorStateBackend和RocksDBKeyedStateBackend的组合RocksDB简介RocksDB是一个嵌入式Key-Value数据库,在Flink中作为RocksDBStateBackend的底层存储.如下图所示,RocksDB持久化的SST文件在本地文件系统上被多级组织,重复的、过期的、删除的数据会通过异步Compaction在不同的级别之间进行合并。RocksDB在写入过程中,将数据序列化写入WriteBuffer。WriteBuffer写满后转为ImmutableMemtable结构,然后通过RocksDB的flush线程从内存刷新到磁盘;从WriteBuffer和ImmutableMemtable读取数据。如果没有找到,将查询BlockCache。如果内存中没有,则会分层查找底层SST文件,将返回结果所在的DataBlock加载到BlockCache中。返回上层应用。RocksDBKeyedStateBackend增量快照介绍这里介绍在大状态场景下经常需要调优的RocksDBKeyedStateBackend增量快照。RocksDB具有append-only特性。Flink利用这个特性将两个checkpoint之间的SST文件列表的差异作为状态增量上传到分布式文件系统,通过JobMaster中的SharedStateRegistry来注册和过期状态。如上图,Task拍了3张快照(假设job设置保留最新的2个Checkpoints):CP-1:RocksDB生成两个文件sst-1和sst-2,Task将文件上传到DFS,JM记录sst文件对应的引用计数CP-2:RocksDB中的sst-1和sst-2通过compaction生成sst-1、2,以及新生成的sst-3文件,Task上传两个新添加的文件到DFS,JM记录引用统计sst文件对应的CP-3:RocksDB中新生成sst-4文件,Task将增量sst-4文件上传到DFS,CP-3完成后,由于只保留最后2个CP,JobMasterExpireCP-1,将CP-1中sst文件对应的引用计数减1,删除引用计数归0的sst文件(sst-1和sst-2)。增量快照涉及Task多线程上传/下载增量文件的过程,JobMaster引用计数统计,大量与分布式交互文件系统比其他StateBackends更复杂。在100+GB甚至TB级别的状态下,作业很容易出现性能和稳定性瓶颈的问题。State实践经验提升State操作性能当用户使用State时,会发现操作State并不是一件“容易”的事情。如果使用FsStateBackend,会经常遇到GC问题,频繁调整参数等问题;如果使用RocksDBStateBackend,涉及到磁盘读写,对象序列化,在没有相关指标的情况下,不容易定位性能问题,或者面对RocksDB的大量参数,不知道如何调整到最佳状态。目前字节跳动140+个作业的状态大小已经达到TB级别,单个作业最大状态为60TB。在逐步支持大型状态作业的实践中,我们积累了一些状态调优经验,并对一些引擎进行Side-by-side修改,以支持更好的性能,降低作业调优成本。选择合适的StateBackend我们都知道FsStateBackend适合小状态的作业,RocksDBStateBackend适合大状态的作业,但是在实际选择FsStateBackend时会遇到以下问题:开发前,无法准确预估状态大小,or状态大小估计的复杂度很高。随着业务的增长,所谓的“小状态”迅速变成“大状态”,需要人工干预调整相同的状态大小。由于状态过期时间不同,使用FsStateBackend的GC压力也不同。针对FsStateBackend中的以上问题,可见FsStateBackend的维护成本还是比较高的。在Byte内部,我们暂时只推荐FsStateBackend用于一些总状态小于1GB的作业。对于短视频、直播、电商等大流量业务,我们更倾向于推荐用户使用RocksDBStateBackend,以减少未来的GC风险,更好的稳定性。随着内部硬件的更新迭代和ssd的推广,从长远来看,我们希望将StateBackend汇聚到RocksDBStateBackend,提高作业稳定性,降低用户运维成本;在性能方面,我们预计RocksDBStateBackend在小状态场景下与FsStateBackend相当接近甚至相当。观察性能指标,使用火焰图分析瓶颈Flink社区版使用RocksDBStateBackend时,如果遇到性能问题,基本上很难确定问题的原因。此时建议打开相关指示灯进行故障排除[1]。另外,在字节跳动内部,RocksDBStateBackend性能瓶颈的原因有很多。我们构建了一个比较完善的RocksDB指标体系,在Flink层面默认揭示了一些RocksDB的关键指标,并增加了State-relatedIndicators,部分指标如下:RocksDB性能瓶颈的常见原因如下:state大小ofasinglerecord过大,由于rocksdb的append-only特性,writebuffer很容易被填满,导致数据flushing和compaction频繁,抢占jobsCPUOperator内部的RocksDB容量过大。如果Operator所在的RocksDB实例大小超过15GB,我们会明显看到更频繁的compaction并导致RocksDB频繁出现writestall硬件问题。比如磁盘IO满了,从State的操作延迟可以看出,如果长时间停留在秒级别,说明硬件或者机器负载高。除了上述指标外,另一种可以使用的方法是火焰图。常用方法如使用阿里的arthas[2]。Flink和RocksDB的CPU开销会显示在火焰图中。示意图如下:如上图可以看出,火焰图中compaction开销占了非常大的比重。定位到compaction问题后,我们就可以利用ValueSize和RocksDBcapacity、jobparallelism和resources做进一步的分析。使用合理的RocksDB参数除了Flink提供的RocksDB参数[3],RocksDB还有很多调优参数可供用户使用。用户可以通过自定义RocksDBOptionsFactory[4]来调优RocksDB。经过一些内部实践,我们列出了两个比较有效的参数:关闭RocksDB的压缩(需要自定义RocksDBOptionsFactory):RocksDB默认使用snappy算法压缩数据,因为RocksDB的read和中有压缩相关的操作write,和Compaction,因此在CPU敏感的作业中,可以通过ColumnFamilyOptions.setCompressionType(CompressionType.NO_COMPRESSION)关闭压缩,为CPU使用磁盘空间容量,减少CPU消耗。开启RocksDB的bloom-filter(需要自定义RocksDBOptionsFactory):默认不使用RocksDBBloom-filter[5]。开启bloom-filter可以节省RocksDB的部分读取开销。其他对cache、writebuffer、flush/compaction线程的调整也可以在不同的场景下获得不同的好处,比如在写少读多的场景下,我们可以增加Cache来减少磁盘IO。这里,我们要注意一点。由于很多参数都是使用内存或者磁盘来换取性能的提升,所以以上参数的使用需要结合具体的性能瓶颈分析才能达到最好的效果,比如上面的火焰图中,可以很明显的看出snappy的压缩占用了大量的CPU开销。这时候可以试试压缩相关的参数。注意RocksDBStateBackend的序列化开销。使用RocksDBState的相关API,Key和Value都需要进行序列化和反序列化。如果Java对象比较复杂,用户没有自定义Serializer,那么它的序列化开销会比较高。大的。例如,对于去重操作中常用的RoaringBitmap,在序列化和反序列化时,MB级对象的序列化开销达到秒级,对作业性能是非常大的损失。因此,对于复杂对象,我们建议:业务尽量使用State中更精简的数据结构,去掉不需要存储的字段,在StateDescriptor中通过自定义Serializer减少序列化开销,显式注册PB/ThriftSerializer到KryoSerializer[6]]减少State操作的次数,比如下面的示例代码,如果使用FsStateBackend,性能损失不大;但在RocksDBStateBackend上,userKey由于两个State操作而有额外的序列化开销,如果userKey本身If(mapState.contains(userKey)){UVuserValue=mapState.get(userKey);}有关序列化性能和指导的更多信息,请参考社区调优文档[7]。搭建一个RocksDBStatecache上文提到,RocksDB的序列化开销可能比较大。字节跳动内部在StateBackend和Operator之间构建了一个StateBackendCacheLayer,负责在Operator内部缓存热点数据,并根据GC情况容量动态扩缩容,对于有热点的作业好处明显。同样,对于用户来说,如果工作热点比较明显,可以尝试在内存中建立一个简单的Java对象缓存,但是需要注意以下几点:控制缓存的阈值,防止缓存的对象过多。对GC造成太大的压力。缓存中的状态TTL逻辑处理,防止脏读,减少checkpoint耗时Checkpoint持续时间与很多因素有关,比如作业背压,资源是否充足等,这里我们从角度看如何提高checkpoint的成功率StateBackend率。一个Task级快照可以分为以下几个步骤:等待checkpointLock:在SourceTask中,触发Checkpoint的Rpc线程需要等待Task线程完成当前数据处理,然后释放checkpointLock,然后再触发检查站。这一步的耗时主要取决于用户的处理逻辑和每条数据的处理延迟。这一步的耗时主要是bufferqueue中barriers的排队时间同步阶段:在StateBackend上执行自定义快照方法和元数据快照,例如FsStateBackend会在内存中做一个state结构的浅拷贝在同步阶段。异步阶段:上传状态数据或文件到DFS字节跳动。这四个步骤也搭建了相关的监控板:在生产环境中,“等待checkpointLock”和“同步阶段”在业务逻辑上是比较耗时的,通常耗时比较短;从StateBackend层??面来说,一般我们可以通过优化“收集障碍”和“异步阶段”两个阶段来减少checkpoint时间。ReduceBarrieralignmenttime减少Barrier对齐时间的核心是减少in-flightBuffer的总大小。即使使用了社区的UnalignedCheckpoint特性,如果in-flightBuffer数量过多,最终写入分布式存储的状态也会过大,有时in-flightBuffersize甚至会超过size状态本身,这将对异步阶段的耗时产生负面影响。减少通道中的Buffer数量:Flink1.11版本支持限制数据倾斜环境下单个通道的最大Buffer数量,可以通过taskmanager.network.memory.max-buffers-per-channel参数调整为减小单个Buffer的大小:如果单个数据的大小在KB级别以下。我们可以通过减小taskmanager.memory.segment-size来减小单个Buffer的大小,从而减少Barrier的排队时间,结合业务场景降低DFS的压力。如果在你的集群中,所有的Flink作业都使用在同一个DFS集群中,当业务增长到一定程度后,DFS的IO压力和吞吐量将成为“异步阶段”非常重要的参考指标。特别是RocksDBStateBackend的增量快照,每个Operator产生的状态文件都会上传到DFS,上传文件的数量与并行度和作业状态的大小成正比。在Flink中并行度较高的作业中,由于各个任务的快照基本同时发生,因此DFS的写请求数往往在几分钟内可达数千甚至数万。合理设置state.backend.fs.memory-threshold,减少DFS文件数量:该参数表示生成DFS文件的最小阈值。小于该阈值的状态将以??byte[]的形式封装在RPC请求中发送给JobMaster并持久化到_metadata中)。对于Map-Only任务,状态通常存储元信息相关的内容(比如Kafka的消费位移),状态比较小。我们可以通过增加这个参数来避免将这些状态放在磁盘上。在Flink1.11版本之前,state.backend.fs.memory-threshold的默认阈值1kb比较小,容易造成每个并行度上传自己的状态文件,上传文件的数量与并行度成正比并行性。我们可以根据业务场景调整这个参数,将DFS请求次数从N(N=parallelism)优化为1次。这里需要注意的是,如果阈值设置过高(MB级别),可能会导致_metadata过大,从而增加JobMaster恢复Checkpoint元信息和部署Task时的GC压力,导致JobMaster频繁FullGC。合理设置state.backend.rocksdb.checkpoint.transfer.thread.num线程数,降低DFS压力:该参数表示做快照时上传和恢复快照时下载RocksDB状态文件的线程数。在大状态的情况下,为了提高Checkpoint的效率,用户可能会将线程数设置的比较大,比如10个以上。这种情况下,快照创建和快照恢复都会带来非常大的DFS的瞬时压力。尤其是HDFSNameNode,很可能会瞬间填满NameNode的请求资源,影响其他正在执行的作业。增加state.backend.rocksdb.writebuffer.size:该参数表示RocksDB刷盘前存储在内存中的数据大小。如果job的吞吐量比较大,Update比较频繁,导致RocksDB目录下的文件过多,可以通过增加这个参数,通过增加文件大小来一定程度上减少上传文件的数量,以及可以减少DFSIO的数量。合并RocksDBKeyedStateBackend上传的文件(FLINK-11937)。在社区版本的增量快照中,RocksDB新生成的每个SST文件都需要上传到DFS。以HDFS为例,HDFS默认的块大小通常为100+MB(wordInternalsectionjitter为512MB),而RocksDB生成的文件通常在100MB以下。对于小数据任务甚至KB级别的文件大小,Checkpoint会产生大量频繁的小文件请求。对于HDFS元数据管理和NameNode访问可能会非常有压力。社区在FLINK-11937中提出了合并上传小文件的想法。同样,在bytes的内部实现中,我们将合并小文件的逻辑抽象成一个Strategy,这样我们就可以和其他因素一起实现一个符合自己业务场景的上传策略。提高StateBackend的恢复速度除了State性能和DFS瓶颈外,StateBackend的恢复速度也是实际生产过程中需要考虑的一个非常重要的点。在生产过程中,我们会发现,由于一些参数的设置不合理,改变作业配置和并发度,会导致作业重启从快照恢复时性能特别差,恢复时间可以只要十分钟以上。谨慎使用UnionState。UnionState的特点是当作业恢复时,每个并行度的恢复状态是所有并行度状态的并集。这种特性导致在JobMaster的状态分配和TaskManager的状态恢复中UnionState都比较重:JobMaster需要完成一次NN遍历,将每个并行度的状态赋值给所有并行度状态的并集。(其实HashMap可以用来优化遍历到N1的复杂度[8])TaskManager需要读取fullUnionState的状态文件。比如恢复1000个并行度的job时,恢复每个并行度的UnionState。每个状态都需要读取1000个parallelismOperator生成的状态文件,效率很低。(我们内部的优化是在JobMaster端将UnionState的状态聚合成一个文件,这样TaskManager在恢复时只需要读取一个文件。)在实际使用UnionState时,除了恢复速度慢的问题外,如果使用不当也会对DFS造成很大的压力,所以建议避免在高并行作业中使用UnionState,以减少额外的运维负担。Incrementalsnapshotvsfullsnapshot恢复RocksDBStateBackend中支持的增量快照和fullsnapshot(或savepoint)。这两个快照之间的差异导致它们在不同场景下的恢复速度不同。增量快照是将RocksDB的底层增量SST文件上传到DFS;而fullsnapshot是遍历RocksDB实例的Key-Value,写入DFS。场景由是扩展还是收缩来定义。两种快照的恢复速度如下:非扩缩容场景:增量快照的恢复只需要拉取SST文件到本地即可完成RocksDB*(多线程)全卷的初始化恢复快照需要遍历属于当前Subtask的KeyGroupRange下的所有键值对,写入本地磁盘,完成RocksDB(单线程)扩缩容场景的初始化:增量快照的恢复涉及到多组RocksDB数据合并,涉及下载多组RocksDB文件,写入同一个RocksDB产生大量compaction。在compaction过程中,会发生严重的写放大。全量快照的恢复与上述非扩缩容场景(单线程)一致。这个比较麻烦。最重要的一点是在扩张和收缩恢复时更容易遇到长尾问题。由于单个并行状态,整体恢复时间延长。目前社区版还没有彻底的解决办法。我们还针对大型作业的状态进行了恢复速度优化。这里根据社区支持的功能,给出一些加快扩缩容场景下恢复速度的建议:扩缩容时尽量从savepoint恢复,可以避免合并产生的增量快照Compaction开销多组任务的RocksDB实例。调整RocksDB相关参数,增加WriteBuffer大小和Flush/Compaction线程数,增强RocksDB批量flush数据的能力。总结在本文中,我们介绍了State和RocksDB的相关概念,并针对字节跳动在State应用中遇到的问题,给出了相关的实践建议。希望阅读本文后,您对FlinkState在日常开发工作中的应用有了更深入的认识和认识。
