当前位置: 首页 > 后端技术 > Java

RocketMQConnect搭建流式数据处理平台

时间:2023-04-01 13:51:50 Java

本文作者:孙小剑,ApacheRocketMQCommitter01RocketMQConnectRocketMQConnect是RocketMQ与其他系统之间传输流式数据的可扩展工具,可以方便的将RocketMQ与其他存储系统集成技术是融合的。RocketMQConnect使用特定的Source插件类型向RocketMQTopic发送数据,通过Sink监听Topics将数据写入下游指定的数据存储。在使用过程中,Connector可以通过JSON进行配置,无需编码。数据流的过程通过RocketMQ从源到目的进行桥接。RocketMQConnect具有以下特点:①通用性:Connect开发了标准的API,包括Connector、Task、Converter、Transform。开发者可以通过标准的API来扩展他们的插件来满足他们自己的需求。②Offset自动管理(断点续传):Source——用户开发Connect时,可以通过Offset拉取增量数据。系统内部会自动管理Offset,并会持久化上次拉取的Offset信息。下次重启任务时,可以通过上次提交的Offset继续增量数据拉取,不需要从头同步数据;Sink——基于RocketMQ自带的Offset提交策略,内部实现了自动提交方式,当任务运行时会自动处理,允许用户配置offset提交间隔;如果系统自身的offset能够满足要求,则无需维护offset;如果系统自身的offset不能满足要求,可以通过TaskAPI维护。TaskAPI有自己的Offset维护能力,你可以在Connect中决定Offset的持久化逻辑,比如持久化到MySQL、Redis。任务下次启动时,可以自动从Offset存储位置获取下一次执行的Offset,继续做增量拉取。③分布式、可扩展、容错:可以分布式部署,自带容错能力。当一个Worker宕机或新增一个Worker时,任务会自动重新分配并运行,以平衡每个集群中的Worker。任务失败后,也会自动重试。重试后可以自动rebalance到不同的Worker机器上。④运维与监控:Connect提供标准的集群管理功能,包括Connect管理功能和插件管理功能。您可以通过API启动和停止任务,也可以在运行过程中查看任务的运行状态和异常状态。并且可以报告指标。在任务中拉取和写入数据后,可以通过Metrics方法报告数据总量、数据速率等。另外,Metrics还提供了标准的上报API,可以在标准API的基础上扩展metrics和上报的方式,比如向RocketMQ主题、Prometheus等上报。⑤批流融合:当Source在做数据拉取的时候,可以通过JDBC或者指定插件sdk拉取批量数据,转为流方式。也可以采用CDC方式,通过增量快照或者Mysql类binlog监控方式,在源头获取全量和增量变化数据,推送到RocketMQ。下游可以通过Flink或RocketMQStream进行流处理进行状态计算,也可以直接落入数据存储引擎,如Hudi、Elasticsearch、Mysql等。⑥Standalone、Distributed模式:Standalone模式主要用于测试环境,以及分布式模式主要用于生产环境。试用过程中,您可以使用Standalone方式进行部署。得益于它不做Config存储,每次启动都可以带上一个独立的task来帮助debug。Connect组件包括以下几类:Connector:作为任务协调的高层抽象,描述了Task如何运行以及如何拆分Task。Task:负责实际的数据拉取操作,负责offset的维护和TaskMetrics数据的收集。Worker:执行Task任务的进程。RecordConverter:执行Source和Sink之间的数据转换。Record通过Schema制定数据契约。Schema可以通过RocketMQSchemaRegistry与数据一起传输或远程存储。目前支持两种类型的转换器,Avro和JSON。Transform:在数据传输过程中进行数据转换。如字段变更、类型变更、空值或已知错误值过滤等;也可以通过扩展groovytransform、pythontransform等脚本对数据进行复杂的转换,也可以远程调用完成静态数据或者做函数计算。DeadLetterQueue:在Source到Sink的数据传输过程中,数据转换错误,网络超时,逻辑错误导致写入失败等,可以根据插件逻辑决定是否将数据写入错误队列写入,或者忽略错误继续,或者发生错误后停止任务等。写入错误队列的数据可以在自助异步修复后重写,无需担心数据的顺序。Metrics:提高任务运行过程的可观察性,在任务拉取和写入数据时,需要监控任务拉取数据量、写入数据量、拉取率、写入率、差异,内存占用等,可以通过Metrics上报,供系统运维。上图显示了Connect中的数据流向。分布式部署下,Source和Sink可以在不同的Worker中,互不依赖。连接器可以包含要按顺序执行的任务、转换和转换器。Task负责从源头拉取数据,任务并发数由自定义插件的sharding方式决定。数据拉取后,如果中间配置了数据处理Transform,数据会依次经过一个或多个配置好的Transform,然后将数据发送给Converter,Converter将数据重新组织成一个可转移的方法。RocketMQSchemaRegistry会对Schema进行校验、注册或升级,转换后的数据最终写入中间Topic,供下游Sink使用。下游的Sink可以选择性地监听一个或多个Topic。Topic中传输的数据可以在同一个存储引擎中,也可以在异构存储引擎中。数据经过Sink转换后,最终传输给流计算引擎。或者直接写入目标存储。在转换过程中,SourceConverter和SinkConverter必须保持一致。不同的Converter解析出的Schema格式会有所不同。如果Converter不一致,会导致Sink无法解析数据。通过自定义Transform可以兼容不同组件之间的差异化。上述架构有以下优点:①松散架构:Source和Sink通过Topic解耦,E、T、L不再是一个整体。通常同一个存储引擎读写数据的QPS相差很大,所以集成的ETL在读取数据时会受到目标数据库写入性能的制约。RocketMQConnect中Source和Sink解耦后,Source和Sink可以独立伸缩,实现数据读写的动态平衡,互不影响。②标准API:降低使用难度,易于扩展。具体写并发的方法在API中抽象出来,插件开发者可以自定义拆分。③标准数据抽象:使用Topic解耦后,需要在Source和Sink之间建立数据契约。Connect主要通过Schema实现数据约束。这样就支持了异构数据源之间的数据集成。④关注数据拷贝:Connect主要关注异构数据源的数据集成,不做流计算,支持数据拷贝到流(Flink、RocketMQStream)系统,再进行流计算。⑤重量轻:依赖性小。如果集群中已有RocketMQ集群,可以直接部署RocketMQConnect进行数据同步。部署非常简单,不需要额外的调度组件。RocketMQConnect自带任务分发组件,无需额外关注。另外,依托RocketMQ强大的性能,可以在不同系统之间进行大规模的数据迁移。Source主要依赖RocketMQ的写入能力,不需要等待事务结束的数据写入。依靠topic的可扩展性,sink可以根据中间topic的partition数量来决定下游sink的并发量,自动扩容。任务扩展后,系统会重新分配连接器,保证负载均衡,Offset不会丢失。无需人工干预,可根据上次运行状态继续向下运行。您还可以依赖RocketMQ的有序策略来同步顺序数据。02RocketMQConnect原理管理区——主要用于接收任务配置变更或查询,包括Connector的创建、删除、更新、启停、查看等操作。更改任务后,管理端将任务提交给RocketMQ共享配置的Topic。因为每个Worker都监听同一个Topic,所以每个Worker都可以获取Config信息,进而触发集群Rebalance重新分配任务,最终实现全局任务均衡。Runtimezone——主要为已经分配给当前Worker的Task提供运行空间。包括任务初始化、数据抓取、Offset维护、任务启停状态上报、Metrics指标上报等。调度方面——Connect自带任务分配调度工具,通过hash或consistenthash来平衡Worker之间的任务,主要监听Worker和Connector的变化。如添加或删除Worker、更改Connector配置、启动和停止任务等。获取状态变化用于更新本地任务状态,决定是否进行下一轮Rebalance操作,实现整个集群的负载均衡。管理端、运行时区、调度区存在于每个集群的每个worker中。集群中worker之间的交流主要是通过分享topic来通知。worker作为主节点和备节点没有区别,这使得集群运维非常容易。很方便,只需要在Broker中创建一个对应的共享topic即可,但是由于改变Task状态的动作只会发生在一个Worker中,集群之间的共享会有短暂的延迟,所以可能会有通过RestApi不一致查询连接器状态时的短时间。服务发现过程。当有变化时,每个Worker都能发现节点变化,实现服务自动发现的效果。①在启动一个新的Worker时,Worker会向依赖的RocketMQTopic注册客户端变更监视器。对于同一个ConsumerGroup,当有新的client加入时,注册事件的client会收到一个change通知。Worker收到change事件后,会主动更新当前集群的Worker列表。②当Worker下降或收缩时,也会产生同样的效果。RocketMQConnect任务分配流程如下:通过调用RestAPI创建Connector。如果Connector不存在,会自动创建,如果存在,会更新。创建后,会向ConfigTopic发送通知,通知Worker有任务变更。Worker获取任务变更后,重新分配,达到负载均衡的效果。停止任务将具有相同的效果。目前,每个Worker都会存储全量的任务和状态,但只会运行分配给当前Worker的任务。目前系统默认提供简单哈希和一致性哈希两种任务分配方式。建议选择一致性哈希模式。因为在consistenthash的情况下,做Rebalance时的变化幅度比普通hash要小,一些已经分配的任务不会再加载。连接器扩展元素分为自定义配置、并发和任务信息。自定义配置包括connection信息(核心配置项)、Convertor信息、Transform信息等,Connector只是作为任务全局的汇总和协调者,实际作用还是分配的Task。比如1亿条数据,分成多个task去拉,分别在不同的Task中执行。因此,需要使用Connector按照合理的逻辑拆分Task。这些拆分操作需要在声明Connector时指定。Connecor拆分配置后,将实际的数据拉取逻辑配置告知Task,由Task决定数据拉取的具体方式。任务扩展元素包括配置初始化、连接打开和关闭、拉取频率、错误处理、实际数据拉取逻辑和Offset维护。整个系统中的全局Converter转换使用同一套API,分为两种模式:Local模式:数据从SourceConnect拉取后,由Converter进行数据转换。在转换过程中,本地操作会把Schema和value值组合成一个Connectrecord传给下游。下游使用同一个Converter将其转换为Record,推送给Sink任务进行数据写入。中间通过ConvertSchema做了一个数据契约,可以在Source和Sink之间进行转换。本地模式下,Schema和Value作为一个整体传输,数据体非常臃肿,每条数据都有Schema信息。但它的优点是不存在版本兼容问题。Remote模式:数据转换时,Schema会存储在远程RocketMQSchemaRegistry系统中,数据传输过程中只携带Value值,不带Schema约束信息。当Sink订阅一个Topic时,通过信息头中的RecordID获取Schema信息,进行Schema校验,校验通过后进行数据转换。Schema在RocketMQSchemaRegistry系统中维护。因此在转换过程中,可以在系统中手动更新Schema,然后使用指定的SchemaID进行转换,但需要在Converter插件中做数据兼容。ConnectConverter内置了扩展,包括原生JSON、普通数据类型Converter等,如果内置扩展不能满足你的需求,你可以通过RecordConverterAPI自行扩展。扩展完成后,将Converter包放到Worker运行时插件目录下,系统会自动加载。有两种配置方式:Key和Value。其中Key是唯一的标记数据,也可以是Struct结构化数据;Value是真正传输的数据。Transform是Connector和Convertor之间进行数据映射转换和简单计算的辅助工具。当SourceConverter和SinkConnector在使用过程中不能满足业务需求时,可以通过编写Transform插件进行数据适配。例如不同业务、不同数据源插件之间的数据转换,如字段映射、字段推导、类型转换、字段补全、复杂函数计算等。系统内置的Transform模式包括字段扩展、替换等,如果不满足需求,可以通过API自行扩展Transform。部署时,只需要将写好的扩展包放到对应的插件目录下,即可自动加载。具体配置方法如上图左下方所示。Transform是串行运行的,可以对一个值进行多次转换,可以配置多个Transform。如果需要配置多个Transforms,用逗号分隔,名字不能重复。SourceTask在做数据拉取或者变更监听时,比如通过JDBCMysql做增量数据拉取时,需要指定offset增量拉取方式,可以是自增ID,也可以是修改时间。每次数据拉取完成并发送成功后,会将增量信息(id或修改时间)提交给Offsetwriter,由系统异步持久化。下次启动任务时,会自动获取Offset,从上次的位置开始处理数据,达到断点续传的效果。封装Offset时没有固定的模式。您可以按照自己的方式连接Offset键或值。唯一依赖的就是RocketMQ中的Connectoffset主题信息,主要是推送给其他worker本地的Offset更新。如果使用系统的offset维护,用户只需要决定维护上报逻辑,不需要关注如何保证offset提交,offset回滚方式等,一切由系统来保障。在运行过程中,如果启用了死信队列,则正确的数据将发送到目的地,错误的数据将发送到错误队列。业务方可以异步处理数据,但这种情况下不能保证顺序。如果要保证数据的顺序,需要在触发错误时停止Task,先修复数据,修复后再启动Task。如果单个Task在处理数据时报错,只需要停止出错的Task,其他Task不受影响。因为每个task在处理数据的时候会消费不同的query,如果指定一个key,会根据key对数据进行分区,然后保证分区中的每个query是有序的,所以单个task的stop不会影响全球秩序。03RocketMQConnect使用场景RocketMQConnect可以适用于传统ETL适用的大部分场景。此外,RocketMQConnect还可以实现传统ETL无法实现的实时流、流批一体、快照功能。新旧系统迁移场景:业务部门升级变更过程中,类型变更、分表或扩容操作、增加索引等都可能导致宕机,耗时较长。数据迁移可以通过RocketMQConnect完成。Sharding分库分表场景:分库分表的插件市面上有很多。可以适配开源分库客户端通过Connect做分库工作,也可以基于RocketMQ做分库逻辑。与目的地相同。单表取数据后,就可以在Transform中完成数据库和表的逻辑。路由可以通过Transform来完成。路由到不同的主题,在下游,可以听不同的主题,落入已经分好的库表。多活:RocketMQConnect支持集群间Topic和元数据拷贝,可以保证多中心Offset的一致性。数据订阅场景:通过CDC方式监控数据,通知下游数据。用于下游数据订阅和实时数据更新。同时也可以通过HTTP直接拉取和推送数据到下游业务系统,类似Webhook的方式,但是需要对请求进行权限验证,限流等。其次,还有数据入湖、冷数据备份、异构数据源数据集成等业务场景。RocketMQConnect可以用作数据处理解决方案。从整体的使用场景来看,大致可以分为两部分,数据集成和流式处理。数据集成主要是将数据从一个系统迁移到另一个系统,可以在异构数据源中进行数据同步。流处理主要是通过批数据拉取批处理信息,或者CDC方式将增量数据同步到对应的流处理系统,进行数据聚合、窗口计算等操作,最后通过Sink写入存储引擎。04RocketMQConnect生态RocketMQConnect目前支持上图中的所有产品,平台也提供了KafkaConnect插件的适配。