数据栈技术分享:详解FlinkX中的断点续传和实时采集基于Flink统一的批流数据同步工具,既可以采集静态数据,也可以采集实时变化的数据。它是一个全局的、异构的、批流式的集成数据同步引擎。如果喜欢,请给我们一个star!星星!星星!github开源项目:https://github.com/DTStack/fl...gitee开源项目:https://gitee.com/dtstack_dev...KangarooYunyun原生一站式数据平台PaaS——数字栈,覆盖各类工具(包括数据开发平台、数据资产平台、数据科学平台、数据服务引擎等)处理,提高数据价值提取能力。目前数据栈-离线开发平台(BatchWorks)中的数据离线同步任务和数据栈-实时开发平台(StreamWorks)中的数据实时采集任务已经基于FlinkX进行了统一。离线数据采集和实时数据采集的基本原理是一样的。主要区别在于源流是否有界。所以Flink的StreamAPI就是用来实现这两种数据同步场景,实现数据同步的批流。团结。一、功能介绍1、断点续传断点续传是指数据同步任务在运行过程中由于各种原因失败。不需要重新同步数据,只需要从上次失败的位置继续同步即可。类似于因网络原因下载文件失败,不需要重新下载文件,继续下载即可,可以大大节省时间和计算资源。断点续传是BatchWorks中数据同步任务的一个功能,需要结合任务的错误重试机制来完成。当任务运行失败时,它将在引擎中重试。重试时,会从上次失败时读取的位置继续读取数据,直到任务运行成功。2.实时采集实时采集是数据栈-实时开发平台(StreamWorks)中数据采集任务的一个功能。当数据源中的数据增加、删除或修改时,同步任务会监听到这些变化,并将变化后的数据同步到目标数据源。除了实时数据变化之外,实时采集与离线数据同步的另一个区别是,实时采集任务不会停止,任务会一直监控数据源是否发生变化。这与Flink任务是一致的,所以实时采集任务是数据栈流计算应用中的一种任务类型,配置过程与离线计算中的同步任务基本一致。2.Flink中的Checkpoint机制断点续传和实时采集都依赖于Flink的Checkpoint机制,我们先简单了解一下。Checkpoint是Fl??ink容错机制的核心功能。它可以根据配置周期性的根据Stream中各个Operator的状态生成Snapshots,从而周期性的存储这些状态数据。当Flink程序意外崩溃时,可以重新运行。程序可以选择性地从这些Snapshots中恢复,从而纠正因故障导致的程序数据状态中断。当Checkpoint被触发时,会在多个分布式StreamSource中插入一个Barrier标签,这些Barrier会随着Stream中的数据记录流向各个下游的Operator。当Operator收到Barrier时,它会暂停处理Steam中新收到的数据记录。因为一个Operator可能有多个inputStreams,而每个Stream中都会有对应的Barrier,所以Operator会一直等到所有inputStreams中的Barrier都到达。当Stream中的所有Barriers都到达Operator时,所有的Barriers看起来都在同一时间点(说明它们已经对齐)。在等待所有Barrier到达的过程中,Operator的Buffer可能已经缓存了一些比Barrier更早到达Operator的数据记录(OutgoingRecords),此时Operator会发出(Emit)这些数据记录(OutgoingRecords)作为下游Operator的输入,最终发出(Emit)Barrier对应的Snapshot作为这个checkpoint的结果数据。三、断点续传1.前提条件同步任务必须支持断点续传,对数据源有一些强制性要求:1)数据源(这里特指关系型数据库)必须包含升序字段,比如主键或者日期类型的字段。在同步过程中会使用checkpoint机制来记录这个字段的值。任务恢复时,使用该字段构造查询条件,过滤同步数据。如果该字段的值不是升序,则在任务恢复时进行过滤2)数据源必须支持数据过滤,否则任务无法从断点恢复,会导致数据重复;3)目标数据源必须支持Transactions,比如关系型数据库,文件类型的数据源也可以通过临时文件来支持。2.任务运行的详细过程下面我们用一个具体的任务来详细介绍一下整个过程。任务详情如下:1)读取数据时,首先要做的是构建数据分片。构建数据分片是基于通道索引和检查点记录。location构造查询sql,sql模板如下:select*fromdata_testwhereidmod${channel_num}=${channel_index}andid>${offset}如果是第一次运行,或者checkpoint还没有在上一个任务失败时被触发,那么offset不存在,具体查询可以根据offset和channel来确定。当sql:offset存在时,第一个通道:select*fromdata_testwhereidmod2=0andid>${offset_0};第二个通道:select*fromdata_testwhereidmod2=1andid>${offset_1};偏移量不存在时的第一个通道:select*fromdata_testwhereidmod2=0;第二个通道:select*fromdata_testwhereidmod2=1;datafragmentation构建后,每个channel根据自己的数据分片读取数据。2)写数据在写数据之前会做几个操作:a.检查/data_test目录是否存在。如果该目录不存在,则创建该目录。如果目录存在,执行2个操作;b.判断是否以覆盖方式写入Data,如果是,则删除/data_test目录,然后创建该目录,如果不是,则进行3次操作;c、检查/data_test/.data目录是否存在,如果存在先删除,再创建,保证没有其他任务异常失败遗留的脏数据文件;数据单条写入hdfs,不支持批量写入。数据会先写入到/data_test/.data/目录下,数据文件的命名格式为:channelIndex.jobId.fileIndex包括三部分:channelindex,jobId,fileindex。3)当checkpoint被触发时,FlinkX中的“status”代表标识字段id的值。我们假设当checkpoint被触发时,两个channel的读写如图所示:checkpoint被触发后,先生成两个reader。Snapshot记录读取状态,通道0的状态为id=12,通道1的状态为id=11。Snapshot生成后,在数据流中插入barrier,barrier跟随数据流向Writer。以Writer_0为例。Writer_0接收Reader_0和Reader_1发送的数据。假设它先接收到Reader_0的屏障。此时Writer_0停止向HDFS写入数据,先将接收到的数据放入InputBuffer,等待Reader_1的barrier到来。然后将Buffer中的数据全部写出,然后生成Writer的快照。整个checkpoint结束后,记录的任务状态为:Reader_0:id=12Reader_1:id=11Writer_0:id=UnabletodetermineWriter_1:id=Unabletodeterminethetaskstatus会记录到配置的HDFS目录/flinkx/检查点/abc123。因为每个Writer会收到两个Reader的数据,而且每个通道的数据读写速率可能不同,所以writer收到数据的顺序是不确定的,但这并不影响数据的准确性,因为读取数据构造查询sql只需要Reader记录的状态时,我们只需要保证数据真正写入HDFS即可。Writer在生成Snapshot之前,会做一系列的操作来保证接收到的数据全部写入HDFS:关闭写入HDFS文件的数据流。此时/data_test/.data目录下会生成两个文件:/data_test/.data/0.abc123.0/data_test/.data/1.abc123.0b。将生成的两个数据文件移动到/data_test目录下;C。将文件名模板更新为:channelIndex.abc123。1;快照生成后,任务继续读写数据。如果在生成快照的过程中出现异常,则任务会直接失败,从而不会生成本次快照,任务恢复时会从上次成功的快照开始恢复。4)任务正常结束。当任务正常结束时,执行与生成快照时相同的操作,如关闭文件流、移动临时数据文件等。5)任务异常终止如果任务异常结束,假设最后一个检查点的状态任务结束时的record为:Reader_0:id=12Reader_1:id=11,那么当任务恢复时,会将每条通道record的状态赋值到offset,再次读取取数据时构造的sql为:第一个channel:select*fromdata_testwhereidmod2=0andid>12;【点击拖动移动】第二个通道:select*fromdata_testwhereidmod2=1andid>11;【点击拖动移动】可以从上次失败的位置继续读取数据。3.支持断点续传的插件理论上只要支持过滤数据的数据源和支持事务的数据源都可以支持断点续传的功能,目前FlinkX支持的插件如下:4.实时采集目前FlinkX支持实时采集的插件包括KafKa和binlog插件。binlog插件是专门为mysql数据库实时采集而设计的。如果要支持其他数据源,只需要将数据发送到Kafka,然后使用FlinkX的Kafka插件消费数据即可。比如oracle,只需要使用oracle的ogg向kafka发送数据即可。这里具体讲解下mysql实时采集插件binlog。1、binlogbinlog是Mysqlserver层维护的二进制日志,与innodb引擎中的redo/undolog完全不同;主要用于记录更新或可能更新mysql数据的SQL语句,以“Transactions”开头存储在磁盘上。binlog的主要功能有:1)Replication:MySQLReplication在Master端开启binlog,Master将自己的binlog传递给slave并回放,达到主从数据一致性的目的;2)数据恢复:通过mysqlbinlog工具恢复数据;3)增量备份。2、MySQL主备复制仅仅有一个记录数据变化的binlog日志是不够的。我们还需要用到MySQL的主备复制功能:主备复制是指一台服务器作为主数据库服务器,另一台或多台服务器作为从服务器。数据库服务器,主服务器中的数据自动复制到从服务器。主从复制的过程:1)MySQLmaster将数据变化写入二进制日志(binarylog,这里的记录称为binarylogevents,可以通过showbinlogevents查看);2)MySQLslave将master的二进制日志eventsCopy到它的中继日志(relaylog);3)MySQLslave重放中继日志中的事件,并更改数据以反映自己的数据。3、写入Hivebinlog插件,监控多张表的数据变化。解析后的数据包含表名信息。读取的数据可以写入目标数据库中的表,也可以基于数据中包含的表。名称信息可以写入不同的表,目前只有Hive插件支持该功能。Hive插件目前只有write插件,功能是基于HDFSwrite插件,也就是说从binlog读取和写入hive也支持故障恢复的功能。写入Hive的过程:1)从数据中解析出MySQL表名,然后根据表名映射规则转换成对应的Hive表名;2)检查Hive表是否存在,不存在则创建Hive表;3)查询Hive表的相关信息,构造HdfsOutputFormat;4)调用HdfsOutputFormat向HDFS写入数据。
