1。BackpressureBackpressure在国内翻译为背压。这个翻译在网上被很多人吐槽。我觉得大家的抱怨都是有道理的。什么意思。所以宋哥这里直接用英文Backpressure。背压是一种现象:当数据流从上游生产者传输到下游消费者时,上游生产速度大于下游消费速度,导致下游Buffer溢出。这种现象称为背压。也就是说,上游生产数据,生产完成后,通过管道将数据传输给下游,下游消费数据。当下游消费速度低于上游数据生产速度时,管道中数据的积压会对上游形成压力,这就是Backpressure。从这个角度看,将Backpressure翻译成backpressure和backpressure似乎更合理。Backpressure会出现在有Buffer上限的系统中。当Buffer溢出时,就会有Backpressure。对于Backpressure,只有一种对策:丢弃新事件。那么什么是缓冲区溢出呢?比如我的服务器可以同时处理2000个用户请求,所以我设置请求上限为2000,这个2000就是我的Buffer。当超过2000时,会产生Backpressure。2.FlowAPI在JDK9中引入了FlowAPI,用于支持ReactiveProgramming,即响应式编程。在响应式编程中,会有数据发布者Publisher和数据订阅者Subscriber。订阅者接收并消费发布者发布的数据。在Subscriber和Publisher之间还有一个Processor,类似于过滤器,可以对数据进行处理。中间处理。JDK9提供了FlowAPI来支持响应式编程,RxJava、Reactor等框架也提供了相关的实现。再来看看JDK9中的Flow类:非常简洁,基本是按照ReactiveProgramming设计的:PublisherPublisher是一个数据发布者,是一个函数式接口,里面只有一个方法,通过它发布数据。Publisher的定义如下:@FunctionalInterfacepublicstaticinterfacePublisher{publicvoidsubscribe(Subscribersubscriber);}SubscriberSubscriber是一个数据订阅者,里面有四个方法,如下:publicstaticinterfaceSubscriber{publicvoidonSubscribe(Subscriptionsubscription);publicvoidonNext(Tite);publicvoidonError(Throwablethrowable);publicvoidonComplete();}onSubscribe:订阅成功的回调方法,用于初始化Subscription,表示可以接收到订阅数据。onNext:接收下一个订阅数据的回调方法。onError:当Publisher或Subscriber遇到不可恢复的错误时调用该方法,之后Subscription将不会再调用Subscriber的其他方法。onComplete:当接收到所有订阅数据并且关闭发布者时会回调该方法。SubscriptionSubscription是发布者和订阅者之间的订阅关系,用来控制消息的消费。这里面有两个方法:publicstaticinterfaceSubscription{publicvoidrequest(longn);publicvoidcancel();}request:这个方法用来向publisherndata请求数据。cancel:取消消息订阅,订阅者将不再接收数据。ProcessorProcessor是一个空接口,但是它同时继承了Publisher和Subscriber,所以可以发布数据和订阅数据,所以我们可以使用Processor来完成一些数据转换的功能,先接收数据进行处理,然后发送数据released,这也有点类似于我们JavaEE中的filter。publicstaticinterfaceProcessorextendsSubscriber,Publisher{}2.1消息订阅初体验下面通过下面一段代码来体验一下消息的订阅和发布:publicclassFlowDemo{publicstaticvoidmain(String[]args){SubmissionPublisherpublisher=newSubmissionPublisher<>();Flow.Subscribersubscriber=newFlow.Subscriber(){privateFlow.Subscriptionsubscription;@OverridepublicvoidonSubscribe(Flow.Subscriptionsubscription){this.subscription=subscription;//请求一个数据publisherDatathis.subscription.request(1);}@OverridepublicvoidonNext(Stringitem){System.out.println("收到了publisher的消息:"+item);//收到后可以继续接收或不接收//this.subscription.cancel();this.subscription.request(1);}@OverridepublicvoidonError(Throwablethrowable){//如果有异常就会来到这个方法,此时可以直接取消订阅this.subscription.cancel();}@OverridepublicvoidonComplete(){//接收到发布者的所有数据,发布者关闭System.out.println("Datareceived");}};//配置发布者和订阅者publisher.subscribe(subscriber);for(inti=0;i<5;i++){//发送数据publisher.submit("hello:"+i);}//关闭发布者publisher.close();newScanner(System.in).next();}}宋兄对上面的代码稍微解释一下:首先创建一个SubmissionPublisher对象作为消息发布者,创建一个Flow.Subscriber对象作为消息订阅者,在消息订阅者中实现四个方法,分别进行处理。为发布者配置订阅者。发送一个消息。发送消息后关闭发布者。最后就是让程序不停的观察消息订阅者的打印。2.2模拟BackpressureBackpressure问题在FlowAPI中得到了很好的解决。Subscriber会将Publisher发布的数据缓存在Subscription中,其长度默认为256。相关源码如下:publicfinalclassFlow{staticfinalintDEFAULT_BUFFER_SIZE=256;publicstaticintdefaultBufferSize(){returnDEFAULT_BUFFER_SIZE;}...}一旦超过这个数据量,publisher就会降低数据发送速度。我们将上面的案例修改如下:;@OverridepublicvoidonSubscribe(Flow.Subscriptionsubscription){this.subscription=subscription;//向数据发布者请求一个数据this.subscription.request(1);}@OverridepublicvoidonNext(Stringitem){System.out.println("接收消息frompublisherishere:"+item);//接收后,可以继续接收//this.subscription.cancel();try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}this.subscription.request(1);}@OverridepublicvoidonError(Throwablethrowable){//如果出现异常,就会来到这个方法,此时可以直接取消订阅this.subscription.cancel();}@OverridepublicvoidonComplete(){//接收到发布者的所有数据,发布者关闭System.out.println("Datareceived");}};publisher.subscribe(subscriber);for(inti=0;i<500;i++){System.out.println("i-------->"+i);publisher.submit("hello:"+i);}//关闭发布者publisher.close();newScanner(System.in).next();}}一共修改了三个地方:Subscriber#onNext方法,每个time在处理下一条数据之前休息两秒。发布数据时,一共发布500条数据。打印数据发布日志。修改完成后,我们再次启动项目,观察控制台输出:可以看到生产者首先生产了257条数据(一开始消费的是hello0,所以缓存中实际有256条),而message是一条一条的,因为消费速度比较慢,所以当缓存中的数据超过256条的时候,下一步就是消费一条,发送一条。2.3数据处理Flow.Processor可以像过滤器一样对数据进行预处理。数据从发布者出来后,首先进入Flow.Processor进行预处理,然后进入订阅者。修改后的代码如下:publicclassFlowDemo{publicstaticvoidmain(String[]args){classDataFilterextendsSubmissionPublisherimplementsFlow.Processor{privateFlow.Subscriptionsubscription;@OverridepublicvoidonSubscribe(Flow.Subscriptionsubscription){this.subscription=subscription;this.subscription.request(1);}@OverridepublicvoidonNext(Stringitem){this.submit("【这是一条被处理过的数据】"+item);this.subscription.request(1);}@OverridepublicvoidonError(Throwablethrowable){this.subscription.cancel();}@OverridepublicvoidonComplete(){this.close();}}SubmissionPublisherpublisher=newSubmissionPublisher<>();DataFilterdataFilter=newDataFilter();publisher.subscribe(dataFilter);Flow.Subscribersubscriber=newFlow.Subscriber(){privateFlow.Subscriptionsubscription;@OverridepublicvoidonSubscribe(Flow.Subscriptionsubscription){this.subscription=subscription;//向数据发布者请求一个数据this.subscription.request(1);}@OverridepublicvoidonNext(Stringitem){System.out.println("收到了发布者的消息:"+item);//收到后可以继续收/不收//this.subscription.cancel();try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}this.subscription.request(1);}@OverridepublicvoidonError(Throwablethrowable){//当异常发生时,会来到这个方法,在这个time直接取消订阅即可this.subscription.cancel();}@OverridepublicvoidonComplete(){//publisher的所有数据已经??收到,publisher已经关闭System.out.println("Datareceived");}};dataFilter.subscribe(subscriber);for(inti=0;i<500;i++){System.out.println("发送消息i-------->"+i);publisher.submit("hello:"+i);}//关闭发布者publisher.close();newScanner(System.in).next();}}为了简单起见,我这里创建了一个本地内部类DataFilter,DataFilter继承自SubmissionPublisher也实现了Flow.Processor接口。由于DataFilter继承自SubmissionPublisher,所以它也具有SubmissionPublisher的功能,完成DataFilter中消息的处理并重新发送。接下来定义publisher,让dataFilter作为它的subscriber,再定义一个新的subscriber作为dataFilter的subscriber。最终的运行效果如下:三、总结好了,这就是今天给大家介绍的Java9中的ReactiveStream。至此,我们对WebFlux的预知就差不多告一段落了。下一篇开始,WebFlux正式上线。本文转载自微信公众号“江南的一场小雨”,可通过以下二维码关注。转载本文请联系江南一点鱼公众号。