当前位置: 首页 > 后端技术 > Java

Java代码中如何监控Mysql的binlog?

时间:2023-04-01 18:50:21 Java

最近在工作中遇到这样一个业务场景。我们需要关注业务系统数据库中某些表的数据。当数据新增或修改时,会同步到另一个业务系统数据库中的表中。桌子。说到数据库同步,估计大家第一个想到的就是基于binlog的主从复制。但是,在我们的场景中,还存在几个问题:第一,没有必要复制所有表的数据。该对象只有几个表。其次,也很麻烦。两个业务系统的数据库表结构可能不一致。例如,将数据库1的表A中的某些字段同步到数据库2的表B中,在这个过程中,表A和表B的字段并不完全相同。在这种情况下,我们只能使用代码。先获取数据库1的表中的数据变化,然后通过手工映射插入到数据库2的表中。但是获取变更数据的过程还是离不开binlog,所以我们需要在代码中对binlog进行监控。先说结论吧。我们最终使用了一个开源工具mysql-binlog-connector-java来监控binlog变化并获取数据。获取数据后,手动插入到另一个库的表中。以此为基础,实现了数据表。同步。项目的git地址如下:https://github.com/shyiko/mysql-binlog-connector-java用于记录数据库表结构变化和表数据修改的二进制日志。其实除了数据复制,它还可以实现数据恢复、增量备份等功能。在开始项目之前,首先需要确保mysql服务开启了binlog:showvariableslike'log_bin';如果值为OFF,表示没有开启,那么需要先开启binlog,修改配置文件:log_bin=mysql-binbinlog-format=ROWserver-id=1对参数做简单说明:after在配置文件中加入log_bin配置项,表示开启binlogbinlog-format是binlog的日志格式,支持三种类型,分别是STATEMENT、ROW、MIXED,我们这里使用ROW方式的server-id来标识一条sql语句是从哪个服务器写的。这里必须设置,否则我们在后面的代码中将无法正常监听事件。更改配置文件后,重启mysql服务。再次查看是否开启了binlog,如果返回ON则说明开启成功。在Java项目中,首先引入maven坐标:com.github.shyikomysql-binlog-connector-java0.21.0写一个简单的例子看看它是如何使用的:publicstaticvoidmain(String[]args){BinaryLogClientclient=newBinaryLogClient("127.0.0.1",3306,"hydra","123456");client.setServerId(2);client.registerEventListener(event->{EventDatadata=event.getData();if(datainstanceofTableMapEventData){System.out.println("Table:");TableMapEventDatatableMapEventData=(TableMapEventData)data;System.out.println(tableMapEventData.getTableId()+":["+tableMapEventData.getDatabase()+"-"+tableMapEventData.getTable()+"]");}if(datainstanceofUpdateRowsEventData){System.out.println("更新:");System.out.println(data.toString());}elseif(数据输入instanceofWriteRowsEventData){System.out.println("插入:");System.out.println(data.toString());}elseif(datainstanceofDeleteRowsEventData){System.out.println("Delete:");System.out.println(data.toString());}});尝试{客户端连接();}catch(IOExceptione){e.printStackTrace();}}首先,创建一个BinaryLogClient客户端对象,初始化时需要传入mysql连接信息,创建完成后,为客户端注册一个监听器,实现其对binlog的监听和解析。在监听器中,我们暂时只处理4类事件数据,除了WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData对应增删改查类型的事件数据外,还有一类TableMapEventData数据,其中包含了表之间的对应关系,在后面的例子中会详细说明。这里客户端监听的是数据库层面的所有事件,可以监听表的DML语句和DDL语句,所以我们只需要处理我们关心的事件数据即可,否则会接收到大量冗余数据。启动程序,控制台输出:com.github.shyiko.mysql.binlog.BinaryLogClientopenChannelToBinaryLogStream信息:Connectedto127.0.0.1:3306atmysql-bin.000002/1046(sid:2,cid:10)binlog连接到mysql成功了,接下来我们在数据库中插入一条数据,这里操作的数据库名称为tenant,表为dept:insertintodeptVALUES(8,"Manpower","","1");此时控制台会打印事件监听的数据:Table:108:[tenant-dept]Insert:WriteRowsEventData{tableId=108,includedColumns={0,1,2,3},rows=[[8,manpower,,1]]}我们监控的事件数据有两种。第一种是TableMapEventData,通过它可以得到操作的数据库名、表名、表id。之所以要监听这个事件,是因为后面实际操作监控返回的数据中有表的id,但是没有表名等信息,所以如果我们想知道具体操作的是哪个表在,我们首先要维护一个id和一个table的对应关系。打印出来的第二个监听事件数据是WriteRowsEventData,它记录了insert语句作用的表,涉及插入的列,以及实际插入的数据。此外,如果我们只需要处理一个或几个特定的??表,我们也可以预先设置表的列表。这里,我们可以根据表id和表名的映射关系来过滤数据。接下来,我们执行一条更新语句:updatedeptsettenant_id=3whereid=8orid=9控制台输出:Table:108:[tenant-dept]Update:UpdateRowsEventData{tableId=108,includedColumnsBeforeUpdate={0,1,2,3},includedColumns={0,1,2,3},rows=[{before=[8,manpower,,1],after=[8,manpower,,3]},{before=[9,manpower,,1],after=[9,manpower,,3]}]}执行update语句时,可能会影响到多条数据,所以实际修改的数据可能包含多行记录,体现在上面rows,包含两条id分别为8和9的数据,最后执行删除语句:deletefromdeptwheretenant_id=3控制台打印如下,rows也返回两条有效数据:Table:108:[tenant-dept]Delete:DeleteRowsEventData{tableId=108,includedColumns={0,1,2,3},rows=[[8,manpower,,3],[9,manpower,,3]]}简单介绍后使用原理说完了,我们回到我们最初的需求,我们需要将一张表中新增或修改的数据同步到另一张表中,还有一个问题,就是如何将返回的数据对应到它所在的列。这个时候应该怎么实现呢?以update操作为例,我们需要对提取出来的数据进行处理,更改上例中的方法:UpdateRowsEventDataupdateRowsEventData=(UpdateRowsEventData)数据;对于(Map.Entryrow:updateRowsEventData.getRows()){Listentries=Arrays.asList(row.getValue());System.out.println(条目);JSONObjectdataObject=getDataObject(条目);System.out.println(dataObject);}}数据类型转换为UpdateRowsEventData后,可以使用getRows方法获取更新后的行数据,可以获取每一列的值。之后调用一个自己实现的getDataObject方法实现数据到列的绑定过程:privatestaticJSONObjectgetDataObject(Listmessage){JSONObjectresultObject=newJSONObject();Stringformat="{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}";JSONObjectjson=JSON.解析对象(格式);for(Stringkey:json.keySet()){resultObject.put(key,message.get(json.getInteger(key)));}returnresultObject;}在格式字符串中,预先维护了一个数据库表字段顺序的字符串,标识了每个字段在顺序中的位置。通过以上函数,可以实现数据到列的填充过程。我们执行一条update语句查看结果:updatedeptsettenant_id=3,comment="1"whereid=8控制台打印结果如下:Table:108:[tenant-dept]Update:[8,manpower,1,3]{"tenant_id":3,"dept_name":"manpower","comment":"1","id":8}可以看到,将修改记录中的属性填入对应的列中,并然后根据具体的业务逻辑,我们可以根据字段名取出数据,同步到其他表中。如果文章对你有帮助,请关注公众号