当前位置: 首页 > 科技观察

超大规模系统下,从MySQL同步数据到Redis不是什么难事吧?

时间:2023-03-19 21:45:08 科技观察

1。超大规模系统缓存穿透之痛如何搭建Redis集群?由于集群可以水平扩展,只要集群足够大,理论上支持海量并发不是问题。但是,如果并发请求数的基数太大,即使只有一小部分请求穿透了缓存,直接访问数据库的请求绝对数仍然不小。再加上促销期间的流量高峰,仍然存在缓存穿透导致系统雪崩的风险。那么,如何解决这个问题呢?其实方法也不难想到,就是防止请求穿透缓存即可。如今,内存存储的价格一路走低。只要买得起足够多的服务器,Redis集群的容量是无限的。我们可以把所有的数据都放在Redis集群中。在处理读请求的时候,我们只需要读Redis,不需要访问数据库,完全没有“缓存穿透”的风险。事实上,很多大型互联网公司都在使用这种方法。但是,将全量数据缓存在Redis中会引发新的问题。即缓存中的数据应该如何更新?因为我们取消了缓存穿透机制,在这种情况下,如果可以直接从缓存中读取数据,则可以直接返回,如果读取不到数据,则只能返回错误!因此,当系统更新数据库中的数据时,必须及时更新缓存。至此,我们不得不面对一个老问题:如何保证Redis中的数据和数据库中的数据同步更新?分布式事务可以用来解决数据一致性问题,但是这些方法不适合更新缓存。原因是分布式事务对数据更新服务的侵入性很强。这里仍然以订单服务为例。如果加入分布式事务来更新缓存,无论我们使用哪种分布式事务,都会或多或少地影响到订单服务的性能。还有一个问题就是,如果Redis本身出现故障,写入数据失败,也会导致下单失败,相当于降低了下单服务的性能和可用性,这是绝对不能接受的。对于订单服务等核心业务,一个可行的方法是启动一个更新订单缓存的服务,在订单变化的消息队列(MessageQueue,MQ)中接收消息,然后更新缓存在Redis中的订单数据。订单变更消息更新缓存的结构如图1所示。因为对于这类核心业务数据,通常用户量很大,服务需要对外发送消息。添加消费订阅基本不会增加额外的开发成本,也不需要对订单服务本身做任何改动。改变。图1改变顺序的消息更新缓存通过上面的方式,我们唯一担心的是如果消息丢失了怎么办?因为消息现在是缓存数据的唯一来源,一旦消息丢失,缓存中丢失的数据将永远无法补充。因此,必须保证整个消息链的可靠性。不过好在现在的MQ集群(比如Kafka或者RocketMQ)都有高可用和高可靠的保障机制。只要事先配置得当,就可以满足数据的可靠性要求。和订单服务一样,既然有现成的数据变化消息可以订阅,那么像这样更新缓存也是一个不错的选择,因为这种方式实现起来非常简单,不会侵入系统的其他模块根本。2、使用Binlog实时更新Redis缓存。如果我们要缓存的数据没有订阅数据更新的消息队列,怎么办?下面是很多大型互联网公司采用的比较通用的方案。数据更新服务只负责处理业务逻辑和更新MySQL,不考虑如何更新缓存。负责更新缓存的服务将自己伪装成MySQL从节点。接收并解析来自MySQL的Binlog后,可以得到实时的数据变化信息,然后服务会根据变化信息更新Redis缓存。订阅Binlog更新缓存的结构如图2所示。图2订阅Binlog更新缓存的结构。订阅Binlog更新缓存的方案。与上面接收消息更新Redis缓存的方案相比,两者的实现思路其实是一样的。它们是异步实时订阅数据变化信息来更新Redis缓存的。.不过直接读取Binlog的方法更加通用。这种方式不需要订单服务重新发送订单消息,订单更新服务也不需要考虑如何解决“消息发送失败怎么办?”的数据一致性问题。另外,由于在整个缓存更新链路上减少了一个发送和接收消息队列的环节,从MySQL更新到Redis更新的延迟变得更短,失败的可能性更低。因此,很多大型互联网企业更愿意采用这种方案。订阅binlog更新缓存的方案唯一的缺点是订单缓存更新服务实现起来比较复杂。毕竟这个方案不像接收消息,直接接收的是订单数据,解析Binlog还是挺麻烦的。很多开源项目都提供了订阅和解析MySQLBinlog的功能。下面以比较常用的开源项目Canal为例,演示如何实时接收Binlog,更新Redis缓存。Canal通过模拟MySQL主从复制交互协议,伪装成MySQL从节点,向MySQL主节点发送转储请求。MySQL收到请求后,会开始将Binlog推送到Canal。Canal解析Binlog字节流后,将其转化为易于阅读的结构化数据,供下游程序订阅。图3展示了如何使用Canal订阅Binlog来更新Redis中的订单缓存。图3使用Canal订阅Binlog更新缓存本例中MySQL和Redis都运行在本地默认端口,MySQL的端口为3306,Redis的端口为6379。为了方便大家,账户余额下面使用第5章中提到的表account_balance作为演示数据。首先在本地下载并解压最新版本的Canal1.1.4。运行命令如下:wgethttps://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gztarzvfxcanal.deployer-1.1.4。tar.gz然后配置MySQL,我们需要在MySQL的配置文件中开启Binlog,并设置Binlog的格式为ROW。配置项如下:[mysqld]log-bin=mysql-bin#开启Binlog。binlog-format=ROW#设置Binlog格式为ROW。server_id=1#配置一个ServerID。接下来,为Canal创建一个专门的MySQL用户并授权,保证这个用户有复制Binlog的权限。具体SQL命令如下:CREATEUSERcanalIDENTIFIEDBY'canal';GRANTSELECT,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TO'canal'@'%';冲洗特权;然后重启MySQL,确保所有配置生效。重启后查看当前Binlog文件及位置。SQL命令及输出结果如下:mysql>showmasterstatus;+------------+--------+-----------+----------------+----------------+|File|Position|Binlog_Do_DB|Binlog_Ignore_DB|Executed_Gtid_Set|+------------+--------+------------+--------------+----------------+|binlog.000009|155||||+-------------+--------+------------+---------------+-----------------+记录File和Position两列的值,然后配置Canal。编辑Canal的实例配置文件canal/conf/example/instance.properties,使Canal可以连接到我们的MySQL。具体配置如下:canal.instance.gtidon=false#positioninfocanal.instance.master.address=127.0.0.1:3306canal.instance.master.journal.name=binlog.000009canal.instance.master.position=155canal。instance.master.timestamp=canal.instance.master.gtid=#用户名/密码canal.instance.dbUsername=canalcanal.instance。dbPassword=canalcanal.instance.connectionCharset=UTF-8canal.instance.defaultDatabaseName=test#tableregexcanal.instance.filter.regex=.*\\..*这个配置文件需要配置MySQL连接地址,库名,用户名和password,另外还需要配置canal.instance.master.journal.name和canal.instance.master.position这两个属性。取值就是刚才记录的两列File和Position。然后就可以启动Canal服务了。命令如下:canal/bin/startup.sh启动后,查看日志文件canal/logs/example/example.log。如果日志中没有报错信息,说明Canal服务已经启动成功,并连接到了我们的MySQL。Canal服务启动后,会打开一个端口(11111)等待客户端连接。客户端连接到Canal服务后,可以从Canal服务中拉取(PULL)数据。每拉取一批数据,都会正确写入Redis,您需要向Canal服务返回一个成功的响应。如果客户端程序宕机,或者出现处理失败等异常情况,Canal服务没有收到成功的响应,那么下次客户端会拉取同一批次的数据,这样可以保证数据的顺序读取的二进制日志不一样。将被搞砸,并且不会丢失任何数据。接下来我们来开发一个账户余额缓存更新程序。以下代码全部用Java语言编写:while(true){Messagemessage=connector.getWithoutAck(batchSize);//获取指定数量的数据。longbatchId=message.getId();尝试{intsize=message.getEntries().size();如果(batchId==-1||大小==0){Thread.sleep(1000);}else{processEntries(message.getEntries(),jedis);}connector.ack(batchId);//提交确认。}catch(Throwablet){connector.rollback(batchId);//处理失败,回滚数据。}}这个程序的逻辑并不复杂。程序启动并连接到Canal服务后,会不断拉取数据。如果没有数据,它会休眠一段时间。如果有数据,它会调用processEntries方法来处理和更新缓存。每批数据更新成功后,会调用ack方法返回成功响应给Canal服务。如果失败,将抛出异常,然后回滚。下面是processEntries方法的主要代码:for(CanalEntry.RowDatarowData:rowChage.getRowDatasList()){if(eventType==CanalEntry.EventType.DELETE){//删除。jedis.del(row2Key("user_id",rowData.getBeforeColumnsList()));}elseif(eventType==CanalEntry.EventType.INSERT){//插入。jedis.set(row2Key("user_id",rowData.getAfterColumnsList()),row2Value(rowData.getAfterColumnsList()));}else{//更新。jedis.set(row2Key("user_id",rowData.getAfterColumnsList()),row2Value(rowData.getAfterColumnsList()));}}上面的代码会根据事件类型进行处理。如果删除了MySQL中的数据,则删除Redis中对应的数据。如果是update和insert操作,调用Redis的SET命令写入数据。下面我们启动账户缓存更新服务进行验证。在账户余额表中插入一条记录。SQL命令如下:mysql>insertintoaccount_balancevalues(888,100,NOW(),999);然后,让我们看一下Redis缓存。运行命令及输出如下:127.0.0.1:6379>get888"{\"log_id\":\"999\",\"balance\":\"100\",\"user_id\":\"888\",\"timestamp\":\""2020-03-0816:18:10\"}"从上面的输出结果我们可以看到数据已经自动同步到Redis了,完整代码本示例的GitHub上可以下载,链接地址为:https://github.com/liyue2008/canal-to-redis-example3.总结在处理超大规模并发场景时,由于大并发请求数,即使只有少量的缓存穿透,也可能导致数据库卡住,造成雪崩效应,这种情况下,我们可以通过Redis缓存全量数据,来完全避免缓存穿透的问题.对于缓存数据的更新方式,我们可以通过订阅消息队列进行数据更新来异步更新缓存。更通用的方法是将缓存更新服务伪装成MySQL从节点,订阅MySQL的Binlog,通过Binlog更新Redis缓存。需要注意的是无论是通过消息队列还是Canal异步更新缓存,系统对整个更新服务的数据可靠性和实时性要求都比较高。数据丢失或者更新慢都会导致Redis中的数据与MySQL中的数据不同步的问题。在将该方案应用到生产环境之前,我们需要考虑在出现不同步问题时应该采用什么样的降级或者补偿方案。作者介绍李跃,美团基础技术部高级技术专家,极客时间《后端存储实战课》《消息队列高手课》等专栏作家。曾就职于当当网、京东零售等公司。多年从事互联网电商行业基础设施领域的架构设计与研发,多次参与双十一、618电商大促。专注于分布式存储、云原生架构下的服务治理、分布式消息和实时计算等技术领域,致力于推动基础设施技术的创新和开源。本文节选自《电商存储系统实战:架构设计与海量数据处理》,经发布者授权发布。