场景在微服务拆分架构中,每个服务都有自己的数据库,因此经常会遇到服务间数据通信的问题。例如,服务B数据库的数据来自于服务A的数据库;当服务A的数据发生变化时,需要同步到服务B。解决方案1、代码逻辑中,当A服务有数据写操作时,通过调用B服务的方式调用B服务接口接口,B服务将数据写入新数据库。这种方法看似简单,其实“坑”很多。在A业务代码逻辑中,会加入大量调用接口同步的此类代码,增加了项目代码的复杂度,日后维护难度会越来越大。而且调用接口的方式也不是稳定的方式,没有重试机制,没有同步位置记录,接口调用失败如何处理,突然大量调用接口导致的问题等等,所有这些都必须在业务中加以考虑和处理。这里会有很多工作。想到这里,我排除了这个计划。2.通过数据库的binlog同步。该方案独立于A服务,不会与A服务发生代码耦合。直接TCP连接可以传输数据,比接口调用要好。这是一个成熟的生产方案,binlog同步的中间件工具有很多,所以我们关注的是哪个工具能够更好的搭建一个稳定、性能好、易用的高可用部署方案。经过研究,我们选择了canal[https://github.com/alibaba/canal]。canal是阿里巴巴MySQLbinlog的增量订阅消费组件。已经在生产实践中,方便的支持与其他常用的中间件组件组合,比如kafka、elasticsearch等,另外还有一个canal-gogo语言的客户端库,满足了我们在go上的需求。其他细节可以参考canal的github主页。流程示意图1.Canal连接数据库A,模拟slave2.canal-client与Canal建立连接,订阅对应的数据库表3.A数据库发生变化写入binlog,Canal向数据库发送dump请求获取binlog并分析,将解析后的数据发送给canal-client4.canal-client接收数据,并将数据同步到新数据库安装canal下载canal修改配置/conf/canal.properties#...#可选:tcp(默认),kafka,RocketMQcanal.serverMode=kafka#...#kafka/rocketmq集群配置:192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092canal.mq.servers=127.0.0.1:9002canal.mqretries=0#在flagMessage模式下可以增加这个值,但是不要超过MQ消息体大小的上限canal.mq.batchSize=16384canal.mq.maxRequestSize=1048576#请在flatMessage模式下增加这个值,50-200canal是受到推崇的。mq.lingerMs=1canal.mq.bufferMemory=33554432#Canal的batchsize,默认50K,由于kafka最大消息体的限制,请不要超过1M(900K以下)canal.mq.canalBatchSize=50#canal获取数据超时,单位:毫秒,为空为无限超时canal.mq.canalGetTimeout=100#是否为flatjson格式对象canal.mq.flatMessage=falsecanal.mq.compressionType=nonecanal.mq.acks=all#是否kafka消息传递使用事务通道。mq.transaction=false#mqconfigcanal.mq.topic=default#模式或表正则表达式的动态主题路由#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*canal.mq.dynamicTopic=mydatabase.mytablecanal.mq.partition=0#hash分区configcanal.mq.partitionsNum=3#解决消费序列问题canal.mq.partitionHash=mydatabase.mytable然后配置instance,找到/conf/example/instance.properties配置文件:##mysqlserverId,v1.0.26+会autoGen(自动生成,无需配置)#canal.instance.mysql.slaveId=0#位置infocanal.instance.master.address=127.0.0.1:3306#在Mysql中执行SHOWMASTERSTATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596#账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset=UTF-8#MQ队列名canal.mq.topic=canaltopic#单队列模式分区下标canal.mq.partition=0启动zookeeper和kafkazookeeper-server-start.bat../../config/zookeeper.propertieskafka-server-start.bat../../config/server.properties启动canalcanal/bin/start.bat编写读取消息的相关代码kafka相关配置kafka:bootstrap-servers:127.0.0.1:9092producer:#发生错误后,重发消息的次数Numberretries:0#当多条消息需要发送到同一个partition时,producer会将它们放在同一个batch中。此参数指定批处理可以使用的内存大小,以字节为单位计算。batch-size:16384#设置生产者内存缓冲区的大小。buffer-memory:33554432#键序列化方法key-serializer:org.apache.kafka.common.serialization.StringSerializer#值序列化方法value-serializer:org.apache.kafka.common.serialization.StringSerializer#acks=0:生产者在成功写入消息之前不会等待服务器的任何响应。#acks=1:只要集群的leader节点收到消息,producer就会收到server的成功响应。#acks=all:只有所有参与复制的节点都收到了消息,生产者才会收到服务器的成功响应。acks:1consumer:#springboot2.X版本这里使用自动提交的时间间隔。value的类型是Duration,需要符合特定的格式,比如1S,1M,2H,5Dauto-commit-interval:1#这个属性指定了consumer在读分区时应该做什么,没有offset或者aninvalidoffset:#latest(默认值)在offset无效的情况下,消费者会从最新的记录开始读取数据(消费者启动后产生的记录)#earliest:在offset无效的情况下,consumer会从起始位置读取partition的记录auto-offset-reset:earliest#是否自动提交Offset,默认为true,为了避免重复数据和数据丢失,可以设置为false,以及然后手动提交偏移量serialization.StringDeserializer侦听器:#在侦听器容器中运行的线程数。concurrency:5#listner负责ack,每次调用ack时立即commit-mode:manual_immediatemissing-topics-fatal:false@Configuration@EnableKafkapublicclassKafkaConfig{@Value("${spring.kafka.bootstrap-servers}")私有字符串bootstrapServers;@Value("${spring.kafka.producer.retries}")私有字符串重试;@Value("${spring.kafka.producer.batch-size}")privateIntegerbatchSize;@Value("${spring.kafka.producer.buffer-memory}")privateIntegerbufferMemory;/***生产者配置信息*/@BeanpublicMap
