当前位置: 首页 > 后端技术 > Java

flink+kafka的端到端一致性

时间:2023-04-01 23:05:47 Java

上一篇文章中提到了flink+kafka是如何保证任务级顺序的,端到端一致性就是实现任务之间的精准一致性用户数据的目标端和源端。当端数据发生变化时,保证目标端及时、正确、持久地写入变化的数据。为了实现端到端的一致性,应该在顺序保证的基础上实现一致性语义exactlyonce的保证。从底层组件看:Debezium、Kafka、Flink构成了端到端一致性的各个关键环节,需要充分考虑和分析各个组件对一致性语义特性的支持。为了实现exactlyonce语义的一致性,需要提供处理过程的容错性和处理结果的幂等性。处理过程容错是指在计算过程中,当数据没有插入到目标端时,出现网络异常,程序重启,导致数据丢失。为此,源端必须能够重新设置数据的读取位置,同时配合Flink内部的checkpoint机制来解决这个问题。Kafka可以设置读取偏移量。每做一个checkpoint,Kafka当前消费的offset和计算结果都会写入statebackend。当任务异常恢复时,只需要从最近一次成功的checkpoint获取offset和计算结果,然后从这里开始消费和计算。幂等性是指要求当sink从故障中恢复时,数据不会因为重置读取位置和重新计算而重复写入外部系统。Flink针对不同的数据源提供了不同的一致性保证:AT-MOST-ONCE(至多一次):当任务失败时,最简单的方式是什么都不做,既不恢复丢失的状态,也不重放丢失的数据。最多一次语义意味着事件最多被处理一次。AT-LEAST-ONCE(至少一次):在大多数真实的应用场景中,我们希望不会丢失任何事件。这种保证被称为至少一次,意思是所有的事件都被处理,有些事件可能被处理多次。EXACTLY-ONCE(恰好一次):处理exactlyonce是最严格的保证,也是最难实现的。Exactly-once语义不仅意味着没有事件丢失,而且内部状态对于每条数据只更新一次。综上所述,sink端的幂等性是语义一致性实现的重要难点。幂等性:同一个操作不管重复多少次,效果都和只做一次操作一样;比如更新一个key/value,无论更新多少次,只要key和value不变,效果都是一样的;那么比如更新计数器来处理一条消息,会使计数器加1。这种操作本身不是幂等的,同一条消息会被中间件重新“发送+接收”两次,从而导致计数器数了两次;而如果我们的消息有id,那么update计数器的逻辑就修改为记录所有处理过的消息的id,收到消息后先查重,然后更新计数器,然后就是这个“更新计数器的逻辑”"成为一个幂等操作。将非幂等操作转换为幂等操作是端到端一致性的关键之一。确定性计算有点类似于幂等,但它是针对一次计算的;相同的输入必然得到相同的输出,这是一个确定性的计算;比如从一个msg中计算出一个key和一个value,如果同一条消息无数次计算得到的key和value是相同的,那么这个计算是确定性的,如果在key上加上当前时钟的字符串表示,那么这个计算是不确定的,因为如果你重新开始计算这个msg一次,得到一个完全不同的key。注1:非确定性计算一般会导致非幂等操作。比如我们要将上面例子中的key/value存储到数据库中,同一个msg重复处理多少次,我们就会重复插入多少条数据(因为key中的时间戳字符串不同。注2:非确定性计算并不一定会导致非幂等运算,比如timestamp不是加在key上而是加在value上,而且key总是一样的,那么这个计算还是“非幂等的”确定性”计算;但是当我们存储数据时,我们在存储键/值之前先检查重复项,所以无论我们重复处理同一个消息多少次,我们只会成功存储第一个键/值,随后的键/value会被过滤掉,该缺陷只适用于目标表有主键的情况,适用于HBase、Redis、Cassandra等KV数据库;也可以通过设置消息的SequenceId来进行去重,无法处理非确定性计算。需要注意的是,也可能存在中间状态暂时不一致而最终结果一致的情况。预写日志记录(简称WAL)是一种关系数据库系统,用于提供原子性和持久性(ACID属性中的两个)。在使用WAL的系统中,所有的修改都必须写入日志文件才能生效。日志文件通常包括重做和撤消信息。这样做的目的可以用一个例子来说明。假设一个程序正在执行一些操作,机器断电了。重新启动时,程序可能需要知道此时执行的操作是成功、部分成功还是失败。如果使用WAL,程序可以检查日志文件,将突然断电时计划执行的操作内容与实际执行的操作内容进行比较。在这个比较的基础上,程序可以决定是撤销已经做过的操作,继续完成已经做过的操作,还是保持原样。缺陷微批处理,存在一定的延迟。不能保证所有批次的数据都会成功,如果批量写入时没有事务隔离,恢复时出现故障后会出现重复写入。读取和写入可以并发执行而不会互相阻塞(但写入仍然不是并发的)。两阶段提交对于关系型数据库,可以通过开启事务来避免这个问题,但是对于分布式处理系统,如何开启分布式事务,或者目标本身是否支持(分布式)事务就成为关键。一般意义上的两阶段提交两阶段提交(Two-phaseCommit,以下简称2PC)是指计算机网络和数据库领域,为了让所有基于分布式系统架构的节点在提交事务时保持一致性。的一种算法。通常,2PC也被称为协议(Protocol)。在分布式系统中,虽然每个节点都可以知道自己操作的成功或失败,但无法知道其他节点操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个组件作为协调器,统一控制所有节点(称为参与者)的操作结果,并最终指示这些节点是否进行操作结果真实提交(如将更新数据写入磁盘等)。因此,2PC的算法思想可以概括为:参与者通知协调器操作成功或失败,然后协调器根据反馈的信息决定每个参与者是提交操作还是中止操作所有参与者。要求外部sink系统必须提供事务支持,或者sink任务必须能够在外部系统上模拟事务。Flink中的两阶段提交Flink将2PC的逻辑放在了checkpoint流程中,并提供了实现模板类TwoPhaseCommitsinkFunction。Flink的JobManager对应2PC协调器,Operator实例对应2PC参与者。继承TwoPhaseCommitsinkFunction需要三个类型参数:IN用于指定输入数据的类型;TXN定义了用于交易识别和失败后恢复的交易标识符类型;CONTEXT用于指定可选的自定义上下文对象的类型。继承自TwoPhaseCommitsinkFunction的子类构造函数需要传入两个TypeSerializer,一个是TXN类型,一个是CONTEXT类型。TwoPhaseCommitsinkFunction中定义了五个抽象方法:beginTransaction()用于启动一个事务,可以从连接池Connect中获取并返回事务句柄;每进来一条数据都会触发invoke(),当当前数据为schema时,可以直接执行对应的查询语句。当当前数据为数据时,按照元数据中的变化类型r(全量)、c(增量插入)、u(增量更新)、d(增量删除)、before(变化前的数据)、after(数据后的数据)change),db(库名),table(表名)等信息重组相应的SQL语句,将数据写入到开启的事务中;preCommit()预提交事务将不再接收数据写入;commit()提交指定的事务;abort()用于终止并返回Roll指定的事务。开发者可以通过实现上述抽象方法来自定义相应的功能。protectedabstractTXNbeginTransaction()throwsException;protectedabstractvoidinvoke(TXNtransaction,INvalue,Contextcontext)throwsException;protectedabstractvoidpreCommit(TXNtransaction)抛出异常;protectedabstractvoidprofortcommit(TXNtransactionidab;(TXNtransaction);上面TwoPhaseCommitsinkFunction中的抽象方法和两阶段提交的执行过程如下:当前面的checkpoint完成后,会开启一个新的事务beginTransaction,当每条数据进入时都会触发一次invokethistransactionarrives;当前检查点到达,preCommit会在这个transaction上执行,如果invoke和preCommit都成功,说明第一阶段成功,如果第一阶段有机器故障,orinvoke,preCommit失败,会触发abort方法,第一阶段结束时,数据写入外部存储,如果外部存储的事务隔离级别为已提交读(ReadCommitted),则我们写入的数据无法读取,因为未执行提交操作。当所有Operator实例完成checkpoint并执行preCommit后,它们会将快照完成消息发送给JobManager。JobManager收到后认为本次checkpoint全部完成,通知所有Operator实例执行commit方法正式提交。外部存储可以读取我们提交的数据。关注gzh“HEYDATA”一起讨论更多,私信“TwoPhaseCommit”获取实现案例demo文件