介绍:基于MongoDB的应用可以通过ChangeStreams功能方便的订阅一个集合、数据库或者整个集群的数据变化,极大的方便了应用对数据库变化的感知,但是目前ChangeStreams没有为一些数据变化提供相应的事件(createindex,deleteindex,shardCollection)。本文介绍了一种新的事件订阅方式来改善上述缺点,并探讨通过并发预读的方式,来提升原生ChangeStreams的性能。一、前言MongoDB作为一款优秀的NOSQL数据库,支持海量存储、丰富的查询能力、卓越的性能和可靠性。目前大部分云厂商都提供兼容MongoDB协议的服务,被国内外用户广泛使用。和企业认可。MongoDB从3.6版本开始提供了ChangeStream特性。通过该特性,应用程序可以实时订阅特定馆藏、图书馆或整个集群的数据变化事件。感知,非常好用,这个特性同时支持副本集和集群场景。ChangeStreams功能目前支持大部分数据操作事件,但对于其他一些操作,如索引创建、索引删除、ColMod、shardCollection不支持,目前ChangeStreams内部实现是通过Aggregate命令完成的。分片集群场景下,mongos节点通过单线程聚合从shard节点拉取并处理oplog。当实例写入压力大时,感知数据的实时变化会出现延迟,需要提高性能,对于ChangeStreams目前的性能问题,官方也进行了讨论https://jira.mongodb.org/browse/SERVER-46979。本文通过对当前ChangeStream实现机制的深入分析,结合客户的实际使用场景,提出了一种新的多并发预读事件监控方法来解决上述问题,并应用于到实际的客户迁移和数据库容灾场景。.2.ChangeStreams机制介绍ChangeStreams支持单个集合、DB、集群的事件订阅。当业务程序通过手表发起订阅时,我们来分析一下幕后发生的事情。ChangeStreams的内部实现是通过Aggregate实现的,所以在watch背后,client向MongoDBServer发起Aggregate命令,在Aggregate的pipeline参数中增加一个$changeStream的stage,结合client其他方面的参数,一起发送到MongoDB服务器。当MongoServer收到Aggregate命令,经过解析后,会根据具体的请求组合出一个新的Aggregate命令,并将该命令发送给对应的Shard节点,同时会在游标管理器(CursorManger)中注册一个新的Cursor(游标),并将游标Id返回给客户端。ShardServer收到Aggregate命令后,构建pipeline管道,根据pipeline参数中包含的ChangeSteams参数,判断扫描到的原始??集合为oplog,并为扫描到的数据创建原始游标和对应的查询计划在集合Executor(PlanExecutor)上,在构建PlanExecutor时,使用了一个特殊的执行阶段,即ProxyStage来完成对整个Pipeline的封装,同时也将对应的cursorID返回给Mongos节点。客户端使用从Mongos节点获取的cursorID,不断对cursor执行getMore请求。服务端收到getMore请求后,最终通过cursor的next调用将请求转发给分片节点。拿到数据后,合并返回客户端可以订阅整个ChangeStreams事件。pipeline在shard上的具体实现细节不在本文讨论范围内,这些就不详细展开了。原生的ChangeStream目前有以下限制:1.对DDL事件的支持不完整。ChangeStream目前支持以下事件:Insert事件Update事件Replace事件Delete事件Drop事件Rename事件DropDatabase事件invalidate事件显然以上事件并不能完全涵盖MongoDB内部所有的数据变化事件。另外,对于collection上监听的ChangeStreams,当某个collection出现或者所属的DB被删除时,会触发invalidateEvent,关闭ChangeStreams的cursor,导致失败ChangeStreams继续。实现容灾显然不够友好,需要重新建立一个新的ChangeStreamslistener。2、事件拉取性能有待提高。上面分析过,当前的ChangeStreams请求发送到Mongos节点后,会以单线程的方式向各个shard节点发送异步请求命令,完成数据的拉取和数据的合并。如果将此方法替换为多线程并发拉取将提高碎片表的性能。3.ParallelChangeStreams架构及原理3.1ParallelChangeStreams架构简介针对上述的一些使用限制,我们根据客户实际需求提出了一种新的并发ChangeStreams(ParallelChangeStreams)方法来尝试解决上述问题。为了提高原生ChangeStreams的性能,我们在Mongos节点中引入了以下新组件:ChangeStreamsBuffer和Shard是一对一的关系。每个ChangeStreamsBuffer默认为1GB。在Buffer满之前,Buffer无条件地从对应的Shard(从节点)中拉取ChangeStreams数据。MergedQueueMergedQueue是一个内存队列,ChangeStreamsBuffer的消费者,Bucket的生产者。MergedQueue将所有分片的ChangeStreamsBuffer合并,等待合适的时机按照规则放入对应的Client'sBucket中。BucketBucket是内存队列,MergedQueue的消费者,Client的生产者。每个Client对应一个Bucket。每个Bucket都维护着Bucket中所有文档的集合。MergedQueue和BucketMergedQueue的交互过程不断的从头部取出尽可能多的数据,按照hash(document.ns)%n,document.ns的规则从前到后放入对应的Bucket中指的是本文档的NameSpace,所以同一个集合的数据必须在一个Bucket中。3.2DDL事件的增强并发ChangeStream除了支持原生的ChangeStream之外,还支持以下事件:CreateCollection事件CollMod事件CreateIndex事件DropIndex事件CreateView事件DropView事件ShardCollection事件本文以ShardCollection为例说明如何实现新的支持DDL事件:Config节点在执行ShardCollection命令时,会向collection的mastershard发送shardsvrShardCollection命令。mastershard收到changerequest后,我们在command的处理流程中记录一个noop类型的oplog,并将command的详细内容写入oplog的o2字段来跟踪shardcollecton事件。之后在处理ChangeStreams流程的pipeline中,我们解析noop事件,如果内容中包含shardCollection事件相关的tag,则提取该事件??返回给上层。3.3使用方法1如果要创建并发变??化的Stream,需要先通过以下命令创建一个bucket和一个cursor:db.runCommand({parallelChangeStream:1,nBuckets:Required,
