当前位置: 首页 > 科技观察

FlinkCDC在大建云仓的实践

时间:2023-03-13 19:37:30 科技观察

摘要:本文整理自大建云仓基础设施负责人、FlinkCDCMaintainer龚中强在5月21日FlinkCDCMeetup上的演讲。主要内容包括:引入FlinkCDC的背景、当前内部业务场景、未来内部推广和平台建设、社区合作1、引入FlinkCDC的背景,公司引入CDC技术主要基于以下四个角色的需求:物流科学家:需要库存、销售订单、物流账单等数据进行分析。开发:需要同步其他业务系统的基础信息。财务:希望财务数据能够实时传输到财务系统,而不是在月末之前就被看到。老板:需要一个数据大屏,通过大屏查看公司的业务和运营情况。CDC是一种用于数据捕获更改的技术。广义上讲,任何能够捕捉数据变化的技术都可以称为CDC。但是通常我们所说的CDC技术主要是针对数据库的变更。CDC的实现方式主要有两种,基于查询和基于日志:基于查询:查询后向数据库插入和更新,无需对数据库和账户权限进行特殊配置。它的实时性是根据查询频率决定的,只有提高查询频率才能保证实时性,这必然会给DB带来巨大的压力。另外,由于它是基于查询的,无法捕获两次查询之间的数据变化记录,无法保证数据的一致性。基于日志:通过实时消费数据的变更日志实现,实时性非常高。并且不会对DB造成很大的影响,也能保证数据的一致性,因为数据库会在changelog中记录所有的数据变化。通过消费日志,可以清楚的知道数据的变化过程。它的缺点是实现起来比较复杂,因为变更日志在不同的数据库中的实现是不同的,格式、打开方式、特殊权限都不同,需要针对每个数据库进行相应的适配开发。正如Flink的宣言“实时就是未来”,在今天的背景下,实时性是一个亟待解决的重要问题。因此,我们对比了主流CDC的基于日志的技术,如上图所示:数据来源:FlinkCDC不仅对传统的关系型数据库有很好的支持,还支持基于文档、NewSQL(TiDB、OceanBase)等。目前流行的数据库可以支持;Debezium对数据库的支持相对没有那么广泛,但是对主流的关系型数据库已经做到了很好的支持;Canal和OGG只支持单一数据源。断点续传:四种技术均可支持。同步模式:除了Canal只支持增量,其他技术都支持全量+增量的方式。全量+增量方式是指首次上线时可以通过CDC技术实现从全量到增量的切换过程,无需通过全量任务和增量作业人为实现全量+增量数据读取。挑选。活跃度:FlinkCDC社区非常活跃,资料丰富,官方也提供了详细的教程和快速入门教程;Debezium社区也很活跃,但大部分信息都是英文的;Canal的用户群特别大,信息量相对较少,但社区活跃度一般;OGG是Oracle的大数据套件,需要付费,只有官方资料。开发难度:FlinkCDC依赖于FlinkSQL和FlinkDataStream两种开发模式,尤其是FlinkSQL,可以通过非常简单的SQL完成数据同步任务的开发,开发起来特别容易;Debezium需要自己分析收集到的数据变更日志,单独处理,Canal也是。运行环境依赖:FlinkCDC使用Flink作为引擎,Debezium通常使用Kafkaconnector作为运行容器;Canal和OGG都是分开运行的。下游丰富:FlinkCDC依托于Flink非常活跃的外设和丰富的生态,可以打通丰富的下游,对普通关系型数据库和大数据存储引擎Iceberg、ClickHouse、Hudi等提供了良好的支持;Debezium有KafkaJDBC连接器,支持MySQL、Oracle和SqlServer;Canal只能直接消费数据或者输出到MQ供下游消费;因为OGG是官方套件,下游丰富度不好。2、今天的内部业务场景2018年之前,大建云仓的数据同步方式是:通过多数据应用,定时同步系统间的数据。2020年以后,随着跨境业务的快速发展,多数据源应用经常会导致DB占满影响线上应用,同时定时任务的执行顺序管理混乱。因此,2021年,我们开始调研选择CDC技术,搭建小规模测试场景,进行小规模测试。2022年推出基于FlinkCDC的LDSS系统库存场景同步功能。未来我们希望依托FlinkCDC搭建数据同步平台,通过接口开发配置完成同步任务的开发、测试和上线,并能够在线管理同步任务的全生命周期。LDSS库存管理的业务场景主要包括以下四种:仓储部门:要求仓库的库存量和商品品类分布合理。在库存能力方面,需要预留一些缓冲,防止突发的入库订单造成爆仓;在商品品类方面,季节性商品库存分配不合理导致热点问题,必将给仓储管理带来巨大挑战。平台客户:希望订单能及时处理,货物能快速准确地送到客户手中。物流部:希望提高物流效率,降低物流成本,高效利用有限运力。决策部门:希望LDSS系统能够对何时何地新建仓库提供科学建议。上图是LDSS库存管理单场景的架构图。首先,通过多数据源同步的应用,拉取存储系统、平台系统、内部ERP系统数据,将需要的数据提取到LDSS系统的数据库中,支撑三大模块的业务功能。LDSS系统:订单、库存和物流。其次,需要产品信息、订单信息和仓库信息来做出有效的订单拆分决策。多数据源定时同步任务基于JDBC查询,通过时间过滤将变化的数据同步到LDSS系统。LDSS系统根据这些数据进行分单决策,得到最优解。定时任务同步的代码首先需要定义定时任务,定义定时任务的类、执行方式、执行间隔。上图左边是定时任务的定义,右边是定时任务的逻辑开发。首先打开Oracle数据库进行查询,然后upsert到MySQL数据库,这样就完成了定时任务的开发。这里以接近原生JDBC的查询方式,将数据依次塞入对应的数据库表中。开发逻辑非常繁琐,容易出现bug。因此,我们在FlinkCDC的基础上进行了修改。上图展示了一个基于FlinkCDC的实时同步场景。唯一的变化是将之前的多数据源同步应用换成了FlinkCDC。首先通过SqlServerCDC、MySQLCDC、OracleCDC分别连接并提取存储平台和ERP系统数据库对应的表数据,然后通过Flink提供的JDBC连接器写入到LDSS系统的MySQL数据库中。可以通过SqlServerCDC、MySQLCDC、OracleCDC,将异构数据源转化为统一的Flink内部类型,然后写入下游。与之前的架构相比,该架构对业务系统没有侵入性,实现也比较简单。我们引入MySQLCDC和SqlServerCDC分别连接B2B平台的MySQL数据库和仓库系统的SqlServer数据库,然后通过JDBCConnector将提取的数据写入LDSS系统的MySQL数据库。通过以上改造,得益于FlinkCDC赋予的实时能力,无需管理复杂的定时任务。基于FlinkCDC同步代码的实现分为以下三步:第一步定义源表——需要同步的表;第二步,定义目标表——需要写入数据的目标表;第三步,通过insertselect语句,就可以完成开发的CDC同步任务了。上面的开发模型非常简单和合乎逻辑。此外,依托FlinkCDC和Flink架构的同步任务,还获得了失败重试、分布式、高可用、全增量一致性切换等特性。3、未来内部推广及平台建设上图为平台架构图。左边的Source是FlinkCDC+Flink提供的Source,可以通过丰富的Source提取数据,通过在数据平台上开发写入到Target中。目标端依托于Flink强大的生态,可以很好的支持数据湖、关系型数据库、MQ等。Flink目前有两种运行模式,一种是国内比较流行的FlinkonYarn,另一种是Flink在Kubernetes上。中间的数据平台向下管理Flink集群,向上支持SQL在线开发,任务开发,血统管理,任务提交,在线Notebook开发,权限配置,任务性能监控告警,也可以做数据源到良好的管理。公司内部对数据同步的需求特别强烈,需要借助平台来提高开发效率,加快交付速度。而且,平台化之后,可以统一公司内部的数据同步技术,聚集同步技术栈,降低维护成本。平台化的目标是:能够很好地管理数据源、表等元数据;任务的整个生命周期都可以在平台上完成;实现任务性能观察和报警;简化开发,快速上手,业务开发人员可以轻松培训后即可开始开发同步任务。平台化可以带来以下三个方面的好处:数据同步任务集中统一管理;平台管理维护同步任务的全生命周期;有专门的团队负责,团队可以专注于前沿的数据集成技术。借助该平台,可以快速应用更多的业务场景。实时数仓:希望利用FlinkCDC支持更多的实时数仓业务场景,利用Flink强大的计算能力做一些数据库的物化视图。将计算从DB中解放出来,通过Flink的外部计算重写回数据库,加速平台应用的报表、统计、分析等实时应用场景。实时应用:FlinkCDC可以从DB层捕捉变化,因此可以通过FlinkCDC实时更新搜索引擎中的内容,将财务会计数据实时推送到财务系统。因为金融系统中的大部分数据都需要业务系统通过运行定时任务和大量的关联、聚合、分组等操作进行计算,然后推送到金融系统。借助FlinkCDC强大的数据抓取能力,再加上Flink的计算能力,可以将这些数据实时推送到会计系统和财务系统,从而及时发现业务问题,减少公司的损失。缓存:通过FlinkCDC,可以构建一个与传统应用分离的实时缓存,大大提升线上应用的性能。借助平台,相信FlinkCDC可以更好的在公司内部释放能力。上图展示了SqlServerCDC的原理。社区同学使用当前版本的SqlServerCDC后,主要反馈的问题如下:快照过程中的表锁:表锁操作是DBA和线上应用难以忍受的。会影响在线申请。不能在快照过程中检查点:不检查点意味着一旦快照过程失败,只能重新启动快照过程,这对大表非常不友好。快照过程只支持单并发:千万级、上亿级的大表在单并发的情况下需要同步十几个小时甚至几十个小时,这极大地限制了SqlServerCDC的应用场景。我们针对以上问题进行了实践和改进,参考社区2.0版MySQLCDC并发无锁算法,对SqlServerCDC进行了优化,最终实现了快照过程中的无锁一致性快照;快照过程中支持checkpoint;快照过程中支持并发,以加快快照过程。在大表同步的情况下,并发优势尤为明显。但是,由于2.2版本社区将MySQL的并发无锁思想抽象成了一个统一的、公共的框架,SqlServerCDC需要重新适配这个通用框架才能贡献给社区。问与答Q1需要开启SqlServer自带的CDC吗?是的,SqlServerCDC的功能是基于SqlServer数据库本身的CDC特性实现的。Q2物化视图如何刷新定时任务触发器?需要生成物化视图的SQL通过FlinkCDC运行在Flink中,由原表的变化触发计算,然后同步到物化视图表中。Q3平台化是怎么做的?平台化是指社区中有很多开源项目和优秀的开源平台,比如StreamX、DLink等优秀的开源项目。Q4SqlServerCDC在消费事务日志时是否存在瓶颈?SqlServer不直接消费日志。原理是SqlServer抓包进程匹配日志中哪些表启用了CDC,然后从日志中取出这些表的变更数据启用CDC表,然后插入到变更表中。最后通过开启CDC获取到数据变化后数据库生成的CDC查询功能。Q5FlinkCDC高可用是如何保证超量同步任务或密集处理方案的?Flink的高可用依赖于Flink的Checkpoint等特性来保证。在同步任务过多或者密集处理方案的情况下,建议使用多套Flink下游集群,然后根据实时同步情况区别对待,将任务发布到对应的集群中。Q6中间需要Kafka吗?这取决于同步任务或数据仓库架构是否需要为中间数据实现Kafka。Q7一个数据库中有多个表,可以在一个任务中运行吗?取决于它是如何开发的。如果是SQL开发方式,只能采用多任务一次写多张表的方式。不过FlinkCDC提供了另一种比较高级的开发方式DataStream,可以在一个任务中跑多张表。Q8FlinkCDC支持从Oracle从库读取日志吗?目前这是不可能的。Q9如何通过CDC监控比较两端同步后的数据质量?目前只能通过定期抽样检查数据质量。数据质量一直是业界的难题。Q10大建云仓使用什么调度系统?系统如何与FlinkCDC集成?使用XXLJob作为分布式任务调度,CDC不使用定时任务。Q11如果采集了增删表,需要重启SqlServerCDC吗?SqlServerCDC目前不支持动态添加表的功能。Q12同步任务会影响系统性能吗?基于CDC的同步任务肯定会影响系统性能,尤其是快照过程会影响数据库,进而影响应用系统。未来社区将对所有连接器进行限流和并发无锁的实现,都是为了扩展CDC的应用场景和易用性。Q13如何处理全量保存点和增量保存点?(connectors不是通过concurrentlock-freeframework实现的)full过程中不能触发Savepoint。如果在增量过程中需要停止发布,可以通过保存点恢复任务。Q14CDC同步数据到Kafka,Kafka存储Binlog,如何保存历史数据和实时数据?将CDC同步的所有数据同步到Kafka。保留的数据依赖于Kafka日志的清理策略,可以保留所有的数据。Q15CDC会过滤Binlog的日志操作类型吗?会不会影响效率?即使有过滤操作,对性能的影响也很小。Q16CDC在读取MySQL初始快照阶段时,多个程序读取不同的表,会出现程序错误,无法获取锁表权限。是什么原因?建议先查看mysqlcdc是不是老的实现方式,可以试试新版本的并发无锁实现。Q17如何连接1亿多张MySQL表的全量表和增量表?推荐看一下学金老师2.0的相关博客,里面介绍的非常简单明了,如何实现无锁并发的一致快照,完成全量和增量的切换。