当前位置: 首页 > 科技观察

MongoDBStream如何实现完美的数据增量迁移?

时间:2023-03-20 14:04:34 科技观察

1。背景介绍最近微服务架构很火,但本质上只是风口浪尖的热词。根据笔者的经验,应用一个新的架构所需要的变更成本是非常高的。尽管如此,还是有不少企业走上了服务化转型的道路,各种“旧改”的繁琐事情在所难免。所谓“旧改”,就是将现有的系统架构进行一次重构,拆分成多个细粒度的服务,然后抽空升级切入,让新系统上线。其中,数据迁移往往成为一项非常重要和复杂的工作。拆分服务时数据迁移的挑战在哪里?首先,它很难。制定迁移计划,需要了解项目前身,评估迁移计划,技术工具等;其次,成本高。由于新旧系统的数据结构不同,需要定制开发迁移转换功能。一键迁移很难有通用的工具;而且对于一些容量大、可靠性要求高的系统,还需要能够不影响业务。问题是可以追溯的,所以计划要复杂一些。二、常见的解决方案根据迁移方案和流程,数据迁移可以分为三类:1、最简单的宕机迁移方案。停机迁移的顺序如下:使用停机迁移的好处是工艺操作简单,工具成本低。但是,缺点也很明显。迁移过程中无法访问业务,只适用于规格不多,允许关闭服务的场景。2、业务双写业务双写是指对现有系统进行改造升级,支持新旧数据库同时写入。然后使用数据迁移工具对旧数据进行全量迁移,全部数据迁移转换完成后切换到新系统。示意图:业务双写方案流畅,对线上业务影响极小。出现问题可以重新启动,运行压力会比较小。笔者几年前尝试过这样的方案,整个迁移过程确实很顺利,但是这个方案的实现比较复杂,需要修改现有代码,完成新数据的转换和写入.对开发人员的要求比较高。高的。适用于业务逻辑清晰,团队对系统有足够控制权的场景。3、增量迁移增量迁移的基本思路是先进行全量迁移转换,完成后继续处理增量数据,直到数据均衡再切换系统。示意图:要点:要求系统支持增量数据的记录。对于MongoDB,您可以使用oplog来实现这一点。为了避免oplog在全量迁移过程中被flush,必须在开始迁移前就开始监控oplog,记录所有变化;如果没有办法,就需要从应用层去考虑,比如所有的表(集合)记录updateTime等时间戳,或者升级应用,支持单独记录修改操作。增量数据的回放是连续的。在所有增量数据的回放和转换过程中,系统仍然会产生新的增量数据,这就需要迁移工具能够不断地回放增量数据,并在进行系统切换之前对其进行均衡。MongoDB从3.6版本开始提供ChangeStream功能,支持监控数据变化记录。这为数据同步和转换处理提供了更大的便利。下面将讨论如何使用ChangeStream实现增量数据迁移。3.ChangeStream简介ChangStream(变更记录流)是指集合(数据库集合)的变更事件流。应用程序可以通过db.collection.watch()等命令获取被监控对象的实时变化。在此功能出现之前,您可以通过拉取oplog来达到相同的目的;但是oplog的处理解析比较复杂,有被回滚的风险。如果使用不当,也会造成性能问题。ChangeStream可以与聚合框架结合使用,以进一步过滤或转换变更集。参考链接:https://docs.mongodb.com/manual/aggregation/由于ChangeStream利用了oplog中存储的信息,单进程部署的MongoDB无法支持ChangeStream功能,只能用于启用replicasA独立或分片的集合集群。一个要监控的ChangeStreamEvent的基本结构如下:字段说明:ChangeSteram支持的变更类型如下:使用下面的shell脚本打印出集合T_USER上的变更事件:下面是一些示例For例子,感受一下:inserteventupdateeventreplaceeventdeleteeventinvalidateevent更多ChangeEvent信息可以参考:https://docs.mongodb.com/manual/reference/change-events/4.这次实现增量迁移一个简单的论坛帖子迁移示例旨在演示如何使用ChangeStream实现最佳的增量迁移解决方案。背景如下:现有系统中有一批帖子,每个帖子属于一个频道(channel),如下表所示:新系统中的频道字段将使用英文缩写,为需要支持平滑升级。根据上一节,我们将使用ChangeStream功能来实现一个增量迁移的方案。相关表转换如下图:原理topic为原posts表,迁移开始前会启动watch任务,不断获取增量数据记录在topic_incr表中;然后进行全量迁移和转换,然后继续更新增量表数据Migrate,直到没有新的增量。接下来,我们使用Java程序来完成相关代码。mongodb-java--driver3.6版本以后才支持watch功能。需要保证升级到对应的版本:定义Channel频道的转换表:publicstaticenumChannel{Food("food"),Emotion("Emotion"),Pet("Pet"),House("Home"),Marriage("Marriage"),Education("Education"),Travel("Travel");privatefinalStringoldName;publicStringgetOldName(){returnoldName;}privateChannel(StringoldName){this.oldName=oldName;}/***转换为a新名称**@paramoldName*@return*/publicstaticStringtoNewName(StringoldName){for(Channelchannel:values()){if(channel.oldName.equalsIgnoreCase(oldName)){returnchannel.name();}}return"";}/***返回一个随机通道**@return*/publicstaticChannelrandom(){Channel[]channels=values();intidx=(int)(Math.random()*channels.length);returnchannels[idx];}}为主题表预写1w条记录:启动监控任务,将主题的所有变化写入增量表:代码中通过watch命令获取一个MongoCursor对象,用于遍历所有变化。开启FullDocument.UPDATE_LOOKUP选项后,updatechange事件中会携带完整的文档数据(FullDocument)。watch()命令提交后,mongos会与shard上的mongod(master节点)建立订阅通道,这个过程可能需要一段时间。为了模拟线上业务的真实情况,开启了多个线程不断的写入topic表:ChangeTask实现逻辑如下:每个changetask都会不断的对topic产生写操作,触发一系列ChangeEvent的产生:doInsert:生成随机频道话题后,执行insert;doUpdate:随机获取一个topic,将其channel字段改为随机值,执行update;doReplace:随机获取一个topic,将其channel字段改为随机值,执行replace;doDelete:随机获取一个topic,执行delete。以doUpdate为例,实现代码如下:启动全量迁移任务,将topic表中的数据迁移到新的topic_new表中:全量迁移开始前,先获取当前的***_id值时刻(可以记录这个值)作为终点,然后一一完成迁移改造。全量迁移完成后,开始最后一步:增量迁移。注意:在增量迁移过程中,变更操作仍在进行中。finalMongoCollectiontopicIncrCollection=getCollection(coll_topic_incr);finalMongoCollectiontopicNewCollection=getCollection(coll_topic_new);ObjectIdcurrentId=null;Documentsort=newDocument("_id",1);MongoCursorcursor=null;//批量大小小=100;AtomicIntegercount=newAtomicInteger(0);try{while(true){booleanisWatchTaskStillRunning=watchFlag.getCount()>0;//按ID增量分段取if(currentId==null){cursor=topicIncrCollection.find().sort(sort).limit(batchSize).iterator();}else{cursor=topicIncrCollection.find(newDocument("_id",newDocument("$gt",currentId))).sort(sort).limit(batchSize).iterator();}booleanhasIncrRecord=false;while(cursor.hasNext()){hasIncrRecord=true;DocumentincrDoc=cursor.next();OperationTypeopType=OperationType.fromString(incrDoc.getString(field_op));ObjectIddocId=incrDoc.getObjectId(field_key);//记录当前IDcurrentId=incrDoc.getObjectId("_id");if(opType==OperationType.DELETE){topicNewCollection.deleteOne(newDocument("_id",docId));}else{Documentdoc=incrDoc.get(field_data,Document.class);//通道转换StringoldChannel=doc.getString(field_channel);doc.put(field_channel,Channel.toNewName(oldChannel));//启用upsertUpdateOptionsoptions=newUpdateOptions().upsert(true);topicNewCollection.replaceOne(newDocument("_id",docId),incrDoc.get(field_data,Document.class),options);}if(count.incrementAndGet()%10==0){logger.info("IncrTransferTaskprogress,count:{}",count.get());}}//手表停止工作时(不再change),没有要处理的记录,跳出迁移的实现是一个连续尾的过程,利用**_id字段**的有序特性进行分段迁移;即记录当前处理的_id值,并循环拉取_id值后的记录进行处理和添加除了DELETE变化的尺度(topic_incr),其余类型保留整个文档,所以可以直接使用replace+upsert附加到新表。***,运行整个程序。查看topic表和topic_new表,发现它们的个数是一样的。为了进一步确认一致性,我们对两张表做一个聚合统计:topic表topic_new表前者输出结果:后者输出结果:前后对比结果一致。5.后续优化上一章演示了一个增量迁移的例子。这些代码在投入上线运行之前还需要进一步优化:写入性能。在线数据量可能达到数十亿。增量迁移时应采用合理的批处理;另外,通过增加并发线程,增加更多的Worker,可以将不同的业务库和表分开处理,提高效率。增量表是幂等的,即多次回放最终结果还是一样的,但是需要保证表层级有序,即只有一个线程在对一张表进行增量回放同时。容错性,watch监控任务一旦异常,必须能够从更早的时间点开始(使用startAtOperationTime参数),如果写入失败,必须支持重试。回溯能力,做必要的跟踪记录,比如记录转换失败的ID号,旧系统的数据需要保留,以免事后排查某个数据问题时丢失。数据转换,新旧业务的区别不会很简单,通常需要借助大量的转换表才能完成。对于一致性检查,需要根据业务特点开发自己的一致性检查工具,证明迁移后的数据达到了想要的一致性级别。BTW,数据迁移一定要结合业务特点和架构差异来考虑,不然你还是耍流氓。6.总结在基于服务的系统中,扩容和升级往往需要数据迁移。对于业务量大的系统和对中断敏感的系统,通常采用平滑迁移的方式。MongoDB3.6版本后,提供ChangeStream功能,支持应用订阅数据的变更事件流。本文使用Stream函数实现增量平滑迁移示例。这是一种尝试,相信以后这样的应用场景会越来越多。附参考文档100亿数据平滑数据迁移,不影响服务2017/11/22/mongodb-3-6-change-streams-nest-temperature-fan-control-use-case/