当前位置: 首页 > 科技观察

TiFlink:使用TiKV和Flink实现强一致的物化视图

时间:2023-03-14 22:41:55 科技观察

在今年早些时候的TiDBHackathon上,我和一群队友尝试使用Flink为TiDB添加物化视图功能,并获得了“最佳人气奖””。可以说物化视图是本次比赛的一大热点。结合Flink实现相关功能的团队有三四个。必须承认,比赛结束时我们的项目完成度很低。虽然基本思路已经敲定,但最终的结果与预期相去甚远。经过半年多的间歇性修复,今天终于可以放出预览版供大家试用。这篇文章是对我们的想法和结果的介绍。与其他团队相比,我们的主要目标是实现强大且一致的物化视图构建。即保证物化视图在查询时能够达到接近SnapshotIsolation的隔离级别,而不是一般流处理系统的最终一致性(EventualConsistency)。下面详细讨论实现一致性。Introduction虽然是一个实验性的项目,但我们还是探索了一些方便实用的特性,包括:TiFlinkApp关于TiFlink实用性的详细信息,请参考README。下面是快速启动一个任务的代码段:TiFlinkApp.newBuilder().setJdbcUrl("jdbc:mysql://root@localhost:4000/test")//Pleasemakesuretheuserhascorrectpermission.setQuery("selectid,"+"first_name,"+"last_name,"+"email,"+"(selectcount(*)frompostswhereauthor_id=authors.id)asposts"+"fromauthors")//.setColumnNames("a","b","c","d")//Overridecolumnnamesinferredfromthequery//.setPrimaryKeys("a")//指定主键列,默认为第一列//.setDefaultDatabase("test")//默认使用TiDB数据库,默认为JDBCURL指定的。setTargetTable("author_posts")//TiFlink会自动创建表如果不存在//.setTargetTable("test","author_posts")//Itspossibletosespecifythefulltablepath.setParallelism(3)//FlinkJob的Parallelism.setCheckpointInterval(1000)//Checkpoint间隔毫秒数.Thisintervaldeterminesdatarefreshrate.setDropOldTable(true)//IfTiFlinkshoulddropoldtargettableonstart.setForceNewTable(true)//Iftotthrowanerroriftetargettablealreadyexists.build().start();//启动app物化视图(流处理系统)一致性目前主流的物化视图(流处理)系统是主要是使用最终一致性意味着虽然最终的结果会收敛到一个一致的状态,但是终端用户在处理过程中仍然可能会查询到一些不一致的结果。最终一致性在很多应用中被证明是足够的,那么是否真的需要更强的一致性呢?这里的一致性和Flink的ExactOnce语义有什么关系呢?一些介绍是必要的。ACIDACID是数据库的一个基本概念。一般来说,作为CDC日志来源的数据库已经保证了这四个要求。但是,当使用CDC数据进行流式处理时,可能会违反其中一些约束。最典型的情况就是Atomic特性的丢失。这是因为在CDC日志中,一个事务的修改可能覆盖多个记录,如果流处理系统以行为单位进行处理,可能会破坏原子性。也就是说,查询结果集的用户看到的是不完整的事务。一个典型的案例如下:ChangeLogandTransactionAtomicity在上面的案例中,我们有一个账户表,账户表之间会有转账操作。由于传输操作涉及修改多行,因此往往会产生多条记录。假设我们有一个由以下SQL定义的物化视图来计算所有账户余额的总和:SELECTSUM(balance)FROMACCOUNTS;显然,如果表中只有账户之间的转账,那么这个查询返回的结果应该总是某个常量。但是由于目前通用的流处理系统无法处理事务的原子性,所以这个查询的结果可能会不断波动。事实上,在一个不断并发修改的源表上,它的波动甚至可能是无界的。虽然在最终一致性模型下,上述查询的结果会在一段时间后收敛到正确的值,但是没有原子性保证的物化视图仍然限制了应用场景:假设我想在上述查询结果出现时执行一个查询偏离太多如果我没有不时发出警报的工具,我就有可能收到很多误报。也就是说此时数据库端没有异常,数值的偏差只是来自于流处理系统内部。在分布式系统中,还有一种情况会违反原子性,即当一个事务修改的副作用分布在多个不同的节点上时。如果此时不使用2PC等方式进行分布式提交,原子性也会被打破:部分节点(分区)上的修改先于其他节点生效,导致不一致。线性一致性不同于单机数据库(如MySQL的Binlog)生成的CDC日志,TiDB等分布式数据库生成的日志会存在线性一致性问题。在我们的场景中,线性一致性问题可以描述为:从用户的角度来看,一些操作是一个接一个地执行的,它们产生的副作用(日志)由于延迟而被流处理系统以不同的顺序处理消息系统传递。假设我们有两张表,订单表(ORDERS)和支付信息表(PAYMENTS),用户必须创建订单才能支付,所以下面查询的结果一定是正数:WITHOrder_amountAS(SELECTSUM(amount)AStotalFROMORDERS),WITHpayment_amountAS(SELECTSUM(amount)AStotalFROMPAYMENTS)SELECTorder_amount.total-payment_amount.totalFROMorder_amount,payment_amount;但是由于ORDERS表和PAYMENTS表存储在不同的节点上,流处理系统的消费速度可能不一致。也就是说,流处理系统可能已经看到了支付信息的记录,但是对应的订单信息还没有到。因此,可以观察到上述查询的否定结果。在流处理系统中,有Watermark的概念,可以用来同步不同表中数据的处理进度,但无法避免上述线性一致性问题。这是因为Watermark只要求所有时间戳小于它的记录都到达了,而不要求时间戳大于它的记录都没有到达。也就是说,即使ORDERS表和PAYMENTS表现在有相同的Watermark,后者可能还有一些更早的记录已经生效。可见,单纯依靠Watermark本身是无法处理线性一致性问题的,必须配合源库的时间生成系统和消息系统。更强的一致性要求虽然最终一致性在很多场景下已经足够了,但是仍然存在很多问题:还没有收敛。当大多数关系数据库默认为强一致性时,应该避免这种情况。可观察性差:由于最终一致性并不能保证收敛时间,并且考虑到线性一致性的存在定义处理系统的延迟、数据新鲜度、吞吐量等指标。比如用户看到的JO??IN结果可能是A表的当前快照和B表十分钟前的快照join的结果。这个时候查询结果的延迟应该怎么定义呢?限制某些需求的实现:如前所述,由于内部状态不一致,某些告警需求要么无法实现,要么需要延迟一段时间。否则,用户将不得不接受高误报率。事实上,缺乏更强的一致性也导致了一些运维操作,尤其是DDL操作,使用了之前计算的结果。参考关系数据库和NoSQL数据库的发展历史,我们认为目前主流的最终一致性只是受限于技术发展的权宜之计。随着相关理论和技术研究的进步,更强的一致性将逐渐成为流处理系统的主流。技术方案介绍这里详细介绍TiFlink的技术方案注意事项以及如何实现强一致性物化视图(StreamSQL)维护。虽然TiKV和Flink都是TiDBHackthon项目,必须选择TiDB/TiKV相关的组件,但是在我看来,TiKV作为物化视图系统的中间存储方案有很多突出的优势:TiKV是一个比较成熟的分布式KV存储,以及分布式环境是下一代物化视图系统必须支持的场景。使用TiKV自带的JavaClient,我们可以很方便的操作它。同时,TiDB本身作为一个HTAP系统,只是为物化视图的需求提供了一个playground。TiKV提供了事务支持和基于Percolator模型的MVCC,这是TiFlink实现强一致性流处理的基础。如下图可以看出,TiFlink主要是以连续事务的形式写入TiKV。TiKV原生支持CDC日志输出。实际上,TiCDC组件就是利用这个特性来实现CDC日志导出功能的。在TiFlink中,为了实现批流集成,简化系统流程,我们选择了直接调用TiKV的CDCGRPC接口,所以我们也放弃了TiCDC提供的一些特性。我们最初的想法是直接将计算功能集成到TiKV中,而选择Flink是在比赛中进一步思考后得出的结论。选择Flink的主要优势在于:Flink是目前市场上最成熟的有状态流处理系统。具有很强的处理任务的表达能力,支持丰富的语义。我们更关注强一致性和Flink相对完善的Watermark等功能,我们发现其基于Checkpoint的ExactlyOnceDelivery语义可以很容易地与TiKV结合实现事务处理。事实上,Flink本身提供的一些支持TwoPhaseCommit的Sinks是结合Checkpoint提交的。Flink的流处理(尤其是StreamSQL)本身就是基于物化视图的理论。较新版本中提供的DynamicTable接口是为了方便将外部ChangeLog引入系统。它已经提供了对各种CDC操作的支持,例如INSERT、DELETE和UPDATE。当然,选择TiKV+Flink这样的异构架构也会引入一些问题,比如SQL语法不匹配、UDF无法共享等。在TiFlink中,我们以Flink的SQL系统和UDF为标准,将其作为TiKV的插件系统使用,但同时提供了方便的建表功能。强一致性物化视图的实现思路这部分将介绍TiFlink如何在TiDB/TiKV的基础上实现一个比较强的一致性级别:StaleSnapshotIsolation。在这种隔离级别下,查询者总是查询历史上一致的快照状态。在传统的快照隔离中,要求查询器能够并且只观察到所有在时间$T$Commit时间小于$T$的事务。而延迟快照隔离只能保证观察到所有在$T-\Deltat$之前提交的事务。在TiDB等支持事务的分布式数据库上实现强一致物化视图,最简单的思路就是使用一个又一个事务来更新视图。事务一开始读取的是一致性快照,使用分布式事务更新物化视图本身就是强一致性操作,具有ACID特性,所以一致性是可以保证的。为了将Flink与这样的机制结合起来,实现增量维护,我们利用了TiKV本身已经提供的一些特性:日志中事务的时间戳实际上是有序的。TiKV节点(Regions)可以产生连续的增量日志(ChangeLogs)。流量日志会周期性的生成ResolvedTimestamp,声明当前Region将不再生成时间戳较旧的消息。因此,它非常适合Watermark。TiKV提供了分布式事务,让我们可以控制一批修改的可见性。因此,TiFlink的基本实现思路是:利用流批一体的特性,读取带有全局时间戳的源表的快照,此时可以获得所有源表的一致视图,并切换到增量日志消耗。使用Flink的DynamicTable相关接口,可以按照一定的节奏对物化视图进行增量维护和输出修改,做到所有的修改都是原子性的。以事务方式编写目标表,为物化视图提供一个接一个的更新视图。以上几点的关键是协调各个节点共同完成分布式事务,所以有必要介绍一下TiKV的分布式事务执行原理。TiKV的分布式事务TiKV的分布式事务基于著名的Percolator模型。Percolator模型本身就需要存储层的KVStore有MVCC的支持和单行读写的原子性和乐观锁(OCC)。在此基础上,它使用以下步骤完成一个事务:指定一个事务主键(PrimaryKey)和一个开始时间戳,并写入主键。其他行在Prewrite时以二级键(SecondaryKey)的形式写入,二级键会指向主键有上面的开始时间戳。所有节点Prewrite完成后,即可提交事务。这时候应该先commit主键,并给出一个Commit时间戳。主键Commit成功后,事务实际上已经提交成功,但是此时为了读取方便,多个节点可以同时Commit副键并进行清理工作,然后所有写入的行都会变得可见。key是否commit成功完全取决于主键,所以其他读者在读到PrewritebutnotyetyetCommitted这一行时,会检查主键是否已经Committed。读者也会根据Commit时间戳来判断一行数据是否可见。如果中途Cleanup操作失败,后续读者也可以执行。为了实现快照隔离,Percolator要求写入者在写入时检查并发的Prewrite记录,以确保其时间戳在提交事务之前满足一定的要求。本质上要求不能同时提交具有重叠写集的事务。在我们的场景中,假设物化视图只有一个写入者并且事务是连续的,因此无需担心这一点。了解了TiKV的分布式事务原理后,首先要考虑的是如何与Flink结合。在TiFlink中,我们使用Checkpoint机制来实现全局一致的事务提交。使用Flink进行分布式事务提交从上面的介绍可以看出,TiKV的分布式事务提交可以抽象为2PC。Flink本身提供了实现2PC的sink,但是在我们的场景中是不能直接使用的。原因是Percolator模型在提交时需要有一个全局一致的事务开始时间戳和提交时间戳。而仅仅在sink端实现2PC还不足以实现强一致的隔离级别:我们还需要在source端进行配合,让每个事务只读取需要的增量日志。幸运的是,Flink的2PC提交机制实际上是由Checkpoint驱动的:当Sink收到Checkpoint请求时,它会完成提交的必要任务。受此启发,我们可以实现一对Source和Sink,让它们使用Checkpoint的ID共享Transaction信息,配合Checkpoint的进程完成2PC。为了使不同节点能够就交易信息(时间戳、主键)等达成一致,需要引入全局协调器。transaction和globalcoordinator的接口定义如下:publicinterfaceCoordinatorextends,AutoCloseSerializable{TransactionopenTransaction(longcheckpointId);TransactionprewriteTransaction(longcheckpointId,longtableId);TransactioncommitTransaction(longcheckpointId);TransactionabortTransaction(longcheckpointId);}使用上面的接口,每个Source和Sink节点都可以使用CheckpointID开启一个事务或者获取一个事务ID,协调者将负责分配主键并维护事务的状态。为了方便起见,在事务Commit时,对主键的commit操作也是在coordinator中进行的。有许多方法可以实现协调器。目前TiFlink采用最简单的实现方式:在JobManager所在进程中启动一个GRPC服务。也可以基于TiKV的PD(ETCD)或者TiKV自身实现分布式协调器。事务和Checkpoints的协调执行上图展示了Flink中执行分布式事务和Checkpoints的协调关系。一个事务的具体流程是这样的:Source首先从TiKV接收增量日志,根据时间戳缓存起来,等待事务开始。当Checkpoint进程启动时,Source会先接收到信号。Source端的Checkpoint和日志接收服务运行在不同的线程中。Checkpoint线程首先通过全局协调器获取当前事务的信息(或者启动一个新的事务)。在分布式的情况下,一个CheckpointID对应的事务只会被打开一次来获取事务。在起始时间戳之后,Source节点开始将Cache中提交的小于该时间戳的修改发出给下游计算节点消费。这时Source节点也会发出一些Watermark。当所有Source节点都完成以上操作后,Checkpoint在Source节点上成功完成,并会继续向后传播。根据Flink的机制,Checkpoint会保证所有的事件在到达每个节点之前。全部消费完毕当Checkpoint到达Sink时,之前传播到Sink的Events已经被预写好,此时可以开始交易提交流程。Sink将事务信息持久化在内部状态中,方便发生错误时的恢复。所有Sink节点完成该操作后,会在回调中调用Coordinator的Commit方法提交事务。事务提交后,Sink会为SecondaryKey启动一个线程,同时启动一个新的事务,注意在第一个Checkpoint启动之前,Sink可能已经开始接收写入的数据,此时没有事务信息.为了解决这个问题,TiFlink会在任务开始时直接启动一个初始事务,其对应的CheckpointID为0,用于提交一些初始写入。在这种情况下,当CheckpointID=1的Checkpoint完成后,0事务才真正提交。交易和检查点就是以这种错位的方式协调和执行的。下图是包括协调器在内的整个TiFlink任务的架构:TiFlink的系统架构是基于以上的系统设计,我们得到了一个在TiKV上实现延迟快照隔离的物化视图。其他设计考虑众所周知,KSQL是Fl??ink之外的另一个流行的流处理系统。它直接与Kafka消息队列系统结合,用户不需要部署两套处理系统,因此受到部分用户的青睐。许多用户还使用KSQL来满足物化视图等需??求。但是,在我看来,这种与消息队列强耦合的流处理系统并不适合物化视图的使用场景。KSQL可以说是面向日志的数据处理系统的代表。在这个系统中,数据的来源在于日志信息,所有的表都是为了方便查询而消费日志信息构建的视图。该系统具有模型简单、易于实现、日志记录长期保存等优点。相比之下,面向表的数据处理系统、MySQL、TiDB/TiKV都属于此类系统。此类系统的所有修改操作都作用于表数据结构。期间虽然会产生日志,但是表数据结构的修改和日志往往是协同在一起的。这里的日志主要是为持久化和事务服务的,往往不会长期保存。与面向日志的数据处理系统相比,这类系统在写入和事务处理上稍微复杂一些,但扩展性要求更强。归根结底,这是因为LogOriented系统中的数据是以日志的形式存储的,所以在扩容时往往需要进行代价高昂的rehash,实现rebalancing的难度更大。在TableOriented系统中,数据主要以表的形式存储,因此可以在某些列中有序排列,在一致性Hash的支持下,便于Ranges的切分、合并和重平衡。个人认为,在集成批流的物化视图场景下,长期保存日志意义不大(因为数据总能从源表的快照中恢复)。相反,随着业务的发展,不断扩展数据处理任务和视图更为重要。从这个角度看,TableOriented系统似乎更适合作为物化视图需求的存储介质。当然,实时消费增量日志时发生的分区合并或分裂是一个比较难处理的问题。在这种情况下,TiKV将抛出GRPC错误。TiFlink目前使用比较简单的静态映射方式来处理任务和分区之间的关系,未来可以考虑更合理的方案。小结本文介绍了使用Flink在TiKV上实现强一致性物化视图的基本原理。以上原理在TiFlink系统中已经基本实现,欢迎读者试用。以上所有讨论都是基于Flink最终一致性模型的保证,即流计算的结果只与消费的Event及其在自己流中的顺序有关,与到达顺序无关在系统中和不同流之间的相对顺序。目前的TiFlink系统还有很多值得改进的地方,比如:支持非整数主键和联合主键更好的将TiKVRegion映射到Flink任务更好的容错和任务中断时清理TiKV事务完善的单元测试如果读者对TiFlink感兴趣,欢迎试用并反馈。如果你能贡献代码来帮助改进这个系统,那就太好了。对物化视图系统一致性的思考是我这一年的主要收获之一。其实一开始我们并没有太关注这方面,而是通过不断的沟通才意识到这是一个有价值且具有挑战性的问题。通过TiFlink的实现,可以说基本验证了上述方法实现延迟快照一致性的可行性。当然,由于个人能力水平有限,如有错误,欢迎大家讨论。最后,如果我们假设上面关于延迟快照一致性的讨论是正确的,那么实现真正的快照隔离的方法指日可待。不知各位读者能否想到?