当前位置: 首页 > 后端技术 > Java

Reactor的多任务并发执行,结果按顺序返回到第一个

时间:2023-04-01 14:49:38 Java

1场景。调用多个并行服务,根据服务优先级返回第一个有效数据。具体情况:一个页面可能有多个弹窗,弹窗有优先级。每次只需要返回第一个有数据的弹窗即可。但是我也希望所有弹窗之间的数据获取是异步的。如何使用Reactor实现这个场景?2创建service2.1创建基础接口和实体类publicinterfaceTestServiceI{Monorequest();}提供一个request方法,返回一个Mono对象。@Data@ToString@AllArgsConstructor@NoArgsConstructorpublicclassTestUser{privateStringname;}2.2创建服务实现@Slf4jpublicclassTestServiceImpl1implementsTestServiceI{@OverridepublicMonorequest(){log.info("execute.test.service1");returnMono.fromSupplier(()->{try{System.out.println("service1.threadName="+Thread.currentThread().getName());Thread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}return"";}).map(name->{returnnewTestUser(name);});}}第一个服务需要500毫秒来执行。返回一个空对象;创建第二个服务需要1000毫秒来执行。返回一个空对象;代码如上,只是改变睡眠时间。继续创建第三个服务需要1000ms来执行。返回名称3。代码如上,改变休眠时间,返回name3。3主要方法publicstaticvoidmain(String[]args){longstartTime=System.currentTimeMillis();TestServiceItestServiceImpl4=newTestServiceImpl4();TestServiceItestServiceImpl5=newTestServiceImpl5();TestServiceItestServiceImpl6=newTestServiceImpl6();ListserviceIList=newArrayList<>();serviceIList.add(testServiceImpl4);serviceIList.add(testServiceImpl5);serviceIList.add(testServiceImpl6);//执行服务列表,那么可以有多少个服务Flux>monoFlux=Flux.fromIterable(serviceIList).map(service->{returnservice.request();});//flatMap(orflatMapSequential)+map实现异常继续下一次执行测试用户.claSS);如果(Objects.nonNull(testUser)&&StringUtils.isNotBlank(testUser.getName())){返回testUser;}//null是reactor中的异常数据returnnull;}).onErrorContinue((err,i)->{log.info("onErrorContinue={}",i);});});单声道mono=flux.elementAt(0,Mono.just(""));对象块=mono.block();System.out.println(block+"blockFirst执行时间ms:"+(System.currentTimeMillis()-startTime));}1。Flux.fromIterable执行服务列表,你可以随意添加或删除服务服务。2、flatMap(或flatMapSequential)+map+onErrorContinue实现异常继续下一次执行。具体参考:Reactor的onErrorContinue和onErrorResume3,Monomono=flux.elementAt(0,Mono.just(""));返回第一个正常数据。执行输出:20:54:26.512[main]DEBUGreactor.util.Loggers-UsingSlf4jloggingframework20:54:26.553[main]INFOcom.geniu.reactor.TestServiceImpl1-execute.test.service1service1.threadName=main20:54:27.237[主要]信息com.geniu.reactor.TestReactorOrderV2-onErrorContinue=TestUser(name=)20:54:27.237[主要]信息com.geniu.reactor.TestServiceImpl2-execute.test.service2service5.threadName=main20:54:28.246[主要]信息com.geniu.reactor.TestReactorOrderV2-onErrorContinue=TestUser(name=)20:54:28.246[主要]信息com.geniu.reactor.TestServiceImpl3-execute.test.service3service6.threadName=mainTestUser(name=name3)blockFirst的执行时间是ms:28951,service1和service2返回空,所以继续下一个,最后返回name3。2、观看总时长:2895ms。service1取500,service2取1000,service3取1000,发现时间基本等于service1+service2+service3。这里发生了什么?查看返回执行的线程,都是main。总结:这样就按顺序返回了第一个正常的数据。但是执行不是异步的。下一步:如何实现异步?4实现异步4.1subscribeOn实现异步修改服务实现。添加.subscribeOn(Schedulers.boundedElastic())如下:@Slf4jpublicclassTestServiceImpl1implementsTestServiceI{@OverridepublicMonorequest(){log.info("execute.test.service1");returnMono.fromSupplier(()->{try{System.out.println("service1.threadName="+Thread.currentThread().getName());Thread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}return"";})//增加subscribeOn.subscribeOn(Schedulers.boundedElastic()).map(name->{returnnewTestUser(name);});}}再次执行输出如下:21:02:04.213[main]DEBUGreactor.util.Loggers-UsingSlf4jloggingframework21:02:04.265[main]INFOcom.geniu.reactor.TestServiceImpl1-execute.test.service1service4.threadName=boundedElastic-121:02:04.300[main]INFOcom.geniu.reactor.TestServiceImpl2-execute.test.service221:02:04.302[main]INFOcom.geniu.reactor.TestServiceImpl3-execute.test。service3service2.threadName=boundedElastic-2service3.threadName=boundedElastic-321:02:04.987[boundedElastic-1]INFOcom.geniu.reactor.TestReactorOrderV2-onErrorContinue=TestUser(name=)21:02:05.307[boundedElastic-2]INFOcom.geniu.reactor.TestReactorOrderV2-onErrorContinue=TestUser(name=)TestUser(name=name6)blockFirst执行耗时ms:12421.发现实现sleep的线程不是主线程,而是boundedElastic;2、最终执行时间为1242ms,只比执行时间最长的service2和service3多了1000ms,证明是异步的。4.2CompletableFuture实现异步修改服务实现,使用CompletableFuture进行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB操作等);然后使用Mono.fromFuture返回Mono对象。@Slf4jpublicclassTestServiceImpl1implementsTestServiceI{@OverridepublicMonorequest(){log.info("execute.test.service1");CompletableFutureuCompletableFuture=CompletableFuture.supplyAsync(()->{尝试{System.out.println("service1.threadName="+Thread.currentThread().getName());Thread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}return"testname1";});returnMono.fromFuture(uCompletableFuture).map(name->{returnnewTestUser(name);});}}执行返回如下:21:09:59.465[main]DEBUGreactor.util.Loggers-UsingSlf4jloggingframework21:09:59.510[main]INFOcom.geniu.reactor.TestServiceImpl2-execute.test.service2service2.threadName=ForkJoinPool.commonPool-worker-121:09:59.526[main]INFOcom.geniu.reactor.TestServiceImpl3-execute.test.service3service3.threadName=ForkJoinPool.commonPool-worker-221:09:59.526[main]INFOcom.geniu.reactor.TestServiceImpl1-execute.test.service1service1.threadName=ForkJoinPool.commonPool-worker-321:10:00.526[ForkJoinPool.commonPool-worker-1]INFOcom.geniu.reactor.TestReactorOrder-onErrorContinue=TestUser(name=)21:10:00.538[ForkJoinPool.commonPool-worker-2]INFOcom.geniu.reactor.TestReactorOrder-onErrorContinue=TestUser(name=)TestUser(name=testname1)blockFirstExecutiontime-consumedms:12381,耗时操作使用ForkJoinPool线程池2中的线程执行,最终耗时与方法1基本相同.大家试试吧~相关链接:Reactor的onErrorContinue和onErrorResumeReactor的flatMapvsmap详解