当前位置: 首页 > 科技观察

你,缓存,一致性?

时间:2023-03-18 14:17:27 科技观察

大家好,我是北军。不知道大家出去面试有没有被问到如何保证数据库和缓存的一致性?大家是怎么回答的?CacheConsistency每年逢年过节都很难抢到票。放票的时候,那么多人同时去抢票。如果每个人都访问数据库进行查询和购票,那么数据库的压力有多大?这时候很多都会引入缓存,把票信息放到缓存中,这样可以减轻数据库的压力。旅客购票成功后,数据库发生了变化,需要及时更新缓存中的数据,以便其他旅客及时从缓存中获取最新的车票信息。这就是缓存一致性。解决数据库和缓存一致性的主要思路:1.同步双写:即在修改db的同时修改缓存。这种模式无法保证数据库和缓存的原子性。如果多个线程同时修改db,网络延迟会导致数据库修改顺序与请求顺序错位。例如:A先操作数据库修改x=1B也修改数据库x=2,但是网络延迟导致B先修改缓存x=2然后A修改缓存x=1会导致x=数据库中为2,缓存中x=1,导致数据库和缓存不一致。2、设置有效期:为缓存设置有效期,过期自动删除。再次查询时更新优点:简单方便缺点:时效性差,缓存过期前可能不一致场景:更新频率低,业务对时效性要求不高那么我们有没有更好的解决方案呢?阿里云的canal很好的为我们解决了这个问题:canal:是阿里巴巴旗下的开源项目,纯Java开发。基于数据库增量日志分析,提供增量数据订阅消费,目前主要支持mysql。Canal工作原理MySQL的主从复制原理:MySQLmaster将数据变化写入二进制日志(binarylog,这里记录的记录称为binarylogevents,可以通过showbinlogevents查看)MySQLslave复制master到它的中继日志(relaylog)MySQLslave重放中继日志中的事件,并将数据变化反映到自己的datacanal工作原理canal模拟mysqlsalve的交互协议,伪装成mysqlslave,发送dump协议到mysqlmaster;mysqlmaster收到dump请求,开始向slave(即canal)推送二进制日志;canal解析二进制日志对象(原始字节流)。canal的安装配置(以windows为例)1、登录Mysql后,使用showvariableslike'log_bin';查询是否开启了binlog,如果开启(ON),则进行下一步,如果没有开启(OFF),在数据库的my.ini配置文件中添加配置[mysqld]#enablebinloglog-bin=mysql-bin#选择ROW模式binlog-format=ROW#配置MySQL复制需要定义,不要重复server_id=1和canalslaveId2.启用binlog后,创建canal用户并授权。官网配置是@%,表示所有服务器,可以改成localhost。在mysql中运行如下代码,设置完成后重启:CREATEUSERcanalIDENTIFIEDBY'canal';GRANTSELECT,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TO'canal'@'localhost'identifiedby'canal';冲洗特权;3.安装canal下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.6-alpha-1在conf文件夹中找到\conf\canal.propertiescanal.id=1canal.ip=canal.port=11111canal.metrics.pull.port=11112canal.zkServers=#flushdatatozkcanal.zookeeper.flush.period=1000canal.withoutNetty=false#tcp,kafka,RocketMQcanal.serverMode=tcp#flushmetacursor/parsepositiontofile描述:这个文件是canalBasicgeneral配置,canal端口号默认为11111,修改canal的输出模型,默认tcp,改为输出到kafka,如果需要同步数据到kafka,或者rocketmq,可以单独修改,这里不要修改,解压到合适的位置。解压后在conf文件夹中找到\example\instance.properties、canal.instance.mysql。slaveId=20#只要和mysqlmaster不同#enablegtidusetrue/falsecanal.instance.gtidon=false#positioninfocanal.instance.master.address=127.0.0.1:3306canal.instance.mysql.slaveId=20#只要不同于mysqlmastercanal.instance.master.address=127.0.0.1:3306,监控mysqlmaster节点信息配置连接MySQL的用户名和密码,默认是canal修改我们之前授权的数据库配置信息,canal.instance.dbUsername,canal.instance.dbPassword是数据库账号密码,两者都是canal。刚刚创建了账号密码,启动了bin目录下的startup.bat。出现如下界面,表示启动成功。4.在springboot中集成canalmaven依赖com.alibaba.ottercanal.client1.1.4java例子:publicclassCanalService{publicstaticvoidmain(String[]args)throwsException{//1.获取运河连接对象,我部署在本机上,所以是127.0.0.1CanalConnectorcanalConnector=CanalConnectors.newSingleConnector(newInetSocketAddress("127.0.0.1",11111),"例子","","");System.out.println("canal启动并开始监听数据...");while(true){canalConnector.connect();//订阅test数据库下的所有表canalConnector.subscribe("test.*");//获取数据Messagemessage=canalConnector.get(100);//解析消息Listentries=message.getEntries();if(entries.size()<=0){System.out.println("没有检测到数据");线程.睡眠(1000);}for(CanalEntry.Entryentry:entries){//1.获取表名StringtableName=entry.getHeader().getTableName();//2。获取类型CanalEntry.EntryTypeentryType=entry.getEntryType();//3。获取序列化数据B??yteStringstoreValue=entry.getStoreValue();//判断是否为rowdata类型数据if(CanalEntry.EntryType.ROWDATA.equals(entryType)){//解析第三步中的数据//获取当前事件的操作类型CanalEntry.EventTypeeventType=rowChange.getEventType();//获取数据集ListrowDatasList=rowChange.getRowDatasList();//便民数据for(CanalEntry.RowDatarowData:rowDatasList){//数据变化前的内容JSONObjectbeforeData=newJSONObject();ListbeforeColumnsList=rowData.getAfterColumnsList();for(CanalEntry.Columncolumn:beforeColumnsList){beforeData.put(column.getName(),column.getValue());}//数据变化之后的内容ListafterColumnsList=rowData.getAfterColumnsList();JSONObjectafterData=newJSONObject();for(CanalEntry.Columncolumn:afterColumnsList){afterData.put(column.getName(),column.getValue());}System.out.println("表格:"+tableName+",eventType:"+eventType+",beforeData:"+beforeData+",afterData:"+afterData);//操作保存}}else{System.out.println("当前操作类型为:"+entryType);}}}}}我手动操作book表中的数据,可以看到程序监控输出结果。5.最后我们可以把获取到的数据放入消息队列,这样可以增加重试机制,也可以防止幂等问题。最后写入缓存的消息队列,保证可靠性:写入队列的消息在消费成功前不会丢失(重启项目不用担心)。消息队列保证消息的成功投递:下游从队列中拉取消息,消费成功后删除消息,否则继续投递消息给消费者(符合我们的重试场景)。