当前位置: 首页 > 科技观察

Reactive-MongoDB异步Java驱动解读

时间:2023-03-17 11:17:39 科技观察

一、关于异步驱动从3.0版本开始,MongoDB开始提供异步驱动(JavaAsyncDriver),为应用程序提供了更高性能的选择。但实际上,使用同步驱动(JavaSyncDriver)的项目并不在少数,可能是先入为主的原因(同步驱动的文档比较齐全),也可能是为了兼容MongoDB老版本。无论如何,由于Reactive的发展,使用异步驱动应该是未来的趋势。在使用AsyncDriver之前,需要熟悉Reactive的概念。2、理解Reactive(反应式)Reactive是一种异步的、面向数据流的开发方式。它最早来自.NET平台上的ReactiveExtensions库,后来扩展到各种编程语言的实现。在著名的响应式宣言(ResponsiveManifesto)中,为Reactive定义了四个特性:响应式:系统能够及时响应请求。Resilient:系统在异常发生时仍能响应,即支持容错。弹性:在不同负载下,系统可以弹性伸缩,保证运行。消息驱动:不同组件使用异步消息传递进行交互,保证松散耦合和相互隔离。在ReactiveManifesto定义的系统特性中,都与ReactiveStream有一定的关系,所以就有了2013年发起的ReactiveStreamSpecification。https://www.reactive-streams.org/其中,处理环节响应式流的定义如下:具有处理无限数量元素的能力,即允许流永无止境地处理元素,以实现异步传递非阻塞负压(back-pressure)Java平台在JDK9版本上发布了对ReactiveStreams的支持。下面介绍反应流的几个关键接口:PublisherPublisher是数据的发布者。Publisher接口只有一个方法subscribe,用于添加数据订阅者,即Subscriber。SubscriberSubscriber是数据的订阅者。Subscriber接口有4个方法,所有这些方法都充当不同事件的处理程序。订阅者成功订阅发布者后,将调用其onSubscribe(Subscriptions)方法。订阅表示当前的订阅关系。订阅成功后,可以使用Subscription的request(longn)方法请求发布者发布n条数据。发布者可能会产生3种不同的消息通知,分别对应订阅者的另外3种回调方法。数据通知:对应onNext方法,表示发布者产生的数据。错误通知:对应onError方法,表示发布者产生了错误。结束通知:对应onComplete方法,表示发布者已完成发布所有数据。以上三个通知中,错误通知和结束通知都是终止通知,即终止通知后不会再产生其他通知。SubscriptionSubscription表示一种订阅关系。除了前面提到的request方法,还有一个cancel方法可以取消订阅。需要注意的是,调用cancel方法后,发布者仍可能继续发布通知。但订阅最终会被取消。这些接口之间的关系如下图所示:图片来源:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.htmlMongoDB的异步驱动是mongo-java-driver-reactivestreams组件.它实现了ReactiveStream的上述接口。>除了reactivestream,MongoDB的异步驱动还包括RxJava等风格版本。有兴趣的读者可以进一步了解http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/3.使用示例接下来用一个简单的例子来演示一下Reactive方法的代码风格:A.导入依赖org.mongodbmongodb-driver-reactivestreams1.11.0>importmongodb-driver-reactivestreams会自动添加reactive-streams,bson,mongodb-driver-async组件B.连接数据库//serverinstancetableListservers=newArrayList();servers.add(newServerAddress("localhost",27018));//配置生成器MongoClientSettings.BuildersettingsBuilder=MongoClientSettings.builder();//传入服务器实例settingsBuilder.applyToClusterSettings(builder->builder.hosts(servers));//构建Client实例MongoClientmongoClient=MongoClients.create(settingsBuilder.build());C.实现文档查询//获取数据库对象MongoDatabasedatabase=client.getDatabase(databaseName);//获取集合MongoCollectioncollection=database.getCollection(collectionName);//异步返回PublisherFindPublisherpublisher=collection.find();//订阅实施publisher.subscribe(newSubscriber(){@OverridepublicvoidonSubscribe(Subscriptions){System.out.println("start...");//执行请求s.request(Integer.MAX_VALUE);}@OverridepublicvoidonNext(Documentdocument){//获取文档System.out.println("Document:"+document.toJson());}@OverridepublicvoidonError(Throwablet){System.out.println("erroroccurs.");}@OverridepublicvoidonComplete(){System.out.println("finished.");}});请注意,与使用同步驱动程序不同,collection.find()方法返回的不是Cursor,而是FindPublisher对象,它是Publisher接口的扩展。而且,返回Publisher对象时,并没有GeneraterealdatabaseIOrequests。要真正发起请求,您需要调用Subscription.request()方法。上述代码中,为了读取Publisher产生的结果,通过自定义一个Subscriber,在onSubscribe事件触发时执行数据库请求,然后分别处理onNext、onError、onComplete。这种实现虽然是纯异步的,但是使用起来比较麻烦。试想,如果每次操作数据库都要完成一个Subscriber逻辑,那么开发的工作量是巨大的。为了尽可能复用重复的逻辑,可以对Subscriber的逻辑做一层封装,包括以下功能:使用List容器缓存请求结果,实现阻塞等待结果的方法.您可以指定超时时间来捕获异常并在等待结果时将其抛出。输出代码如下:publicclassObservableSubscriberimplementsSubscriber{//响应数据privatefinalListreceived;//错误信息privatefinalListerrors;//等待对象privatefinalCountDownLatchlatch;//订阅者privatevolatileSubscriptionsubscription;//是否privatevolatilebooleancompleted;publicObservableSubscriberListd()=this.reveerrors=newArrayList();this.latch=newCountDownLatch(1);}@OverridepublicvoidonSubscribe(finalSubscriptions){subscription=s;}@OverridepublicvoidonNext(finalTt){received.add(t);}@OverridepublicvoidonError(finalThrowablet){errors.add(t);onComplete();}@OverridepublicvoidonComplete(){completed=true;latch.countDown();}publicSubscriptiongetSubscription(){returnsubscription;}publicListgetReceived(){returnreceived;}publicThrowablegetError(){if(errors.size()>0){returnerrors.get(0);}returnull;}publicbooleanisCompleted(){returncompleted;}/***阻塞一定时间等待结果**@paramtimeout*@paramunit*@return*@throwsThrowable*/publicListget(finallongtimeout,finalTimeUnitunit)throwsThrowable{returnawait(timeout,unit).getReceived();}/***已经阻塞等待请求完成**@return*@throwsThrowable*/publicObservableSubscribeawait()throwsThrowable{returnawait(Long.MAX_VALUE,TimeUnit.MILLISECONDS);}/***阻塞一定时间等待完成**@paramtimeout*@paramunit*@return*@throwsThrowable*/publicObservableSubscribeawait(finallongtimeout,finalTimeUnitunit)throwsThrowable{subscription.request(Integer.MAX_VALUE);if(!latch.await(timeout,unit)){thrownewMongoTimeoutException("PublisheronCompletetimedout");}if(!errors.isEmpty()){throwerrors.get(0);}returnthis;}}有了这个基础工具类的帮助,我们对文档的异步操作就变得简单多了例如文档查询的操作可以修改如下:>{System.out.println("文档:"+d.toJson());});当然,这个例子还可以进一步改进。例如,如果使用List作为缓存,则必须考虑数据量,避免一次将所有(或超过Amount)文档发送到内存中。作者:唐卓章,华为技术专家,多年互联网研发/安装经验,关注NOSQL中间件的高可用和弹性扩展,对分布式系统架构性能优化有丰富的实践经验。高容量和可用的物联网服务。本文转载自微信公众号“Mongoing中文社区”,可通过以下二维码关注。转载本文请联系Mongoing中文社区公众号。