一、背景在上一篇文章中,我们使用CanalAdmin搭建了一个CanalServer集群。在这篇文章中,我们在上一篇文章的基础上,向kafka消息队列发送消息。2、需要修改的地方以下配置文件的修改都是在CanalAdmin上修改的。1.修改canal.properties配置文件1.修改canal.serverMode的值2.修改kafka配置2.修改instance.propertios配置文件3.Canal向mq发送消息性能优化影响性能的几个参数:canal。instance.memory.rawEntry=true(表示是否需要提前序列化,非flatMessage场景需要设置为true)canal.mq.flatMessage=false(false表示二进制协议,true表示使用json格式,二进制协议有更好的性能)canal.mq.dynamicTopic(动态topic配置定义,不同表可设置不同topic,flatMessage方式可提高并行效率)canal.mq.partitionsNum/canal.mq.partitionHash(分区配置,对写入性能有不良影响,但是可以提高消费端的吞吐量)参考链接:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance3.Kafka接收消息1.canal发送的消息/***发送的消息通过canal**@authorhuan.fu2021/9/2-4:06p.m.*/@Getter@Setter@ToStringpublicclassCanalMessage{/***测试表明在同一个东西下进行多次修改,这个id的值是一样的。*/私有整数id;/***数据库或模式*/privateStringdatabase;/***表名*/privateStringtable;/***主键字段名*/privateListpkNames;/***是不是ddl语句*/privateBooleanisDdl;/***类型:INSERT/UPDATE/DELETE*/privateString类型;/***binlogexecuteTime,执行耗时*/privateLonges;/***dml构建时间戳,同步时间*/privateLongts;/***执行的sql,dmlsql为空*/privateStringsql;/***数据列表*/privateList