最近在工作中遇到这样一个业务场景。我们需要关注业务系统数据库中某些表的数据。当数据新增或修改时,会同步到另一个业务系统数据库中的表中。桌子。说到数据库同步,估计大家第一个想到的就是基于binlog的主从复制。但是,在我们的场景中,还存在几个问题:第一,没有必要复制所有表的数据。该对象只有几个表。其次,也很麻烦。两个业务系统的数据库表结构可能不一致。例如,将数据库1的表A中的某些字段同步到数据库2的表B中,在这个过程中,表A和表B的字段并不完全相同。在这种情况下,我们只能使用代码。先获取数据库1的表中的数据变化,然后通过手工映射插入到数据库2的表中。但是获取变更数据的过程还是离不开binlog,所以我们需要在代码中对binlog进行监控。先说结论吧。我们最终使用了一个开源工具mysql-binlog-connector-java来监控binlog变化并获取数据。获取到数据后,手动插入到另一个库的表中,并以此为基础实现数据同步。.本工具的git项目地址如下:https://github.com/shyiko/mysql-binlog-connector-java,是用于记录数据库表结构变化和表数据修改的二进制日志。其实除了数据复制,它还可以实现数据恢复、增量备份等功能。在开始项目之前,首先需要确保mysql服务开启了binlog:showvariablelike'log_bin';如果值为OFF,表示没有开启,那么需要先开启binlog,修改配置文件:log_bin=mysql-binbinlog-format=ROWserver-id=1对参数做简单说明:之后在配置文件中加入log_bin配置项,表示开启binlogbinlog-format是binlog的日志格式,支持三种类型,分别是STATEMENT、ROW、MIXED,我们这里使用ROW方式server-id用来识别一条sql语句是从哪个服务器写的。这里必须要设置,否则我们后面的代码就不能正常监听事件了。更改配置文件后,重启mysql服务。再次查看是否开启了binlog,如果返回ON则说明开启成功。在Java项目中,首先引入maven坐标:com.github.shyikomysql-binlog-connector-java0.21.0写一个简单的例子看看它是如何使用的:publicstaticvoidmain(String[]args){BinaryLogClientclient=newBinaryLogClient("127.0.0.1",3306,"hydra","123456");client.setServerId(2);client.registerEventListener(event->{EventDatadata=event.getData();if(datainstanceofTableMapEventData){System.out.println("Table:");TableMapEventDatatableMapEventData=(TableMapEventData)data;System.out.println(tableMapEventData.getTableId()+":["+tableMapEventData.getDatabase()+"-"+tableMapEventData.getTable()+"]");}if(datainstanceofUpdateRowsEventData){System.out.println("Update:");System.out.println(data.toString());}elseif(datainstanceofWriteRowsEventData){System.out.println("Insert:");System.out.println(data.toString());}elseif(datainstanceofDeleteRowsEventData){System.out.println("删除:");System.out.println(data.toString());}});try{client.connect();}catch(IOExceptione){e.printStackTrace();}}首先创建一个BinaryLogClient客户端对象,初始化时需要传入mysql连接信息。创建完成后,为客户端注册一个监听器,实现对binlog的监听和解析,在监听器中,我们暂时只处理四种事件数据。除了增删改操作类型对应的WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData事件数据外,还有一个TableMapEventData类型的数据,包括表之间的对应关系描述,后面会详细说明在以下示例中。这里客户端监听的是数据库层面的所有事件,可以监听表的DML语句和DDL语句,所以我们只需要处理我们关心的事件数据即可,否则会接收到大量冗余数据。启动程序,控制台输出:com.github.shyiko.mysql.binlog.BinaryLogClientopenChannelToBinaryLogStream信息:Connectedto127.0.0.1:3306atmysql-bin.000002/1046(sid:2,cid:10)信息:Connectedto127.0.0.1:3306atmysql-bin.000002/1046(sid:2,cid:10)连接mysqlbinlog成功。接下来,我们在数据库中插入一条数据。这里操作的数据库名称为tenant,表为dept:insertintodeptVALUES(8,"Manpower","","1");此时控制台会打印出该事件的数据:Table:108:[tenant-dept]Insert:WriteRowsEventData{tableId=108,includedColumns={0,1,2,3},rows=[[8,Manpower,,1]]}我们监控的事件数据有两种类型。第一种是TableMapEventData,通过它可以得到操作的数据库名、表名和表名。ID。之所以要监听这个事件,是因为后面实际操作监控返回的数据中有表的id,但是没有表名等信息,所以如果我们想知道具体操作的是哪个表在,我们首先要维护一个id和一个table的对应关系。打印出来的第二个监听事件数据是WriteRowsEventData,它记录了insert语句作用的表,涉及插入的列,以及实际插入的数据。此外,如果我们只需要处理一个或几个特定的??表,我们也可以预先设置表的列表。这里,我们可以根据表id和表名的映射关系来过滤数据。接下来,我们执行一条更新语句:updateddeptsettenant_id=3whereid=8orid=9控制台输出:Table:108:[tenant-dept]Update:UpdateRowsEventData{tableId=108,includedColumnsBeforeUpdate={0,1,2,3},includedColumns={0,1,2,3},rows=[{before=[8,human,,1],after=[8,human,,3]},{before=[9,human,,1],after=[9,Manpower,,3]}]}执行update语句时,可能作用于多条数据,所以实际修改的数据可能包含多行记录,体现在上面的行中,包括id为8和9的两条数据。最后执行一条删除语句:deletefromdeptwheretenant_id=3控制台打印如下,rows也返回两条有效数据:Table:108:[tenant-dept]Delete:DeleteRowsEventData{tableId=108,includedColumns={0,1,2,3},rows=[[8,manpower,,3],[9,manpower,,3]]}简单的使用原理介绍完之后,再回到我们最初的需求,我们需要添加一个新的将添加或修改的数据同步到另一个表,一个d还有一个问题,就是如何将返回的数据对应到它所在的列。这个时候应该怎么实施呢?以update操作为例,我们需要对提取出来的数据进行处理,更改上面例子中的方法:Map.Entryrow:updateRowsEventData.getRows()){Listentries=Arrays.asList(row.getValue());System.out.println(entries);JSONObjectdataObject=getDataObject(entries);System.out.println(dataObject);}}数据投射到UpdateRowsEventData后,可以使用getRows方法获取更新后的行数据,可以得到每一列的值。之后调用了一个自己实现的getDataObject方法,实现数据到列的绑定过程:privatestaticJSONObjectgetDataObject(Listmessage){JSONObjectresultObject=newJSONObject();Stringformat="{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}";JSONObjectjson=JSON.parseObject(format);for(Stringkey:json.keySet()){resultObject.put(key,message.get(json.getInteger(key)));}returnresultObject;}格式字符串中,预先维护好数据库表字段顺序的字符串,标识每个字段在序列中的位置。通过以上函数,可以实现数据到列的填充过程。我们执行一条更新语句查看结果:updateddeptsettenant_id=3,comment="1"whereid=8控制台打印结果如下:Table:108:[tenant-dept]Update:[8,manpower,1,3]{"tenant_id":3,"dept_name":"manpower","comment":"1","id":8}可以看到,将要修改的这条记录中的属性填入其对应的列,然后我们可以根据具体的业务逻辑,按照字段名检索数据,并将数据同步到其他表中。本文转载自微信公众号“码农仓上”,可通过以下二维码关注。转载本文请联系码农公众号。