当前位置: 首页 > 科技观察

ApacheFlink漫谈系列(05)-容错

时间:2023-03-20 02:06:33 科技观察

实践问题在流计算场景中,数据会源源不断地流入ApacheFlink系统,每条数据进入ApacheFlink系统都会触发计算。那么在计算过程中,如果由于网络、机器等原因导致任务失败,ApacheFlink会如何处理呢?在《Apache Flink 漫谈系列 - State》一文中,我们介绍了ApacheFlink会使用State来记录计算的状态。状态恢复。但是State的内容是怎么记录的呢?ApacheFlink如何保证Exactly-Once语义?这就涉及到ApacheFlink的FaultTolerance机制。本文将为您介绍相关内容。什么是容错容错(FaultTolerance)是指容错,当发生故障时能够自动检测到,并使系统自动恢复正常运行。当出现某些特定的网络故障、硬件故障、软件错误时,系统仍能执行一组特定的程序,或程序不会因系统故障而终止,执行结果不包括由系统故障引起的故障系统故障。错误。传统数据库容错我们知道MySQL的binlog是一个AppendOnly的日志文件,MySQL的主备复制是高可用的主要方式,而binlog是主备复制的核心手段(当然MySQL的高可用细节是非常复杂,有很多不同的优化点,比如纯异步复制优化到半同步和同步复制,保证异步复制binlog导致主从同步时断网,导致主从同步不一致主从等)。MySQL主备复制是MySQL容错机制的一部分,也包括事务控制。在传统数据库中,事务可以设置在不同的事务级别,以保证不同的数据质量。级别从低到高如下:Readuncommitted-Readuncommitted表示一个事务可以读取另一个未提交事务的数据。那么这种东西控制了成本***,但是会引起另外一个东西读脏数据,那么读脏数据的问题怎么解决呢?使用Readcommitted级别...Readcommitted-readcommitted,即一个事务要等到另一个事务commit后才能读取数据。这一关可以解决读取脏数据的问题,那么这一关有什么问题呢?这个关卡还有一个不能重复读取的问题,就是:打开一个读东西T1,先读到字段F1的值是V1,这时候另一个东西T2可以UPDATA这个字段值V2,导致T1获取到V2的时候再次读取字段值,同一事物中两次读取不一致。那么如何解决不可重复读的问题呢?使用Repeatableread级别...Repeatableread-Repeatableread是指当数据被读取(事务开始)后,不再允许进行修改操作。重复阅读模式需要等待事物的顺序,需要一定的代价才能获得高质量的数据信息。那么反复阅读有没有问题呢?是的,重复阅读水平还有一个问题,就是幻读。幻读的原因是INSERT,那么怎么解决幻读呢?使用可序列化级别...可序列化-序列化是最好的事务隔离级别。在这个层次上,事务是串行顺序执行的,可以避免脏读、不可重复读和幻读。但是这种事务隔离级别效率低,对数据库性能消耗较大,所以一般不使用。主从复制和事务控制是传统数据库的容错机制。流计算容错的挑战流计算容错的一大挑战是低延迟。许多ApacheFlink任务是7x24小时不间断的,具有端到端的秒级延迟。当发生断线等意外问题时,要迅速恢复正常,又不影响计算结果的正确性,是极其困难的。同时,除了流计算的低延迟需求外,在计算模式上也存在挑战。ApacheFlink支持Exactly-Once和At-Least-Once两种计算模式。如何在故障转移时避免重复计算,进而准确实现Exactly-Once,也是流计算容错要解决的关键问题。ApacheFlink的容错机制ApacheFlink的容错机制的核心是不断创建分布式流数据及其状态的快照。这些快照在系统出现故障时用作回退点。ApacheFlink中创建快照的机制称为Checkpointing。Checkpointing的理论基础由Stephan在LightweightAsynchronousSnapshotsforDistributedDataflows中详细描述。该机制源自K.MANICHANDY和LESLIELAMPORT发表的Determining-Global-States-of-a-Distributed-SystemPaper,描述了如何解决分布式系统中的全局状态一致性问题。在ApacheFlink中,检查点机制用于容错。Checkpointing会生成一个类似于binlog的数据文件,可以用来恢复任务状态。ApacheFlink也有类似数据库事务控制的数据计算语义控制,比如:At-Least-Once和Exactly-Once。Checkpointing的算法逻辑我们说过,Checkpointing是ApacheFlink中FaultTolerance的核心机制。我们使用Checkpointing创建有状态操作符的快照,包括计时器、连接器、窗口和用户定义的状态。在Determining-Global-States-of-a-Distributed-System的全局状态一致性算法中,重点关注了全局状态的对齐问题,对齐方法在LightweightAsynchronousSnapshotsforDistributedDataflows的核心中有描述,在ApacheFlink中采用的是DAG中的异步快照,通过在流信息中插入屏障来完成。下图(来自LightweightAsynchronousSnapshotsforDistributedDataflows)描述了Asynchronousbarriersnapshotsforacyclicgraphs,这也是ApacheFlink中使用的方法。上图描述了一个增量计算字数的Job逻辑。核心逻辑如下:barrier由源节点发送;屏障会将流上的事件划分到不同的检查点;汇聚到当前节点的多流barrier必须对齐;barrier对齐后,会进行Checkpointing生成快照;快照完成后,barrier会向下游发送,一直持续到Sink节点。这样在整个流计算中以barrier的形式进行checkpointing。随着时间的推移,在整个流计算过程中,会按照时间顺序不断进行Checkpointing,如下图:生成的快照会存储在StateBackend中,相关State的介绍可以查到《Apache Flink 漫谈系列 - State》。这样,在执行Failover时,从上次成功的checkpoint恢复。在Checkpointing的控制中,我们了解到,随着时间的推移,我们会不断对整个流做Checkpointing,不断的产生快照存储在Statebackend中,那么Checkpointing多久做一次呢?如何持久化生成的快照?带着这些问题,我们来看看ApacheFlink是如何控制Checkpointing的?可配置的参数有哪些:(这些参数定义在CheckpointCoordinator中)checkpointMode——检查点模式,分为两种模式:AT_LEAST_ONCE和EXACTLY_ONCE;checkpointInterval-检查点时间间隔,单位为毫秒;checkpointTimeout-检查点超时时间,单位为毫秒。ApacheFlink中还有一些其他的配置,比如:是否删除存储在外部存储中的checkpoints数据,如果不删除,即使job被取消,checkpoint信息也不会被删除,checkpoint可以用于恢复作业时的状态。我们有两种配置方法,如下:ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION-当作业被取消时,外部存储的检查点不会被删除;ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION-当作业被取消时,外部存储的检查点将被删除。ApacheFlink是如何实现Exactly-once通过以上内容,我们了解到ApacheFlink中的Exactly-Once和At-Least-Once只是执行checkpointing时的配置方式。两种模式下checkpointing的原理是一样的,那么在实现上有什么本质区别呢?1.SemanticsAt-Least-Once-语义是流上的所有数据都至少被处理过一次(不丢失数据)Exactly-Once-语义是流上的所有数据都必须被处理并且只能被处理一次(不丢失数据,不重复)从语义上看,Exactly-Once对数据处理的要求比At-Least-Once更严格、更精确,所以更高的要求意味着更高的成本,这里的成本就是延迟。2.实现上面ApacheFlink中At-Least-Once和Exactly-Once有什么区别?差异体现在多通道输入(如Join)上。当所有输入障碍还没有完全到达时,它们应该提前到达。在Exactly-Once模式下事件会被缓存(不处理),而在At-Least-Once模式下即使所有的输入障碍还没有完全到达,earlyevent也会被处理。也就是说,在At-Least-Once模式下,对于下游节点来说,原本属于CheckpointN的数据可能已经在CheckpointN-1处理过了。下面我以Exactly-Once为例来解释为什么Exactly-Once模式比At-Least-Once模式有更高的延迟?屏障对齐阶段从上游发送的第一个屏障开始;在对齐期间,先到达的输入数据将缓存在缓冲区中;当Operator接收到所有上游barriers时,当前Operator会执行Checkpointing并生成快照并持久化;当Checkpointing完成后,barrier将被广播给下游的Operator。当多通道输入barrier不对齐时,先到达的barrier的输入数据会缓存在buffer中,不做处理。这样对于下游来说,buffer中的数据越多,延迟就越大。这种延迟的好处是相邻Checkpointing记录的数据(计算结果或事件)不会重复。与At-Least-Once模式相比,数据不会被缓冲,减少延迟的好处是以容忍数据重复为代价的。在ApacheFlink的代码实现中,使用了CheckpointBarrierHandler类来处理barrier。它的核心接口是:publicinterfaceCheckpointBarrierHandler{...//返回算子消费的下一个BufferOrEvent。此调用将导致阻塞,直到下一个BufferOrEventBufferOrEventgetNextNonBlocked()throwsException;...}其中BufferOrEvent可能是正常数据事件或特殊事件,例如屏障事件。对应At-Least-Once和Exactly-Once有两种不同的实现方式,如下:inputs都收到一个基于某个checkpoint的barrier,也就是上面说的alignment。为避免对输入流产生背压,BarrierBuffer将继续从阻塞的通道接收缓冲区并将其存储在内部,直到阻塞被移除。BarrierBuffer实现了CheckpointBarrierHandler的getNextNonBlocked,用于获取下一条要处理的记录。此方法会阻止调用,直到获取下一条记录。这里的记录包括两种,一种是来自上游没有标记为阻塞的输入,比如上图中的event(a);另一个是在blockedinputRecord中从bufferqueue中释放,比如上图中的event(1,2,3,4)。At-Least-Once模式-BarrierTrackerBarrierTracker将跟踪每个输入接收到的检查点的障碍。一旦它观察到一个检查点的所有障碍都已经达到,它会通知监听器检查点完成,触发相应的回调处理。与BarrierBuffer的处理逻辑不同,BarrierTracker不会阻塞已经发送到barrier的输入,也就是说没有使用对齐机制,所以这个checkpoint的数据会被及时处理,因此下一个checkpoint的数据checkpoint可能在checkpoint完成之前就已经到了。这只能在恢复期间提供At-Least-Once语义保证。BarrierTracker还实现了CheckpointBarrierHandler的getNextNonBlocked,用于获取下一条要处理的记录。与BarrierBuffer相比,它的实现非常简单,只是阻塞获取要处理的事件。以上两种CheckpointBarrierHandler实现的核心区别在于BarrierBuffer会维护多路输入是否应该被阻塞,并缓存被阻塞输入的记录。所谓得必失,失必得,舍得舍得,这里也略有体现:)。在《Apache Flink 漫谈系列 - State》中,我们已经介绍了ApacheFlink中State存储的内容。例如,连接器将使用OperatorState来记录读取位置的偏移量。那么一个完整的ApacheFlink任务的执行图就是一个DAG,上面我们描述了DAG中一个节点的流程,那么Checkpointing整体的流程是怎样的呢?生成检查点并分发到HDFS的过程是怎样的?1.整体Checkpointing流程。看到一个完整的ApacheFlinkJobcheckpointing过程,JM触发Soruce启动barriers。当Operator收到上游发来的barrier后,就开始处理barrier。整体的Checkpointing是按照DAG从上到下逐个节点进行的。并持久化到Statebackend,一直到DAG的sink节点。2.增量Checkpointing对于一个流计算任务,数据会源源不断的流入,比如双流Join(ApacheFlink漫谈系列-Join会详细介绍),因为两边流事件的到达存在顺序问题,我们必须将左右数据存入state,Left事件流入会加入RightState的数据,Right事件流入会加入LeftState的数据,如下图所示,数据左右两边会持久化到StateMiddle:由于stream上数据不断流动,每个checkpoint产生的snapshot文件(RocksDB的sst文件)会随着时间的推移变得非常大,增加网络IO并延长checkpoint时间,最终导致checkpoint无法完成。这进而导致ApacheFlink失去了故障转移的能力。为了解决检查点增加的问题,ApacheFlink内部实现了IncrementalCheckpointing。这种增量的checkpoint机制会大大减少checkpoint的时间,如果业务数据稳定,每个checkpoint的时间也比较稳定。根据不同的业务需求设置Checkpoint间隔,稳定快速的进行Checkpointing,保证ApacheFlink任务在遇到故障时能够平滑的进行Failover。IncrementalCheckpointing的优化对于ApacheFlink的数百个任务节点带来的好处是不言而喻的。End-to-endexactly-once根据上面的介绍,我们知道ApacheFlink内部是支持Exactly-Once语义的。ApacheFlink要实现端到端(SorucetoSink)Exactly-Once,需要外部Soruce和Sink的支持,具体如下:恰好结束一次。比如我们上面提到ApacheFlink的checkpointing机制会记录源节点上的读取位置,所以需要外部源。Source提供读取数据的Position,支持基于Position读取数据。ExternalSinks的容错需求:ApacheFlink实现End-to-EndExactly-Once相对困难。以Kafka作为sink为例,当sink算子节点宕机时,根据ApacheFlink内部Exactly-Once模式的容错保证,系统会回滚到上一次成功的checkpoint继续写入,但是在上次成功的checkpoint之后,在当前checkpoint完成之前,一些新的数据已经写入kafka。ApacheFlink从上次成功的checkpoint开始继续向kafka写入数据,这导致Kafka再次从SinkOperator接收相同的数据,进而破坏了End-to-End的Exactly-Once语义(重复写入变成了At-Least-曾经),如果要解决这个One问题,ApacheFlink使用TwoPhaseCommit(两阶段提交)来处理。本质上,SinkOperator需要感知整体Checkpoint的完成,并在整体Checkpoint完成后将计算结果写入Kafka。小结本文为大家介绍了ApacheFlink的容错(FaultTolerance)机制。这篇文章的内容将与?一起阅读。相信大家对ApacheFlink的State和FaultTolerance会有更深入的了解,后面也会介绍window。,撤回将非常有帮助。#关于点赞和评论本系列文章难免有很多瑕疵和不足。真诚希望读者对有收获的章节给予表扬和鼓励,对不足的章节给予反馈和建议。先感谢您!作者:孙金城,花名金珠,目前就职于阿里巴巴,自2015年开始投入设计开发基于ApacheFlink的阿里巴巴计算平台Blink。【本文为专栏作家“金珠”原创稿件,转载请联系原作者】点此阅读该作者更多好文