大家好,我是北君。我相信响应式编程在各个地方都经常被提及。本文将从函数式编程到SpringWeFlux给大家做一个简单的讲解,并给出一些例子。希望大家能更好地理解响应式编程,并适时将其应用到实际项目中。1.前言要理解响应式编程,我们首先要理解函数式操作和Stream操作。让我们在下面简要回顾一下。1.1常用的函数式编程函数式接口我们先来回顾一下Java中的函数式接口。常见的如下:Consumer有输入,无输出Supplier无输入,输出Function输入T,输出RBiFunction输入T,U输出RPredicate输入,输出boolean类型上面一个例子一个简单的功能接口如下:Consumerconsumer=(i)->System.out.println("thisis"+i);consumer.accept("consumer");Suppliersupplier=()->"thisis供应商”;System.out.println(supplier.get());Functionfunction=(i)->i*i;System.out.println(function.apply(8));BiFunctionbiFunction=(i,j)->i+"*"+j+"="+i*j;System.out.println(biFunction.apply(8,8));Predicatepredicate=(i)->i.intValue()>3;System.out.println(predicate.test(5));执行结果如下:thisisconsumerthisissupplier648*8=64true1.2Stream操作对Stream进行操作,主要包括几个关键点:生成流的中间操作可以有多个。中间的操作会返回一个新的流(比如map、filter、sorted等),然后交给下一个stream方法。流的终结只有一个终结操作。最后一个操作执行完后,流已经到了终止状态,不能再操作了(比如forEach、toArray、findFirst等)。创建流的示例:String[]strArray={"ss","ss","","sdffg"};Arrays.stream(strArray).forEach(System.out::println);Arrays.asList(strArray).stream().forEach(System.out::println);Stream.of(strArray).forEach(System.out::println);Stream.iterate(1,(i)->i+1).limit(10).forEach(System.out::println);Stream.generate(()->newRandom().nextInt(10)).limit(10).forEach(System.out::println);简单流处理示例:String[]strArray1={"ss","ss","","sdffg","bca-de","fff"};Stringcollect=Stream.of(strArray1).filter(i->!i.isEmpty())//过滤空strings.sorted()//Sort.limit(1)//只取第一个元素.map(i->i.replace("-",""))//替换"-".flatMap(i->Stream.of(i.split("")))//将字符拆分为字符数组.sorted()//Sort.collect(Collectors.joining());//将字符拼接在一起System.out.println(collect);//最后输出abcde2。JavaReactiveProgramming反应式编程会使用一个发布者和一个订阅者,然后通过订阅关系传递来完成数据流转。订阅关系中可以处理一些背压问题,即调整消费者和生产者之间的供需平衡,让整个程序达到最大效率。Java9中的java.util.concurrent.Flow接口提供了与反应式流编程类似的功能。下面我们实现一个基于Java的响应式编程的例子:简单的三步:建立生产者建立消费者消费者订阅生产者Producer生产内容SubmissionPublisherpublisher=newSubmissionPublisher<>();//创建生产者Flow。Subscribersubscriber=newFlow.Subscriber(){...};//创建消费者(其实现放在下面)publisher.subscribe(subscriber);//订阅关系for(inti=0;i<10;i++){publisher.submit("测试响应式java:"+i);//生产者生产内容}消费者整个代码如下:Flow.Subscribersubscriber=newFlow.Subscriber(){Flow.Subscriptionsubscription;@OverridepublicvoidonSubscribe(Flow.Subscriptionsubscription){System.out.println("订阅先建立");this.subscription=订阅;这个订阅请求(1);}@OverridepublicvoidonNext(Objectitem){订阅。请求(10);System.out.println("接收到:"+item);}@OverridepublicvoidonError(Throwablethrowable){System.out.println("onError");}@OverridepublicvoidonComplete(){System.out.println("onComplete");}};其中onSubscribe方法表示建立订阅关系,onNext接受数据,onError请求生产者的数据,onComplete是错误或完成后的处理方法。带有中间处理器的ReactiveStream通常基于以下模型:下面我们实现一个带有中间处理功能的反应模型:下面的Processor既有发布者也有订阅者:publicclassReactiveProcessorextendsSubmissionPublisherimplementsFlow.Subscriber{@OverridepublicvoidonSubscribe(Flow.Subscriptionsubscription){System.out.println(Thread.currentThread().getName()+"反应处理器建立连接");this.subscription=订阅;这个订阅请求(1);}@OverridepublicvoidonNext(Objectitem){System.out.println(Thread.currentThread().getName()+"Reactiveprocessorreceivedata:"+item);this.submit(item.toString().toUpperCase());这个订阅请求(1);}@OverridepublicvoidonError(Throwablethrowable){System.out.println("反应处理器错误");可抛。打印堆栈跟踪();这个订阅取消();}@OverridepublicvoidonComplete(){System.out.println(Thread.currentThread().getName()+"反应处理器接收数据完成");}}如上,中间处理器订阅发布者,消费者再订阅中间处理器,也可以调整发布订阅的生产消费率。SubmissionPublisherpublisher=newSubmissionPublisher<>();//创建生产者ReactiveProcessorreactiveProcessor=newReactiveProcessor();//创建中间处理器publisher.subscribe(reactiveProcessor);//中间处理器订阅生产者Flow.Subscribersubscriber=newFlow.Subscriber(){...};//创建消费者reactiveProcessor.subscribe(subscriber);//消费者订阅中间处理器for(inti=0;i<10;i++){publisher.submit("testreactivejava:"+i);//Producer生产数据}通过上面的producer->中间处理器->consumer,可以将producer生产的数据全部转为大写,然后发送给最终的consumer。上面的Java响应式编程示例。Java会使用不同的线程来分别处理消费者和生产者的消息处理。3.ReactorReactor中两个关键的基于对象的Flux和Mono,整个Spring的响应式编程都是基于projectreactor项目。Reactor是响应式编程的依赖,主要基于JVM构建非阻塞程序。根据Reactor的介绍,这类响应式编程的三方库(Reactor)主要解决了一些JVM经典异步编程中的一些不足,同时也可以侧重于一些新的特性,如下:Composabilityandreadability(可组合性和可读性)可以使用丰富的操作运算符将数据作为流进行操作。在订阅之前,不会有背压特性(Backpressure),可以理解为消费者可以向生产者发出输出率过高的信号。从而调整生产速度。或者消费者可以选择一次拉取一捆数据进行消费。在与并发无关的高度抽象的高级函数中有这样的解释,可以形象地说明响应式编程。Reactive程序可以想象成车间的流水线,reactor不仅是流水线上的传送带,还是加工工作站。原材料从最初的生产者开始,最终成为推向消费者的产品。3.1Flux&Mono下面介绍一下Flux和Mono。在Reactor中,Flux和Mono都是Publisher,即生产者。两者之间也存在差异。Flux对象表示0到N个异步响应序列,而Mono只表示0(空)或1个结果。Reactor官网介绍的Flux如下:Mono如下所示:3.2FluxMono的创建和使用我们也可以单独引用它的依赖。使用maven依赖io.projectreactorreactor-coreio.projectreactorreactor-testtestMono创建分别创建一个空的Mono和一个包含String的Mono,由消费者打印。Mono.empty().subscribe(System.out::println);Mono.just("HelloMonoJavaNorth").subscribe(System.out::print);Flux的创建Flux的创建有以下几种方法,只是(不确定参数创建)范围(从某个整数开始,以后的整数个数)fromArray,fromIterable,fromStream,从名字就可以看出,通过数组创建Flux、迭代器和流。一些Java代码示例Flux.just(1,2,3,4,5).subscribe(System.out::print);Flux.range(1,20).subscribe(System.out::print);Flux。fromArray(newString[]{"a1","a2","a3","a4","a5","a6"}).skip(2).subscribe(System.out::print);通量。fromIterable(Arrays.asList(1,2,3,4,5,6,7)).subscribe(System.out::println);Flux.fromStream(Stream.of(Arrays.asList(1,2,3,4,5,6,7))).订阅(System.out::print);再举个generate的例子publicstaticFluxgenerate(CallablestateSupplier,BiFunction,S>generator)如上代码所示,generate需要一个Callable参数,并且是一个supplier(即没有输入值,只有一个输出),另一个参数是BiFunction(我们前面也介绍过,需要两个输入值和一个输出值)。BiFunction中的输入值之一是SynchronousSink。下面我们给出一个生成Flux的例子。Flux.generate(()->0,//提供初始状态值0(i,sink)->{sink.next("3*"+i+"="+3*i);//使用初始值产生3的乘法if(i>9)sink.complete();//设置停止条件returni+1;//返回一个新的状态值用于下一次产生,除非响应序列终止}).subscribe(System.out::println);下面我们看一个Flux嵌套处理的例子:需求:去除字符串中的空格和重复,然后对输出进行排序。Stringstr="qawsedrftgyhujikolpzasxdcvfbghnjmkloiyt";Flux.fromArray(str.split(""))//通过数组创建Flux.flatMap(i->Flux.fromArray(i.split(""))).distinct()//deduplication.sort()//sorting.subscribe(System.out::print);//flatMap类似于Stream中的flatMap,accept函数作为参数输入一个值输出一个值。这里的输出是Publisher。以上是Flux和Mono的一些简单介绍。同时,Ractor还支持将JDK中的FlowPublisher和FlowSubscriber适配为Reactor中的发布者和订阅者等。.4.WebFluxSpringBoot2之后支持的Reactive响应式编程。关于Reactive技术栈和经典Servlet技术栈的对比,这张Spring官网上的图比较清楚。Spring响应式编程主要依赖于Reactor第三方库,即上面提到的Flux和Mono库。WebFlux主要有以下几个要点:反应栈web框架,完全异步非阻塞,运行于netty、undertow、Servlet3.1+容器核心反应库Reactor,返回Flux或Mono,支持注解和函数式编程。下面的SpringWebFlux示例我们给出了几个SpringBoot响应式Web示例。你可以去https://start.spring.io/创建一个新的webflux项目。项目中主要依赖spring-boot-starter-webfluxorg.springframework.bootspring-boot-starter-webflux基于注解WebFlux:下面是最简单的基于注解的WebFlux@GetMapping("/hello/mono1")publicMonomono(){returnMono.just("HelloMono-JavaNorth");}@GetMapping("/hello/flux1")publicFluxflux(){returnFlux.just("HelloFlux","HelloJavaNorth");}基于函数式编程的WebFlux:创建RouterFunction注入Spring。@BeanpublicRouterFunctiontestRoutes1(){returnRouterFunctions.route().GET("/flux/function",newHandlerFunction(){@OverridepublicMonohandle(ServerRequestrequest){returnServerResponse.ok().bodyValue("hellowebflux,HelloJavaNorth");}}).build();}//上面的方法被函数式编程替代如下@BeanpublicRouterFunctiontestRoutes(){returnRouterFunctions.route().GET("/flux/function",request->ServerResponse.ok().bodyValue("Hellowebflux,HelloJavaNorth")).build();}Flux和Mono的响应式编程延迟示例下面我们编写一个返回Mono的响应式Web服务。@GetMapping("/hello/mono")publicMonostringMono(){Monofrom=Mono.fromSupplier(()->{try{TimeUnit.SECONDS.sleep(5);}catch(InterruptedExceptione){thrownewRuntimeException(e);}return"Hello,SpringReactivedatetime:"+LocalDateTime.now();});System.out.println("thread:"+Thread.currentThread().getName()+"==="+LocalDateTime.now()+"==========Mono函数完成==========");returnfrom;}使用postman请求如下,5秒后返回数据。后台在5秒之前处理了整个方法。后台打印日志:再看一组Flux@GetMapping(value="/hello/flux",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFluxflux1(){FluxstringFlux=Flux.fromStream(IntStream.range(1,6).mapToObj(i->{mySleep(1);//休眠1秒return"javanorthflux"+i+"datetime:"+LocalDateTime.now();}));System.out.println("thread:"+Thread.currentThread().getName()+"==="+LocalDateTime.now()+"===========Flux函数完成=========");returnstringFlux;}这次用谷歌??浏览器请求这个服务:可以发现每秒都会产生一条消息。后台完成时间也是在一开始就完成整个方法:通过上面Flux和Mono的例子,可以很好的体验响应式编程。总结本文回顾了函数式编程、Stream操作等,然后给出了Java中Reactive编程的例子,也给出了与Reactor三方库打交道的Flux和Mono的例子。最后,SpringBootWebFlux被用来创建一个简单的响应式Web服务。我希望它能让你更好地理解响应式编程。