当前位置: 首页 > 科技观察

我对响应式编程中Mono和Flux的理解

时间:2023-03-19 17:35:05 科技观察

1.前言许多同学反映,他们对响应式编程中的Flux和Mono的概念有些困惑。但是目前在Java响应式编程中,我们接触最多的就是这两个对象,比如SpringWebFlux、RSocket、R2DBC。这两个对象我也开始头疼了,所以今天我们就简单的讨论一下。2、响应流的特点要理解这两个概念,就必须要说说响应流的规范。它是响应式编程的基石。他有以下特点:响应流必须是非阻塞的。响应流必须是数据流。必须可以异步执行。它也应该能够处理背压。背压是反应流中的一个重要概念。可以理解为生产者可以感受到消费者反馈的消费压力,并根据压力动态调整生产速度。形象点可以这样理解:有背压和没有背压两种情况3.Publisher由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果。有一个类似于Java中Future的概念,当有结果时必须返回通知消费者响应。在ReactiveStream规范中,这被定义为Publisher。发布者是一个提供者,可以根据订阅者的需要提供0-N序列元素和推送元素。一个Publisher可以支持多个订阅者,可以根据订阅者的逻辑推送序列元素。下面的Excel计算可以说明Publisher的一些特点。A1-A9可以看作是Publisher,它提供的元素序列。A10-A13分别是求和函数SUM(A1:A9)、平均函数AVERAGE(A1:A9)、最大值函数MAX(A1:A9)、最小值函数MIN(A1:A9),可以看成是Subscriber。如果我们没有A10-A13,那么A1-A9就没有意义,它们不产生计算。这也是Reactive的一个重要特性:当没有订阅时,发布者什么都不做。Flux和Mono都是由Reactor3中的Publisher实现的。Publisher提供了一个subscribe方法,允许消费者在结果可用时进行消费。如果没有消费者Publisher不会做任何事情,他根据消费情况做出响应。Publisher可能返回零个或多个,甚至可能是无限的。为了更清晰的表达预期结果,引入了Mono和Flux两种实现模型。4、FluxFlux是一个Publisher,它发出(emit)一个0-N个元素的异步序列,可以通过onComplete信号或onError信号终止。在ReactiveStreams规范中,下游消费者调用的方法有onNext、onComplete和onError三种。下图是Flux的抽象模型:上面对Flux的讲解对于刚接触Reactive编程的人来说还是比较难理解的,所以这里是一步一步的理解过程。有些类比不是很贴切,但对你一步步理解这些新概念还是有帮助的。传统的数据处理通常是这样写的:publicListallUsers(){returnArrays.asList(newClientUser("felord.cn","re??active"),newClientUser("Felordcn","Reactor"));}我们通过迭代返回值List获取这些元素进行再加工(消费),这个方法有点像厨师做了很多菜,吃不吃由食客决定。食客主动过来就餐就够了(拉法)。至于他们喜欢吃什么,不喜欢吃什么,他们想怎么吃就怎么吃,怎么吃也是他们的事。在Java8中,流数据处理可以重写为流表示:publicStreamallUsers(){returnStream.of(newClientUser("felord.cn","re??active"),newClientUser("Felordcn","Reactor"));}做菜的还是大厨,不过这家更高级。提供配菜方式(不包括具体细节),食客可根据自己的习惯按照指示进餐。一旦开始,不退款或换货,直到完成,没有有效期。Reactor中的Reactive数据处理可以改写成Flux:publicFluxallUsers(){returnFlux.just(newClientUser("felord.cn","re??active"),newClientUser("Felordcn","Reactor"));}在这一次,食客们只需点菜,做好了就会自然呈现,并可随时根据食客的胃口进行调整。如果没有食客点餐,那么厨师就无事可做。当然,功能不止这些,但足以让我们了解。5、MonoMono是一个发射(emit)0-1个元素的Publisher,可以通过onComplete信号或onError信号终止。这里就不翻译Mono了。整体和Flux类似,只是这里只会发射0-1个元素。也就是说,有或没有。和Flux一样,我们看看Mono的演变过程,有助于理解。传统数据处理publicClientUsercurrentUser(){returnisAuthenticated?newClientUser("felord.cn","re??active"):null;}直接返回合格对象或null。可选的处理方法publicOptionalcurrentUser(){returnisAuthenticated?Optional.of(newClientUser("felord.cn","re??active")):Optional.empty();}我觉得这个Optional有反应型的Smell,当然它不是反应性的。当我们没有从Optional的返回值中得到具体的对象时,不知道里面有没有,但是Optional一定是客观存在的,不会有NPE问题。响应式数据处理publicMonocurrentUser(){returnisAuthenticated?Mono.just(newClientUser("felord.cn","re??active")):Mono.empty();}有点类似于Optional,当然Mono不是为了为了解决NPE问题,它的存在是为了处理响应流中的单个值(可能是Void)。6.总结Flux和Mono是Java响应式编程中的重要概念,但是包括我在内的很多同学一开始都难以理解。这其实规定了两种流范式,让数据有一些新的特性,比如基于发布订阅的事件驱动、异步流、背压等等。另外,将数据推送(Push)给消费者,以区别于我们通常的拉取(Pull)模式。同时,我们可以像StreamApi一样使用map、flatmap这样的操作符来操作它们。理解Flux和Mono这两个概念需要一定的时间,所以不要操之过急。本文转载自微信公众号“码农小胖哥”,可通过以下二维码关注。转载本文请联系码农小胖公众号。