本文转载请联系五分钟学大数据公众号。Flink中有三个位置需要端到端精准一次性处理:消息的时间消耗。Flink内部端:这个我们已经理解了。Checkpoint机制用于将状态保存到磁盘,发生故障时可以恢复,保证内部状态的一致性。Flink可靠性的基石——checkpoint机制sink端详解:将处理后的数据发送到下一阶段时,需要保证数据能够准确发送到下一阶段。在Flink1.4版本之前,精准的一次性处理仅限于Flink应用,即所有的Operator完全由Flink状态保存和管理,以实现精准的一次性处理。但是Flink处理完数据后,大部分的结果都需要发送到外部系统,比如SinktoKafka。在这个过程中,Flink不保证准确的一次性处理。在Flink1.4版本中,官方引入了一个里程碑函数:两阶段提交接收器,即TwoPhaseCommitSinkFunction函数。SinkFunction提取并封装了两阶段提交协议中的公共逻辑。此后,Flink通过特定的Source和Sink(比如Kafka0.11版本)来实现Exactly-OnceProcessingSemantics(英文缩写:EOS,Exactly-OnceSemantics)。Flink的端到端ExactOnceProcessingSemantics(EOS)注:以下内容适用于Flink1.4及之后的版本。对于Source端:Source端的ExactOnceProcessing比较简单。毕竟数据是落到Flink里面的,所以Flink只需要保存消费数据的offset就可以了。比如在Kafka消费数据,Flink以KafkaConsumer为源,可以保存offset。如果后续任务失败,连接器可以重置偏移量并再次使用它。数据以确保一致性。对于sink端:sink端是最复杂的,因为数据是落地到其他系统上的。一旦数据离开了Flink,Flink就无法监控数据,所以Flink也必须应用精确的一次处理语义来写入数据,因此这些外部系统必须提供一种手段来允许这些写入操作被提交或回滚,并且同时确保它们可以与FlinkCheckpoint协同使用(Kafka0.11版本已经实现了exactlyoneprocessing语义)。我们以Flink和Kafka的结合为例。Flink从Kafka读取数据,处理后的数据写入Kafka。以Kafka为例的第一个原因是,目前的Flink系统大多是通过Kafka系统读写数据。第二个也是最重要的原因是Kafka0.11版本正式发布了对事务的支持,这是Flink应用与Kafka交互实现端到端preciseonce语义的必要条件。当然,Flink对这种精确的一次性处理语义的支持并不局限于与Kafka的结合,任何Source/Sink都可以使用,只要它们提供了必要的协调机制。Flink与Kafka的结合Flink应用示例如上图所示。Flink包含以下组件:一个Source,从Kafka读取数据(KafkaConsumer),一个时间窗收集操作(Window),一个Sink,将结果写入Kafka(即KafkaProducer)如果Sink支持exactly-once处理语义(EOS),它必须以事务的方式向Kafka写入数据,这样当事务提交时,两个Checkpoints之间的所有写操作都作为一个事务提交。这确保在发生故障或崩溃时可以回滚这些写入。当然,在一个有多个并发执行的Sinks的分布式应用中,仅仅执行一次提交或回滚是不够的,因为所有的组件都必须对这些提交或回滚达成共识,才能保证结果的一致性。Flink使用两阶段提交协议和预提交阶段来解决这个问题。两阶段提交协议(2PC)两阶段提交协议(Two-PhaseCommit,2PC)是一种非常常用的解决分布式事务问题的方法。它可以保证在分布式事务中,要么所有参与的进程都提交事务,要么全部Cancellation,实现ACID中的A(原子性)。在数据一致性的上下文中,是指所有备份数据要么同时更改为某个值,要么不更改,从而实现数据强一致性。两阶段提交协议中有两个重要的角色,Coordinator和Participant。Coordinator只有一个,起到分布式事务协调和管理的作用,参与者有多个。顾名思义,两阶段提交将提交过程分为两个连续的阶段:投票阶段(Voting)和提交阶段(Commit)。两阶段提交协议流程如下图所示:两阶段提交协议第一阶段:投票阶段的协调者向所有参与者发送VOTE_REQUEST消息。当参与者收到VOTE_REQUEST消息时,它向协调器发送VOTE_COMMIT消息作为响应,告诉协调器它已准备好提交。如果参与者没有准备好或遇到其他故障,它会返回一个VOTE_ABORT消息来告诉协调器它当前无法提交事务。阶段2:提交阶段协调器收集来自各个参与者的投票消息。如果所有参与者都同意交易可以提交,则协调者决定交易的最终提交。在这种情况下,协调器向所有参与者发送GLOBAL_COMMIT消息,通知参与者在本地提交;如果任何参与者返回VOTE_ABORT消息,协调器将取消事务并向所有参与者广播GLOBAL_ABORT消息以通知所有参与者取消事务。每个提交了投票信息的参与者等待协调者返回消息。如果参与者收到GLOBAL_COMMIT消息,则参与者提交本地事务,否则,如果参与者收到GLOBAL_ABORT消息,则参与者取消本地事务。两阶段提交协议在Flink中的应用Flink的两阶段提交思想:我们分析了Flink从Flink程序启动到消费Kafka数据,直到Flink下沉数据到Kafka的精确一次性处理。Checkpoint启动时,JobManager会将checkpointbattier注入到数据流中,并在operator之间传递checkpointbarrier,如下图:Flink进行一次精确处理:Checkpoint启动Source端:FlinkKafkaSource负责保存Kafka消费偏移量。当Chckpoint成功时,Flink负责提交这些写入,否则终止并取消它们。当Chckpoint完成位移保存后,会把checkpointbarrier(checkpoint分界线)传递给下一个Operator,然后每个operator都会对当前状态进行一次快照,并保存到StateBackend。对于Source任务,当前偏移量将被保存为一个状态。下次从Checkpoint恢复时,Sourcetask可以重新提交offset,从上次保存的位置开始重新消费数据,如下图所示:side:从Source端开始,当每个内部transformtask遇到checkpointbarrier(checkpointboundary)时,都会将状态保存在Checkpoint中。当数据处理完成并发送到Sink时,Sink任务首先将数据写入外部Kafka。这些数据属于预提交事务(尚未消费)。同时,后端也必须预提交其对外的事务,如下图所示:snapshots被认为是Checkpoint的一部分),即当本次Checkpoint完成后,JobManager会向所有任务发送通知,确认Checkpoint完成。此时,Pre-commit阶段被认为已经完成。它正式是双阶段提交协议的第二阶段:提交阶段。在这个阶段,JobManager会为应用中的每个Operator发起Checkpoint已经完成的回调逻辑。本例中的DataSource和window操作没有外部状态,所以现阶段两个Opeartor不需要执行任何逻辑,但是DataSink有外部状态。这时候我们就必须要提交一个外部交易。当Sink任务收到确认Notification后,之前的事务会被正式提交,Kafka中未确认的数据会变成“已确认”,数据才能真正被消费,如下图:Flink精准处理一次:数据被精准消费注:Flink处理由JobManager协调各个TaskManager存储Checkpoints。检查点存储在StateBackend(状态后端)中。StateBackend默认是内存级的,也可以改成文件级的持久化存储。最后一张图总结一下Flink的EOS:Flink的端到端精准处理
