如何用SpringWebFlux构建ReactiveRESTAPI在正式讨论之前,让我们先了解一下系统的发展,传统REST实现中遇到的问题,以及目前API的总体需求。下图简要列出了传统应用和现代应用系统的主要特征。今天的系统强调:分布式应用、云原生、高可用性和可扩展性。因此,有效利用系统现有资源至关重要。应用程序API需求的演变那么传统的RESTAPI请求处理是如何工作的呢?传统的RESTAPI模型如上图所示,传统的RESTAPI会带来以下问题:阻塞和同步→通常,请求线程会等待各种阻塞的I/O直到结束才被释放,然后response将返回到调用方广场。每个请求的线程→Web容器使用每个请求的线程模型。该模型限制了待处理的并发请求数。也就是说,容器会对请求进行排队,最终会影响API的性能。处理高并发用户的局限→正是因为web容器使用了基于请求的线程模型,所以我们无法处理那些高并发的请求。无法更好地利用系统资源→BlockedI/O会导致线程空闲,从而导致web容器无法接受更多的请求,我们将无法有效利用现有的系统资源。无背压支持→由于我们无法从客户端或服务器施加背压,因此应用程序无法在重负载下维持正常运行。也就是说,如果应用程序突然面临大量请求,服务器或客户端可能会因中断而无法访问应用程序。接下来,我们就来看看响应式API的优势,以及如何使用响应式编程来解决上述问题。异步和非阻塞→响应式编程为编写异步和非阻塞应用程序提供了灵活性。事件/消息驱动→系统可以为任何活动生成相应的事件或消息。例如,来自数据库的数据将被视为事件流。背压支持→我们可以通过施加背压“优雅地”处理从一个系统到另一个系统的压力,从而避免拒绝服务。可预测的应用程序响应时间→由于线程是异步和非阻塞的,我们可以预测负载下的应用程序响应时间。更好的利用系统资源→同样因为线程是异步非阻塞的,各种线程不会被I/O占用,可以支持更多的用户请求。基于负载的缩放方法摆脱基于请求的线程→使用ReactiveAPI,并且由于异步和非阻塞线程,我们可以摆脱基于请求的线程模型。请求产生后,模型会与服务器创建一个事件,并通过请求线程处理其他请求。那么,响应式编程的具体流程是怎样的呢?如下图所示,一旦应用程序调用操作从数据源获取数据,线程立即返回,来自数据源的数据以数据/事件流的形式出现。在这里,应用程序是订阅者,数据源是发布者。数据流完成后,将触发onComplete事件。数据流工作流程如下图所示。如果发生任何异常,发布者将触发onError事件。数据流工作流程在某些情况下,例如:从数据库中删除一个条目,发布者只会立即触发onComplete/onError事件,而不会调用onNext事件,毕竟没有数据可返回。数据流工作流程接下来,我们进一步讨论:什么是背压,以及如何对响应流施加背压。例如,我们有一个客户端应用程序正在从另一个服务请求数据。该服务可以以1000TPS(吞吐量)的速率发布事件,而客户端应用程序只能以200TPS的速率处理事件。那么在这种情况下,客户端应用程序需要通过缓冲数据来进行处理。在后续调用中,客户端应用程序可能会缓冲更多数据,最终耗尽内存。显然,这对依赖于客户端应用程序的其他程序具有级联效应。为避免这种情况,客户端应用程序可以要求服务在事件结束时进行缓冲,并以客户端应用程序的速率推送事件。这就是所谓的背压,具体过程如下图所示。背压示例下面,我们将介绍ReactiveStreams的规范(见--https://www.reactive-streams.org/),以及一个实现案例--ProjectReactor(见--https://projectreactor.io/).通常,ReactiveStreams的规范定义了以下接口类型:Publisher→Publishers是那些具有无限数量的顺序元素的提供者。它可以根据订阅者的要求发布。它的Java代码如下所示:publicinterfacePublisher{publicvoidsubscribe(Subscribers);}Subscriber→Subscribers恰好是那些拥有无限数量顺序元素的消费者。其Java代码如下:publicinterfaceSubscriber{publicvoidonSubscribe(Subscriptions);publicvoidonNext(Tt);publicvoidonError(Throwablet);publicvoidonComplete();}循环。其Java代码如下:publicinterfaceSubscription{publicvoidrequest(longn);publicvoidcancel();}处理器(Processor)→表示一个处理阶段,即订阅者和发布者之间按照约定进行处理。下面是响应流规范的类图:响应流规范实际上,响应流规范的实现方式有很多种,上面提到的ProjectReactor只是其中一种。Reactor可以完全实现非阻塞和高效的请求管理。它能够提供两个反应式和可组合的API,即:Flux[N](参见--https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)和Mono[0|1](参见——https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)。他们广泛地实施了ReactiveExtensions。Reactor为HTTP(包括Websockets)、TCP和UDP提供了一个非阻塞的背压网络引擎。它也非常适合微服务架构。Flux→它是具有各种rx运算符的ReactiveStreams的发布者,它发出0到N个元素,然后输出成功,或完成时出现一些错误。它的流程图如下:图片来源:https://projectreactor.ioMono→也是发布者带有各种基本rx操作符的响应流程,可以发出0到1个元素,输出成功,或者with一些错误的完成结果。流程图如下:图片来源:https://projectreactor.io由于Reactor的实现往往涉及到Spring5.x,我们可以使用命令式编程结合Springservlet栈来构建RESTAPI。下图显示了Spring如何支持反应式和servlet堆栈实现。图片来源:spring.io下面是一个公开反应式RESTAPI的应用程序。在这个应用程序中,我们使用:SpringBootwithWebFluxSpringdataCassandradatabasewithresponsivesupport使用的各种依赖项。插件{id'org.springframework.boot'version'2.2.6.RELEASE'id'io.spring.dependency-management'version'1.0.9.RELEASE'id'java'}group='org.smarttechie'version='0.0.1-SNAPSHOT'sourceCompatibility='1.8'repositories{mavenCentral()}dependencies{implementation'org.springframework.boot:spring-boot-starter-data-cassandra-reactive'implementation'org.springframework.boot:spring-boot-starter-webflux'testImplementation('org.springframework.boot:spring-boot-starter-test'){excludegroup:'org.junit.vintage',module:'junit-vintage-engine'}testImplementation'io.projectreactor:reactor-test'}test{useJUnitPlatform()}在此应用程序中,我公开了以下API。您可以通过GitHub的相关链接下载源码——https://github.com/2013techsmarts/Spring-Reactive-Examples。在构建响应式API时,我们可以使用函数式编程模型来构建API,而无需使用RestController。当然,您需要具备如下的路由器和处理程序组:Router:packageorg.smarttechie.router;importorg.smarttechie.handler.ProductHandler;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.http.MediaType;importorg.springframework.web.reactive.function.server.RouterFunction;importorg.springframework.web.reactive.function.server.RouterFunctions;importorg.springframework.web.reactive.function.server.ServerResponse;importstaticorg。springframework.web.reactive.function.server.RequestPredicates.*;@ConfigurationpublicclassProductRouter{/***Therouterconfigurationfortheproducthandler.*@paramproductHandler*@return*/@BeanpublicRouterFunctionproductsRou??te(ProductHandlerproductHandler){returnRouterFunctions.route(GET("/产品").and(accept(MediaType.APPLICATION_JSON)),productHandler::getAllProducts).andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON))),productHandler::createProduct).andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::deleteProduct).andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::updateProduct);}}Handler:packageorg.smarttechie.handler;importorg.smarttechie.model.Product;importorg.smarttechie.service.ProductService;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.http.MediaType;importorg.springframework.stereotype.Component;importorg.springframework.web.reactive.function.server.ServerRequest;importorg.springframework.web.reactive.function.server.ServerResponse;importreactor.core.publisher.Mono;importstaticorg.springframework.web.reactive.function.BodyInserters.fromObject;@ComponentpublicclassProductHandler{@AutowiredprivateProductServiceproductService;staticMononotFound=ServerResponse.notFound().build();/***Thehandlertogetalltheavailableproducts.*@paramserverRequest*@return-alltheproductsinfo作为服务器响应的一部分*/publicMonogetAllProducts(ServerRequestserverRequest){returnServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.getAllProducts(),Product.class);}/***Thehandlertocreateaproduct*@paramserverRequest*@return-returnthecreatedproductaspartofServerResponse*/publicMonocreateProduct(ServerRequestserverRequest){MonoproductToSave=serverRequest.bodyToMono(Product.class);returnproductToSave.flatMap(product->ServerResponse.ok()).contentType(MediaType.APPLICATION_JSON).body(productService.save(product),Product.class));}/***Thehandlertodeleteaproductbasedontheproductid.*@paramserverRequest*@return-returnthedeletedproductaspartofServerResponse*/publicMonodeleteProduct(ServerRequestserverRequest){Stringid=复制代码serverRequest.pathVariable("id");MonoupdateProduct(ServerRequestserverRequest){returnproductService.update(serverRequest.bodyToMono(Product.class)).flatMap(product->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON(Objectfrom).body产品))).switchIfEmpty(notFound);}}到目前为止,我们已经了解了如何公开反应式RESTAPI对于我们使用Gatling的上述实现,对反应式API和非响应式API(使用Spring)执行了一个简单的基准测试RestController来构建非响应式API)。结果对比如下图所示。具体的Gatling负载测试脚本可以参考GitHub上的链接:https://github.com/2013techsmarts/Spring-Reactive-Examples。负载测试结果对比原标题:BuildReactiveRESTAPIsWithSpringWebFlux,作者:SivaPrasadRaoJanapati