当前位置: 首页 > 科技观察

Java响应式编程实践与原理分析

时间:2023-03-13 18:10:39 科技观察

背景在传统的命令式编程模式下,程序是按照手动编写指令的先后顺序同步执行的。也就是说,只有当前指令运行完毕,下一条指令才会开始执行。那么传统命令式编程中有哪些线程处理模型呢?第一个是同步阻塞。在这种模型中,程序只有在阻塞操作完成后才能继续执行。而且阻塞会浪费资源,比如等待网络连接(数据库请求,其他服务请求),会导致执行线程空闲。第二种是异步阻塞方式。在这种方法中,一般会通过线程池创建很多线程,然后分配空闲线程来处理请求。当每个处理线程遇到阻塞操作时,仍会中断等待操作完成,但与同步阻塞方式相比,减少了任务的响应时间。通过提高并行度,可以提高资源利用率。三是异步非阻塞,使用回调的方式来避免阻塞操作造成的资源浪费。但是回调函数会层层嵌套,造成回调噩梦(callbackhell),导致可读性很差。为了利用第三种模式,让代码更易于维护,spring社区推出了springfluxresponsivenon-blockingprogramming。它的默认实现称为projectreactor。projectreactor是JVM的完全非阻塞反应式编程基础,具有高效的需求管理(以管理“背压”的形式)。它提供了一个可组合的异步序列APIFlux(用于[0…N]元素)和Mono(用于[0|1]元素),广泛地实现了ReactiveExtensions规范。特性响应式编程特性包括以下内容。稍后我将通过示例详细向您展示。可组合性和可读性在订阅之前什么都不会发生消费者能够使用背压或其他方式向生产者发出发射率过高的信号具体丰富的数据流运算符高层次但高价值的抽象是与并发无关的实现projectreactor引入了实现Publisher的可组合反应类型并提供丰富的操作符,特别是Flux和Mono。Flux表示一个0..N项的响应序列,可以有完成信号和错误信息来结束整个过程。所以传输的数据是一个正常值,一个完成信号,一个错误信号。对应的方法有onNext()、onComplete()、onError()。Mono对象表示单个值或空(0..1)结果,可以认为是一种特殊的Flux,它最多可以发出一个公共值,还包括onComplete()和onError()。示例静态数据是通过直接调用just()方法或通过Stream或Iterable对象(例如List)创建的。同样是通过Flux静态方法,range方法生成(该方法生成一个Integer序列,第一个参数表示起始数,第二个参数表示生成数,这里生成的数据为1,2,3),empty()方法是生成一个空序列。Fluxflux1=Flux.just("one","two","three");Fluxflux2=Flux.fromStream(Stream.of("one","two","three"));Listiterable=Arrays.asList("one","two","three");Fluxflux3=Flux.fromIterable(iterable);Fluxflux4=Flux.range(1,3));//或者通过#empty()生成空数据FluxfluxEmpty=Flux.empty()Mono也有类似的创建方法,只不过just()方法只对应一个参数。justOrEmpty()方法将检查空值,并选择调用just()或empty()。//Mono也是类型MonomonoEmpty=Mono.empty();Monomono1=Mono.just("one");//justOrEmpty可以保证传入参数为空时不报错Mono>mono2=Mono.justOrEmpty(null);动态数据创建创建动态数据主要有两种方法:生成和创建。对于generate方法,Flux中有3个重载方法,无论哪个方法都会包含一个循环构造函数。在每个循环中,sink.next()方法最多被调用一次。比如flux_generate1实例对应的方法。循环生成1~10的序列,当atomicInteger大于10时调用complete()方法,发送消息通知订阅者。flux_generate2实例对应的方法将atomicInteger作为一个对象,传入方法中,最后打印到控制台。//generate生成,调用next生成数据,complete完成整个过程//AtomicInteger在一个循环中只允许调用一次next方法atomicInteger=newAtomicInteger();Fluxflux_generate1=Flux.generate(sink->{if(atomicInteger.incrementAndGet()>10){sink.complete();}sink.next(atomicInteger.get());});Fluxflux_generate2=Flux.generate(()->0,(integer,sink)->{if(++integer>10){sink.complete();}sink.next(integer);returninteger;},integer->{System.out.println("最后一个整数值为"+整数);});执行过程分析为了更好的理解flux的底层实现逻辑和编程思路,下面给大家详细演示一下flux.create方法的执行过程。特别是前面提到的在订阅之前什么都不会发生的声明的真正含义。flux.create((t)->{t.next("create");t.next("create1");}).subscribe(st->{System.out.println(st);});上面是我们要执行的代码。通过debug我们可以看到如下执行过程。Flux.create方法接受一个函数式接口Consumer作为输入参数,在我们的例子中就是这样。(t)->{t.next("创建");t.next("create1");},publicstaticFluxcreate(Consumer>emitter){returncreate(emitter,OverflowStrategy.BUFFER);}我们追踪了它所有的方式,发现它把我们的功能接口赋值给了Fluxcreate对象的一个??属性source,然后返回了。这个功能接口的逻辑没有实现。FluxCreate(Consumer>source,OverflowStrategybackpressure,FluxCreate.CreateModecreateMode){this.source=(Consumer)Objects.requireNonNull(source,"source");}this.backpressure=(OverflowStrategy)Objects.requireNonNull(backpressure,"backpressure");this.createMode=createMode;}所以什么时候执行我们的代码逻辑,再往下看。subscribe方法还接收一个功能接口。(st->{System.out.println(st);})publicfinalDisposablesubscribe(Consumerconsumer){Objects.requireNonNull(consumer,"consumer");returnthis.subscribe(consumer,(Consumer)null,(Runnable)null);}让我们看看调用订阅后会发生什么。调用订阅函数1调用订阅函数2调用订阅函数3是的,Flux.create中的执行代码是通过subscribe发起的,这里面每次next调用都会触发后续订阅者的执行,最后将结果打印出来出去。连接到目标VM,地址:'127.0.0.1:53984',传输:'socket'createcreate1DisconnectedfromthetargetVM,address:'127.0.0.1:53984',transport:'socket'Processfinishedwithexitcode0