当前位置: 首页 > 科技观察

MongoDBChangeStreams性能优化实践_0

时间:2023-03-14 15:59:10 科技观察

介绍:基于MongoDB的应用可以通过ChangeStreams功能方便的订阅一个集合、数据库或者整个集群的数据变化,极大的方便了应用对数据库变化的感知,但是目前ChangeStreams没有为一些数据变化提供相应的事件(createindex,deleteindex,shardCollection)。本文介绍了一种新的事件订阅方式来改善上述缺点,并探讨通过并发预读的方式,来提升原生ChangeStreams的性能。一、前言MongoDB作为一款优秀的NOSQL数据库,支持海量存储、丰富的查询能力、卓越的性能和可靠性。目前大部分云厂商都提供兼容MongoDB协议的服务,被国内外用户广泛使用。和企业认可。MongoDB从3.6版本开始提供了ChangeStream特性。通过该特性,应用程序可以实时订阅特定馆藏、图书馆或整个集群的数据变化事件。感知,非常好用,这个特性同时支持副本集和集群场景。ChangeStreams功能目前支持大部分数据操作事件,但对于其他一些操作,如索引创建、索引删除、ColMod、shardCollection不支持,目前ChangeStreams内部实现是通过Aggregate命令完成的。分片集群场景下,mongos节点通过单线程聚合从shard节点拉取并处理oplog。当实例写入压力大时,感知数据的实时变化会出现延迟,需要提高性能,对于ChangeStreams目前的性能问题,官方也进行了讨论https://jira.mongodb.org/browse/SERVER-46979。本文通过对当前ChangeStream实现机制的深入分析,结合客户的实际使用场景,提出了一种新的多并发预读事件监控方法来解决上述问题,并应用于到实际的客户迁移和数据库容灾场景。.2.ChangeStreams机制介绍ChangeStreams支持单个集合、DB、集群的事件订阅。当业务程序通过手表发起订阅时,我们来分析一下幕后发生的事情。ChangeStreams的内部实现是通过Aggregate实现的,所以在watch背后,client向MongoDBServer发起Aggregate命令,在Aggregate的pipeline参数中增加一个$changeStream的stage,结合client其他方面的参数,一起发送到MongoDB服务器。当MongoServer收到Aggregate命令,经过解析后,会根据具体的请求组合出一个新的Aggregate命令,并将该命令发送给对应的Shard节点,同时会在游标管理器(CursorManger)中注册一个新的Cursor(游标),并将游标Id返回给客户端。ShardServer收到Aggregate命令后,构建pipeline管道,根据pipeline参数中包含的ChangeSteams参数,判断扫描到的原始??集合为oplog,并为扫描到的数据创建原始游标和对应的查询计划在集合Executor(PlanExecutor)上,在构建PlanExecutor时,使用了一个特殊的执行阶段,即ProxyStage来完成对整个Pipeline的封装,同时也将对应的cursorID返回给Mongos节点。客户端使用从Mongos节点获取的cursorID,不断对cursor执行getMore请求。服务端收到getMore请求后,最终通过cursor的next调用将请求转发给分片节点。拿到数据后,合并返回客户端可以订阅整个ChangeStreams事件。pipeline在shard上的具体实现细节不在本文讨论范围内,这些就不详细展开了。原生的ChangeStream目前有以下限制:1.对DDL事件的支持不完整。ChangeStream目前支持以下事件:Insert事件Update事件Replace事件Delete事件Drop事件Rename事件DropDatabase事件invalidate事件显然以上事件并不能完全涵盖MongoDB内部所有的数据变化事件。另外,对于collection上监听的ChangeStreams,当某个collection出现或者所属的DB被删除时,会触发invalidateEvent,关闭ChangeStreams的cursor,导致失败ChangeStreams继续。实现容灾显然不够友好,需要重新建立一个新的ChangeStreamslistener。2、事件拉取性能有待提高。上面分析过,当前的ChangeStreams请求发送到Mongos节点后,会以单线程的方式向各个shard节点发送异步请求命令,完成数据的拉取和数据的合并。如果将此方法替换为多线程并发拉取将提高碎片表的性能。3.ParallelChangeStreams架构及原理3.1ParallelChangeStreams架构简介针对上述的一些使用限制,我们根据客户实际需求提出了一种新的并发ChangeStreams(ParallelChangeStreams)方法来尝试解决上述问题。为了提高原生ChangeStreams的性能,我们在Mongos节点中引入了以下新组件:ChangeStreamsBuffer和Shard是一对一的关系。每个ChangeStreamsBuffer默认为1GB。在Buffer满之前,Buffer无条件地从对应的Shard(从节点)中拉取ChangeStreams数据。MergedQueueMergedQueue是一个内存队列,ChangeStreamsBuffer的消费者,Bucket的生产者。MergedQueue将所有分片的ChangeStreamsBuffer合并,等待合适的时机按照规则放入对应的Client'sBucket中。BucketBucket是内存队列,MergedQueue的消费者,Client的生产者。每个Client对应一个Bucket。每个Bucket都维护着Bucket中所有文档的集合。MergedQueue和BucketMergedQueue的交互过程不断的从头部取出尽可能多的数据,按照hash(document.ns)%n,document.ns的规则从前到后放入对应的Bucket中指的是本文档的NameSpace,所以同一个集合的数据必须在一个Bucket中。3.2DDL事件的增强并发ChangeStream除了支持原生的ChangeStream之外,还支持以下事件:CreateCollection事件CollMod事件CreateIndex事件DropIndex事件CreateView事件DropView事件ShardCollection事件本文以ShardCollection为例说明如何实现新的支持DDL事件:Config节点在执行ShardCollection命令时,会向collection的mastershard发送shardsvrShardCollection命令。mastershard收到changerequest后,我们在command的处理流程中记录一个noop类型的oplog,并将command的详细内容写入oplog的o2字段来跟踪shardcollecton事件。之后在处理ChangeStreams流程的pipeline中,我们解析noop事件,如果内容中包含shardCollection事件相关的tag,则提取该事件??返回给上层。3.3使用方法1如果要创建并发变??化的Stream,需要先通过以下命令创建一个bucket和一个cursor:db.runCommand({parallelChangeStream:1,nBuckets:Required,,nsRegex:Optional,,startAtOperationTime:Optional,,})参数说明如下:parallelChangeStream:启用并行changeStreamnBuckets:要创建的桶数nsRegex:可选,定义要订阅的集合,正则表达式。startAtOperationTime:可选,表示订阅事件开始的时间点返回值:"cursors":[NumberLong("2286048776922859088"),NumberLong("2286048779108179584"),NumberLong("2286048780088774662"),NumberLong7("2286048780088774662"),NumberLong7("8774662"),NumberLong7("2286048779108179584"),NumberLong("2286048779233363970"),NumberLong("2286048779250024945"),NumberLong("2286048776628281242"),NumberLong("2286048778209018113"),NumberLong("2286048778833886224"),NumberLong("2286048777951363227")]CursorssideofthereturnedMongosID.客户端获取到所有的CursorID后,可以通过getMore命令并发地(每个CursorId一个线程)不断的从服务端拉取结果。断点续传ParallelChangeStream的断点续传是通过startAtOperationTime实现的。由于每个游标的消耗进度不同,恢复断点应选择n个游标的最小消耗值。4.性能对比对于新的ParallelChangeStream和原生的ChangeStreams,我们做了长期的对比测试和分析。所有测试场景使用的测试实例如下:实例规格:4U16G,2个Shards(副本集),2个Mongos,磁盘容量:500G测试数据模型:通过YCSB预置数据,单条记录1K,单条shardtable1000w条记录。下面介绍几个场景:1.集群模式1分片表场景测试测试方法:1)创建Hash分片集合,预设16个Chunk2)启动YCSB,对集合进行Load数据操作,加载数据量为1000w,并且集合的Oplog足够大,可以保证这些操作还在Oplog中。3)分别启动nativeChangeStreams和ParallelChangeStreams,通过指定startAtOperationTime观察订阅1000w条记录的耗时。4)由于是单表,nBuckets为1,测试数据如下:Totaltimetoreadthetotalamountofdata(ms)TPS(piece/s)ChangeStreams1000w43250123148ParallelChangeStreams(1bucket)1000w184437543612.集群模式2分片表场景测试测试方法:1)创建2组Hash分片,预设16个Chunk2)启动YCSB,同时对这2组进行Load数据操作,每组加载数据量为1000w,设置Oplog足够大,保证这些操作还在Oplog中3)分别启动nativeChangeStreams和ParallelChangeStreams,通过指定startAtOperationTime观察订阅4000w条记录的耗时。4)由于有2张表,nBuckets的测试数据为2如下:Totaltimetoreadthetotaldatavolume(ms)TPS(piece/s)ChangeStreams4000w215179218484ParallelChangeStreams4000w690776552483.集群模式4shardsTable场景测试测试方法:1)创建一组4个Hashshards,预设16个Chunk2)启动YCSB,同时对这4组进行Load数据操作,每组Load数据量为1000w,并且集合的Oplog足够大,保证这些操作还在Oplog中3)分别启动nativeChangeStreams和ParallelChangeStreams,通过指定startAtOperationTime观察订阅4000w条记录的耗时。4)由于有4张表,nBuckets是4条测试数据如下:Totaltimespentreadingthetotalamountofdata(ms)TPS(pieces/s)ChangeStreams4000w215179218596ParallelChangeStreams4000w69077656577总结:通过实际测试一下,可以看到ParallelChangeStreams的性能有了很大的提升。实际上,我们会根据实例规格,通过调整内部Bucket和Buffer的缓存大小,来不断提升性能。随着数量的增加,原生ChangeStreams的性能优势会越来越明显。5.ConcurrentChangeStreams场景分析ConcurrentChangeStreams非常适合MongoDB集群的容灾场景。可以设置应用程序有针对性地监控特定的集合或DB,实时感知源实例中的数据变化,并快速应用到目标端,实现整体低RPO。此外,并发ChangeStreams也可以应用于PITR场景。通过并发ChangeStreams的良好性能,可以实时跟踪和记录动态数据,使得PITR的可恢复时间更短。6.未来展望在目前并行ChangeStreams的实现中,合并队列中的事件分发到桶中的事件中。我们采用的策略是根据事件的NameSpace的HASH值,传给对应的bucket。该策略针对单个采集场景,性能优化有限。未来我们计划同时提供基于事件ID内容的HASH值,将事件分发到不同的桶中。这种方法可以进一步提高系统的并发性能,带来更好的性能优化效果。7.总结通过引入并发ChangeStreams的新方法,支持订阅更多类型的MongoDB事件。同时,事件监控的性能较原来有了很大的提升,可以广泛应用于数据库实例的容灾。PITR在在线数据迁移业务场景中,为客户带来更好的体验。