当前位置: 首页 > 科技观察

实战!SpringBoot集成阿里开源中间件Canal实现增量数据同步!

时间:2023-03-23 10:07:58 科技观察

本文转载自微信公众号《码猿科技专栏》,作者陈谋。转载本文请联系码猿科技专栏公众号。数据同步一直是个让人头疼的问题。当业务量小,场景不多,数据量不大的时候,我们可能会选择在项目中直接写一些定时任务来手动处理数据,比如从多个表中查出数据,汇总处理,然后插入到相应的地方。但是随着业务量的增加,数据量越来越大,各种复杂场景下的分库分表的实现,使得数据同步变得越来越困难。今天的文章使用阿里开源的中间件Canal来解决增量数据同步的痛点。文章目录如下:什么是Canal?Canal译为水路/管道/沟渠,其主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。你从这句话中明白了什么?基于MySQL,通过对MySQL日志的增量分析,意味着对原有业务代码完全无侵入。工作原理:解析MySQLbinlog日志,提供增量数据。基于日志增量订阅和消费的业务包括数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务缓存刷新增量数据处理业务逻辑当前渠道支持源码MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。官方文档:https://github.com/alibaba/canalCanal数据是如何传输的?先来看一张官方图:Canal分为server和client。Server:负责解析MySQL的binlog,传递增量数据给客户端或消息中间件client:负责解析从server端传过来的数据,然后自定义自己的业务处理。目前支持的消息中间件非常全面,比如Kafka、RocketMQ、RabbitMQ。有没有其他的数据同步的中间件?是的,当然有,而且一些开源的中间件也相当不错,比如Bifrost。几种常见的中间件的区别如下:当然,非要选的话,阿里的中间件Canal是首选。Canal服务器安装服务器需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases最新的是v1.1.5,点击下载:下载解压,目录如下:这个文章使用Canal+RabbitMQ进行数据同步,所以后面的步骤完全按照这个base进行。1、打开MySQLbinlog日志,修改MySQL日志文件。my.cnf配置如下:[mysqld]log-bin=mysql-bin#Openbinlogbinlog-format=ROW#SelectROWmodeserver_id=1#ConfigureMySQLreplaction需要定义,不要和canal的slaveId重复2.要设置MySQL配置,需要在服务器配置文件中设置MySQL配置,这样Canal才能知道需要监控哪个库,哪个表的日志文件。一个服务器可以配置多个实例来监控。Canal函数默认带有一个示例实例。本文使用示例实例。如果添加实例,将example文件夹的内容复制到同级目录下,然后在canal.properties中指定添加实例的名称。修改canal.deployer-1.1.5\conf\example\instance.properties配置文件#urlcanal.instance.master.address=127.0.0.1:3306#username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=root#listen数据库canal.instance.defaultDatabaseName=test#Monitoring表,可以指定,多个用逗号隔开,这里是监控所有canal.instance.filter.regex=.*\\..*3、设置RabbitMQ的配置服务终端默认传输方式为tcp,需要在配置文件中设置MQ相关信息。这里需要修改两个配置文件,如下;1)、canal.deployer-1.1.5\conf\canal.properties这个配置文件主要是设置MQ相关的配置,比如URL,用户名,密码...#传输方式:tcp,kafka,rocketMQ,rabbitMQcanal.serverMode=rabbitMQ############################################################RabbitMQ#################################################################rabbitmq.host=127.0.0.1rabbitmq.virtual.host=/#exchangerabbitmq。exchange=canal.exchange#username,passwordrabbitmq.username=guestrabbitmq.password=guest##是否持久化rabbitmq.deliveryMode=22),canal.deployer-1.1.5\conf\example\instance.properties这个文件设置路由MQ的KEY以便路由到指定队列,如下:canal.mq.topic=canal.routing.key4,RabbitMQ新建一个exchange,RabbitMQ中的Queue需要新建一个canal.exchange(必须是与配置相同)交换和名为canal.queue(随机名称)的队列。绑定的路由KEY为:canal.routing.key(必须和配置中的一致),如下图:5.启动服务器,点击bin目录下的脚本,双击startup。直接在Windows上运行。启动成功如下:6.测试在本地数据库test中向oauth_client_details中插入一条数据,如下:INSERTINTO`oauth_client_details`VALUES('myjszl','res1','$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W','all','passtokword,'密码,authorization_code,client_credentials,implicit','http://www.baidu.com',NULL,1000,1000,NULL,'false');这时查看MQ中的canal.queue是否已经有数据,如下:其实是一串JSON数据,JSON如下:{"data":[{"client_id":"myjszl",“resource_ids”:“res1”,“client_secret”:“$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W”,“scope”:“all”,“authorized_grant_types”:“密码,refresh_token,authorization_code,client_credentials,隐式”,"web_server_redirect_uri":"http://www.baidu.com","authorities":null,"access_token_validity":"1000","re??fresh_token_validity":"1000","additional_information":null,"autoapprove":"false"}],"database":"test","es":1640337532000,"id":7,"isDdl":false,"mysqlType":{"client_id":"varchar(48)","re??source_ids":"varchar(256)","client_secret":"varchar(256)","scope":"varchar(256)","authorized_grant_types":"varchar(256)","web_server_redirect_uri":"varchar(256)","authorities":"varchar(256)","access_token_validity":"int(11)","re??fresh_token_validity":"int(11))","additional_information":"varchar(4096)","autoapprove":"varchar(256)"},"old":null,"pkNames":["client_id"],"sql":"","sqlType":{"client_id":12,"resource_ids":12,"client_secret":12,"scope":12,"authorized_grant_types":12,"web_server_redirect_uri":12,"authorities":12,"access_token_validity":4,"refresh_token_validity":4,"additional_information":12,"autoapprove":12},"table":"oauth_client_details","ts":1640337532520,"type":"INSERT"}每个字段的意思很清楚,有一个表名、方法、参数、参数类型、参数值……客户端需要做的就是监听MQ获取JSON数据,然后解析出来,处理自己的业务逻辑。Canalclient构建客户端实现起来非常简单。要做的就是消费Canal服务器传递过来的消息,监听canal.queue队列。1.创建消息实体类。MQ传输JSON数据。当然需要创建一个接收数据的实体类,如下:/***@author公众号码猿技术专栏*Canal消息接收实体类*/@NoArgsConstructor@DatapublicclassCanalMessage{@JsonProperty("type")privateStringtype;@JsonProperty("table")privateStringtable;@JsonProperty("data")privateListdata;@JsonProperty("database")privateStringdatabase;@JsonProperty("es")privateLonges;@JsonProperty("id")privateIntegerid;@JsonProperty("isDdl")privateBooleanisDdl;@JsonProperty("old")privateListold;@JsonProperty("pkNames")privateListpkNames;@JsonProperty("sql")privateStringsql;@JsonProperty("ts")privateLongts;}2、MQ消息监听业务后面是监听队列。一旦有来自Canal服务器的数据推送,就可以及时消费。代码很简单,给出一个接收案例即可,具体业务逻辑可以根据业务实现,如下:importcn.hutool.json.JSONUtil;importcn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;importlombok;importorg.springframe.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/***监听MQ获取Canal增量数据消息*/@Component@Slf4j@RequiredArgsConstructorpublicclassCanalRabbitMQListener{@RabbitListener(bindings={@QueueBinding(value=@Queue(value="canal.queue",durable="true"),exchange=@Exchange(value="canal.exchange"),key="canal.routing.key")})publicvoidhandleDataChange(Stringmessage){//将消息转换为CanalMessageCanalMessagecanalMessage=JSONUtil.toBean(message,CanalMessage.class);StringtableName=canalMessage.getTable();log.info("Canalmonitoring{}changes;details:{}",tableName,message);//TODO业务逻辑自我完善……}}3.测试下,面向Insertdataintothetable,看收到的消息是什么样的。SQL如下:INSERTINTO`oauth_client_details`VALUES('myjszl','res1','$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W','all','password,refresh_token,authorization_code,client_credentials,implicit','http://www.baidu.com',NULL,1000,1000,NULL,'false');客户端转换后的消息如下图所示:上图中可以看到所有的数据都已经成功接收了,你只需要根据这些数据改进你的业务逻辑即可。客户案例源码已上传至GitHub。关注公众号:码猿科技专栏,回复关键词:9530获取!总结数据增量Canal并不是唯一的同步开源工具,根据业务需求选择合适的组件。

最新推荐
猜你喜欢