当前位置: 首页 > 后端技术 > Java

基于EMROLAP的开源实时数仓解决方案ClickHouse事务实现_0

时间:2023-04-01 20:21:43 Java

简介:阿里云EMROLAP与Flink团队深度合作,支持Flink到ClickHouse的Exactly-Once写入,保证数据的准确性整个实时数据仓库数据。本文介绍基于EMROLAP的开源实时数仓解决方案。作者简介阿里云EMR-OLAP团队;主要负责开源大数据OLAP引擎的开发,如ClickHouse、Starrocks、Trino等。通过EMR产品为阿里云用户提供一站式大数据OLAP解决方案。内容框架背景机制结合技术方案测试结果未来规划1.背景Flink和ClickHouse分别是实时流计算和OLAP领域的领导者。很多互联网、广告、游戏等客户将二者结合起来构建用户画像和实时BIReports、应用监控指标查询、监控等服务,形成实时数仓解决方案(图-1)。这些业务对数据的准确性要求非常严格,所以实时数仓的整个链路都需要保证端到端的Exactly-Once。一般来说,Flink的上游是一个拉取式的持久化存储(比如Kafka),可以重复读取或消费。Source端要实现Exactly-Once,只需要回溯Source端的读取进度即可。sink端的Exactly-Once比较复杂,因为sink是push-based的,需要依赖目标输出系统的事务保证,但是社区ClickHouse不支持事务。因此,针对这种情况,阿里云EMRClickHouse和Flink团队进行了深入的研发,支持Flink到ClickHouse的Exactly-Once写入,保证了整个实时数仓数据的准确性。本文将分别介绍现有的机制和实现方案。图-1实时数据仓库架构二结合ClickHouse写入机制ClickHouse是一个MPP架构的列式OLAP系统(如图-2),每个节点都是平等的,通过Zookeeper协同数据,可以并发写入每个节点以本地表的形式导入大批量数据。ClickHouse的数据部分是数据存储的最小单位。ClickHouse接收到的数据块在写入时,会按照分区粒度进行拆分,形成一个或多个数据块。数据部分写入磁盘后,会通过后台合并线程不断合并,将小数据部分合并成大数据部分,减少存储和读取开销。向本地表写入数据时,ClickHouse会先写入一个临时数据部分。这个临时数据部分的数据对于客户端是不可见的,那么它会直接进行重命名操作,使这个临时数据部分成为正式数据部分。当数据对客户端可见时。几乎所有的临时数据部分都会被快速、成功地重命名为正式数据部分,而没有成功重命名的临时数据部分最终会被ClickHouse清理策略从磁盘中删除。通过上面的分析可以看出,ClickHouse的数据写入有一个从临时数据部分到正式数据部分的机制,可以修改为符合两阶段提交协议,这是实现事务提交的重要协议分布式系统的一致性。图2Flinkjob写入ClickHouse注:多个FlinkTask可以写入同一个shard或replicaFlink写入机制作为分布式处理引擎,Flink提供了基于事务的Sink机制,可以保证exactly-once写入,相应的数据接收方需要提供符合XA规范的JDBC。由于完整的XA规范比较复杂,我们先梳理一下Flink的处理机制,结合ClickHouse的实际情况,确定需要实现的接口范围。为了在分布式写入时实现事务的统一提交,Flink使用了Checkpoint机制。该机制可以周期性地生成每个Operator中状态的快照并持久化存储。在checkpoint机制中,有一个Coordinator角色来协调所有Operator的动作。从Operator的角度来看,一个Checkpoint分为三个阶段,初始化-->快照生成-->Checkpoint的完成/放弃。从Coordinator的角度来说,需要定时触发checkpoint,在所有Operator完成快照后触发complete通知。(参考附录1)接下来介绍Flink中的Operator是如何利用事务和检查点机制来保证Exactly-Once的。Operator的完整执行需要经历initial、writeData、snapshot、commit、close这几个阶段。初始阶段:从快照中取出上次任务执行的持久化xid记录。snapshot中主要存储了两种xid,一种是未完成快照阶段的xid,一种是已完成快照的xid。接下来对上次没有完成快照的xids进行回滚操作;对已经完成快照但上次commit失败的xids进行commitretry操作。如果上述操作失败,则任务初始化失败,任务终止,进入关闭阶段;如果上述操作成功,则继续。创建一个新的唯一xid作为事务ID并将其记录在快照中。使用新生成的xid,调用JDBC提供的start()接口。writeData阶段:交易开启后,进入写数据阶段,Operator大部分时间都在这个阶段度过。在与ClickHouse的交互中,这个阶段是调用JDBC提供的preparedStatement的addBatch()和executeBatch()接口,每次写入数据时都会在消息中携带当前的xid。在数据写入阶段,首先将数据写入Operator内存,将内存中的批量数据提交给ClickHouse有3种触发方式:内存中的数据项数量达到batchsize的阈值;后台定时线程定时触发自动刷新;在snapshot阶段调用end()和prepare()接口之前,会调用flush清除缓存。快照阶段:当前事务会调用end()和prepare()接口,等待commit,更新快照中的状态。接下来会启动一个新的事务作为这个Task的下一个xid,新的事务会被记录在snapshot中,调用JDBC提供的start()接口来启动新的事务。永久存储快照。完成阶段:所有Operator的快照阶段正常完成后,Coordinator会通知所有Operator对成功??的checkpoints进行完成操作。在与ClickHouse的交互中,在这个阶段,Operator调用JDBC提供的commit()接口来完成事务。提交。关闭阶段:如果当前事务还没有到达快照阶段,就会对当前事务进行回滚操作。关闭所有资源。从上面的过程可以看出,Flink通过checkpoint和transaction机制,将上游数据按照checkpoint周期进行分批,保证每批数据写入完成后,Coordinator通知所有Operator完成commit操作一起。当一个Operator写入失败时,会回到上次成功的checkpoint的状态,根据快照中记录的xid,对这批checkpoint的所有xid进行回滚操作。当commit操作失败时,会重试commit操作,将失败交给人工干预。3、整体技术方案根据Flink和ClickHouse的写入机制,可以画出Flink向ClickHouse写入事务的时序图(如图3所示)。由于写的是ClickHouse的本地表,事务的统一提交由Coordinator来保证,所以ClickHouse不需要实现XA规范中标准的分布式事务,只需要实现两个中的几个关键接口——phasecommit协议,其他接口在JDBC端实现。默认值很好。图-3Flink写入ClickHouse事务的时序图ClickHouse-Server状态机为了实现ClickHouse的事务,我们首先定义事务允许执行的几个操作:Begin:开始一个事务。写入数据:在事务中写入数据。提交:提交事务。回滚:回滚未提交的事务。交易状态:未知:交易未开启,此时进行任何操作均属非法。Initialized:交易已经开启,此时允许所有操作。Committing:正在提交事务,不再允许Begin/WriteData操作。Committed:事务已经提交,不允许任何操作。Aborting:事务正在回滚,不允许任何操作。Aborted:事务已经回滚,不允许任何操作。完整的状态机如下图4所示:图4ClickHouseServer支持事务状态机图中所有操作都是幂等的。其中Committing转Committed和Aborting转Aborted不需要进行任何操作。当开始执行Commit或Rollback时,事务的状态会变为Committing或Aborting;执行Commit或Rollback后,事务的状态将被设置为Committed或Aborted。事务处理Client通过HTTPRestfulAPI访问ClickHouseServer,一个完整的事务Client和ClickHouseServer的交互过程如图-5所示:图-5ClickHouse事务处理时序图正常流程:Client向任意ClickHouse发送消息ClickHouse集群中的ServerBeginTransaction请求,携带Client生成的全局唯一的TransactionID。当ClickHouseServer收到BeginTransaction请求后,会向Zookeeper注册TransactionID(包括创建TransactionID和子Znode节点),并初始化事务状??态为Initialized。当客户端收到BeginTransaction成功响应后,就可以开始写数据了。当ClickHouseServer接收到Client发送过来的数据时,会生成临时数据部分,但不会将其转化为正式的数据部分,ClickHouseServer会将写入的临时数据部分信息以JSON的形式记录到Zookeeper的信息中交易。Client写完数据后,会向ClickHouseServer发送CommitTransaction请求。ClickHouseServer收到CommitTransaction请求后,根据ZooKeeper上对应的Transaction数据部分信息,将ClickHouseServer本地的临时数据部分数据转换为正式数据部分数据,并更新Transaction状态为Committed。Rollback的过程与Commit类似。异常处理:如果在创建TransactionID的过程中发现Zookeeper中已经存在相同的TransactionID,则根据Zookeeper中记录的Transaction状态进行处理:如果状态为Unknown,则继续处理;如果状态为Initialized,直接返回;否则,将抛出异常。目前实现的事务不支持分布式事务,只支持单机事务,所以Client只能向记录了TransactionID的ClickHouseServer节点写入数据。如果ClickHouseServer收到的数据不是本节点的事务,ClickHouseServer会直接返回错误信息。与写数据不同,如果Client在Commit阶段向没有记录TransactionID的ClickHouseServer发送CommitTransaction请求,ClickHouseServer不会返回错误信息,而是返回记录该Transaction的ClickHouseServer的地址客户端的ID,允许客户端重定向到正确的ClickHouse服务器。Rollback的过程与Commit类似。根据ClickHouse-JDBC的XA规范,一个完整的分布式事务机制需要实现大量的标准接口(参考附录2)。在本设计中,只需要实现少量的关键接口。因此采用基于组合的适配器模式,为Flink提供基于标准XA接口的XAResource实现,同时屏蔽ClickHouseServer对不需要支持的接口。XADataSource的实现采用了基于继承的适配器模式,并根据Exactly-Once的特点修改了一些默认的配置,比如发送失败的重试次数等参数。另外,在生产环境中,写入数据时的负载均衡通常不是通过分布式表来完成的,而是通过SLB来完成的。在Exactly-Once场景下,Flink端的Task需要和某个ClickHouseServer节点保持连接,所以不能使用SLB做负载均衡。为了解决这个问题,我们借鉴了BalanceClickHouseDataSource的思想。通过在URL中配置多个IP,在properties配置中将write_mode设置为Random,我们可以让XADataSource在保证Exactly-Once的同时具备负载均衡能力。Flink-Connector-ClickHouseFlink作为一个流式数据处理引擎,支持写入多个数据接收者的能力,每个数据接收者都需要实现一个特定的Connector。针对Exactly-Once,ClickHouseConnector增加了XADataSource的选项配置,根据客户端的配置提供Exactly-Once功能。4.测试结果ClickHouse事务性能测试在ClickHouse中单批次写入与总批次相同的数据量,在客户端比较不同并发写入线程的性能。从图6可以看出,无论ClickHouse是否开启事务,ClickHouse的吞吐量都与Client并发写入的线程数成正比。事务启动时,ClickHouse中的临时数据部分不会立即转换为正式数据部分,因此在事务完成之前,大量临时数据部分不会参与到ClickHouse合并过程中,减少对磁盘的影响IOonwriteperformance,所以事务的写性能比较低。未开启事务的写性能更好;但是,事务中包含的batch数量增加,磁盘上临时数据部分的增加增加了合并时CPU的压力,影响了写入性能,打开事务的写入性能也下降了。会减少。图6ClickHouse写入性能压力测试(1)ClickHouse总写入批次与客户端并发写入线程相同,单批次写入ClickHouse不同数据量的性能对比。从图7可以看出,无论ClickHouse是否启动事务,ClickHouse的吞吐量都与单批数据的大小成正比。事务开启时,每批数据越小,ClickHouse吞吐量对事务开启与否的影响越大。这是因为每批的写入时间在事务处理中所占的比例很小,事务对此会有一定的影响。因此,事务包含的批次越多,越能降低事务对写入性能的影响;当事务中包含的batch越多,事务处理时间在写入中的占比逐渐下降,ClickHousemerge产生的影响越来越大,从而影响写入的性能。开启事务的写入性能优于不开启事务。图-7ClickHouse写性能压力测试(二)总体来说,开启事务对写性能几乎没有影响,符合我们的预期。Flink写入ClickHouse的性能对比对于相同的数据量,不同的Checkpoint周期,Flink写入ClickHouse的总耗时如图-8所示。可以看出,检查点周期对不开启Exactly-Once的任务耗时没有影响。对于开启Exactly-Once的任务,在5s到60s的范围内,耗时呈现先减后增的趋势。原因是当checkpoint周期较短时,启用Exactly-Once的Operator与Clickhouse的交互过于频繁;当Checkpoint周期较长时,启用Exactly-Once的Operator需要等待Checkpoint周期结束才能提交最后一笔交易。让数据可见。本次测试中,checkpoint周期数据仅作为参考。在生产环境中,需要根据机器规格和数据写入速度进行调整。一般来说,Flink在开启Exactly-Once特性的情况下写入ClickHouse时,性能会受到轻微影响。这一结论符合我们的预期。图8Flink写入ClickHouse测试五、未来规划EMRClickHouse这个版本实现的事务不是很完善。只支持单机事务,不支持分布式事务。分布式系统一般使用MetaServer进行元数据统一管理,支持分布式事务机制。我们目前正在计划设计ClickHouseMetaServer来支持分布式事务,同时去除ClickHouse对ZooKeeper的依赖。原文链接本文为阿里云原创内容,未经许可不得转载。