数据同步一直是个头疼的问题。当业务量小,场景不多,数据量不大的时候,我们可能会选择在项目中直接写一些定时任务来手动处理数据,比如从多个表中查出数据,汇总处理,然后插入到相应的地方。但是随着业务量的增加,数据量越来越大,各种复杂场景下的分库分表的实现,使得数据同步变得越来越困难。今天的文章使用阿里开源的中间件Canal来解决增量数据同步的痛点。文章目录如下:什么是Canal?Canal翻译过来就是waterway/pipe/ditch,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。你从这句话中明白了什么?基于MySQL,通过对MySQL日志的增量分析,意味着对原有业务代码完全无侵入。工作原理:解析MySQLbinlog日志,提供增量数据。基于日志增量订阅和消费的业务包括数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务缓存刷新增量数据处理业务逻辑当前渠道支持源码MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。官方文档:github.com/alibaba/can...Canal数据是如何传输的?先放一张官方图:Canal分为server和client,这也是阿里常用的,比如上面提到的注册中心Nacos:server:负责解析MySQL的binlog日志,将增量数据传递给clients或者Message中间件client:负责解析服务器发送过来的数据,然后定制自己的业务处理。目前支持的消息中间件非常全面,比如Kafka、RocketMQ、RabbitMQ。有没有其他的数据同步的中间件?是的,当然有,而且一些开源的中间件也相当不错,比如Bifrost。几种常见的中间件的区别如下:当然,非要选的话,阿里的中间件Canal是首选。Canal服务器安装服务器需要下载压缩包,下载地址:github.com/alibaba/can...最新的是v1.1.5,点击下载:下载解压,目录如下:本文使用Canal+RabbitMQ用于数据同步,所以后面的步骤完全按照这个base进行。1、打开MySQL的binlog日志,修改MySQL的日志文件。my.cnf配置如下:[mysqld]log-bin=mysql-bin#开启binlogbinlog-format=ROW#选择ROW模式server_id=1#配置MySQL复制需要定义,不要和canal的slaveId重复代码2.设置MySQL配置,需要在服务器配置文件中设置MySQL配置,这样Canal就可以知道需要监控哪个库,哪个表的日志文件。一个服务器可以配置多个实例来监控。Canal函数默认带有一个示例实例。本文使用示例实例。如果添加实例,将example文件夹的内容复制到同级目录下,然后在canal.properties中指定添加实例的名称。修改canal.deployer-1.1.5\conf\example\instance.properties配置文件urlcanal.instance.master.address=127.0.0.1:3306username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=root监听数据库canal。instance.defaultDatabaseName=test监控表,可以指定,多个用逗号隔开,这里是监控所有canal.instance.filter.regex=.\..复制代码3、设置RabbitMQ服务器的默认传输方式对于tcp,需要在配置文件中设置MQ相关信息。这里需要修改两个配置文件,如下;1.canal.deployer-1.1.5\conf\canal.properties这个配置文件主要是设置MQ相关的配置,比如URL,用户名,密码...传输方式:tcp,kafka,rocketMQ,rabbitMQcanal.serverMode=rabbitMQRabbitMQrabbitmq.host=127.0.0.1rabbitmq.virtual.host=/exchangerabbitmq.exchange=canal.exchange用户名,密码rabbitmq.username=guestrabbitmq.password=guest是否持久化rabbitmq。2复制代码2,canal.deployer-1.1.5\conf\example\instance.properties这个文件设置了MQ的路由KEY,使其能够路由到指定队列,如下:canal.mq.topic=canal.routing.key复制代码4.在RabbitMQ中新建一个exchange和Queue在RabbitMQ中,需要新建一个canal.exchange(必须和配置中的一致)exchange和一个名为canal.queue的queue(任意姓名)。绑定的路由KEY为:canal.routing.key(必须和配置中的一样),如下图:5.启动服务器点击bin目录下的脚本,双击startup.bat直接在Windows上。启动成功如下:6.测试在本地数据库test中向oauth_client_details中插入一条数据,如下:INSERTINTOoauth_client_detailsVALUES('myjszl','res1','$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W','asshen','asshen',word',authorization_code,client_credentials,implicit','http://www.baidu.com',NULL,1000,1000,NULL,'false');复制代码,查看MQ中的canal.queue是否已经有数据,如下:其实就是一串JSON数据,JSON如下:{"data":[{"client_id":"myjszl","resource_ids":"res1","client_secret":"$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W","scope":"all","authorized_grant_types":"password,refresh_token,authorization_code,client_credentials,implicit","web_server_redirect_uri":"http://www.baidu.com","authorities":null,"access_token_validity":"1000","refresh_token_validity":"1000","additional_information":null,"autoapprove":"false"}],"database":"test","es":1640337532000,"id":7,“isDdl”:假,“mysqlType”:{“client_id”:“varchar(48)”,“resource_ids”:“varchar(256)”,“client_secret”:“varchar(256)”,“范围”:“varchar(256)","authorized_grant_types":"varchar(256)","web_server_redirect_uri":"varchar(256)","authorities":"varchar(256)","access_token_validity":"int(11)","refresh_token_validity":"int(11)","additional_information":"varchar(4096)","autoapprove":"varchar(256)"},"old":null,"pkNames":["client_id"],"sql":"","sqlType":{"client_id":12,"resource_ids":12,"client_secret":12,"scope":12,“authorized_grant_types”:12,“web_server_redirect_uri”:12,“authorities”:12,“access_token_validity”:4,“refresh_token_validity”:4,“additional_information”:12,“autoapprove”:12},“table”:“oauth_client_details","ts":1640337532520,"type":"INSERT"}复制代码的各个字段的意思很清楚,包括表名,方法,参数,参数类型,参数值.....客户端需要做的就是监听MQ获取JSON数据,然后解析出来处理自己的业务逻辑。Canalclient搭建客户端非常简单。你需要做的是消费Canal服务器传递过来的消息,监听canal.queue队列。1.创建消息实体类。MQ传输JSON数据。当然,需要创建一个接收数据的实体类,如下:@JsonProperty("table")privateStringtable;@JsonProperty("data")privateList
