当前位置: 首页 > 科技观察

什么,你还不能用CompletableFuture?

时间:2023-03-16 11:53:05 科技观察

上一篇我们讲了Future机制。有兴趣的可以参考Future、Callable、FutureTask的关系。但是,Future机制并不是那么灵活。比如如何使用Future机制来描述两个任务的串行执行,或者两个任务是并行执行的,还是只关心先结束的任务的结果。Future机制在一定程度上无法快速满足上述要求,CompletableFuture应运而生。本视频将介绍CompletableFuture的API,并通过一些示例演示如何使用它。1、创建异步任务publicstaticCompletableFuturesupplyAsync(Suppliersupplier)publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor);publicstaticCompletableFuturerunAsync(Runnablerunnable);publicstaticCompletableFuturerunAsync(Runnablerunnable,Executorexecutor);supplyAsync和runAsync的区别在于supplyAsync有返回值,而runAsync没有返回值。带Executor参数的构造函数,使用线程池中的线程执行异步任务(线程池可以参考讲线程池)不带Executor参数的构造函数,使用ForkJoinPool.commonPool()中的线程执行异步任务(Fork/Joinframework可以参考讲并行流parallelStream)1.1例子:使用supplyAsync创建一个返回值异步任务Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}return1;});//这个方法会一直阻塞Integerresult=completableFuture.get();System.out.println(result);}}2。异步任务回调publicCompletableFuturewhenComplete(BiConsumeraction);publicCompletableFuturewhenCompleteAsync(BiConsumeraction);publicCompletableFuturewhenCompleteAsync(BiConsumeraction,Executorexecutor);publicCompletableFutureexceptionally(函数fn);whenComplete开头的方法会在计算任务完成后回调(包括正常完成和异常),而exceptionally只会在计算任务异常时回调。如何判断哪个线程回调whenComplete比较复杂,先跳过,回调whenCompleteAsync的线程比较简单,取一个空闲线程即可,后缀Async的方法是一样的。2.1示例:计算异常时,使用whenComplete异常处理packagecom.qcy.testCompleteableFuture;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutionException;importjava.util.function.BiConsumer;importjava.util.function.函数;importjava.util.stream.IntStream;/***@authorqcy*@create2020/09/0717:40:44*/publicclassCase2{publicstaticvoidmain(String[]args)throwsException{CompletableFuturecompletableFuture=CompletableFuture。supplyAsync(()->{try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("ThreadexecutingsupplyAsync:"+Thread.currentThread().getName());inti=1/0;return1;});completableFuture.whenComplete(newBiConsumer(){@Overridepublicvoidaccept(Integerinteger,Throwablethrowable){System.out.println("ThreadexecutingwhenComplete:"+Thread.currentThread().getName());if(throwable==null){System.out.println("计算没有异常,result:"+整数);}}});可完成的Future.exceptionally(newFunction(){@OverridepublicIntegerapply(Throwablethrowable){//发生异常时,返回默认值System.out.println("计算异常,信息:"+throwable.getMessage());return-1;}});System.out.println(completableFuture.get());}}输出:当然CompletableFuture中的各种方法都支持链式调用和Lambda表达式,我们重写如下:publicstaticvoidmain(String[]args)throwsException{CompletableFuturecompletableFuture=CompletableFuture.supplyAsync(()->{try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("ThreadexecutingsupplyAsync:"+Thread.currentThread().getName());inti=1/0;return1;}).whenComplete((integer,throwable)->{System.out.println("执行的线程whenComplete:"+Thread.currentThread().getName());if(throwable==null){System.out.println("计算无异常,结果:"+integer);}})。异常(可抛出->{//发生异常时,返回一个默认值System.out.println("计算异常,信息:"+throwable.getMessage());返回1;});System.out.println("计算结果:"+completableFuture.get());}3.任务序列化执行publicCompletableFuturethenApply(Functionfn);publicCompletableFuturethenRun(Runnableaction);publicCompletableFuturethenAccept(Consumeraction);publicCompletableFuturehandle(BiFunctionfn);publicCompletableFuturethenCompose(函数>fn);thenApply,取决于上次任务执行的结果,Function参数中,T表示上次任务返回值的类型,U代表当前任务返回值的类型,当上一个任务没有异常时,thenApply会调用thenRun,它不需要知道上一个任务的返回结果,执行完后就开始执行RunnablethenAccept上一个任务的完成,依赖于上一个任务的执行结果,因为输入parameter是Consumer,没有返回值handle类似于thenApply,但是当前面的task出现异常时,handle可以执行,但是thenApplythenCompose不会执行,会传入一个task执行的结果,返回一个新的CompleteableFuture对象。3.1示例:使用序列化任务分解两个数相乘输出包com.qcy.testCompleteableFuture;importjava.util.concurrent.CompletableFuture;/***@authorqcy*@create2020/09/0717:40:44*/publicclassCase4{publicstaticvoidmain(String[]args){CompletableFuture.supplyAsync(()->2).thenApply(num->num*3).thenAccept(System.out::print);}}显然输出为63.2示例:使用序列化任务并模拟异常packagecom.qcy.testCompleteableFuture;importjava.util.concurrent.CompletableFuture;importjava.util.function.BiFunction;/***@authorqcy*@create2020/09/0717:40:44*/publicclassCase4{publicstaticvoidmain(String[]args){CompletableFuture.supplyAsync(()->2).thenApply(num->num/0).thenApply(result->result*3).handle((integer,throwable)->{if(throwable==null){returninteger;}else{throwable.printStackTrace();return-1;}}).thenAccept(System.out::print);}}最终会输出-14。任务同时执行,都需要执行publicCompletableFuturethenCombine(CompletionStageother,Functionfn);publicCompletableFuturethenAcceptBoth(CompletionStageother,Consumeraction);publicCompletableFuturerunAfterBoth(CompletionStageother,Runnableaction);publicstaticCompletableFutureallOf(CompletableFuture...cfs);thenCombine,合并两个任务,两个任务可以同时执行,都执行成功后,执行最后的BiFunction操作,其中T代表第一个任务的执行结果类型,U表示第二个任务的执行结果类型,V表示合并后的结果类型。进行一次消费,没有返回值runAfterBoth,两个任务都执行完后,但不关心它们的返回结构,再执行一个Runnable。allOf,当所有任务都执行完,返回一个CompletableFuture4.1例子:使用thenCombine合并任务create2020/09/0717:40:44*/publicclassCase5{publicstaticvoidmain(String[]args)throwsException{CompletableFuturecf1=CompletableFuture.supplyAsync(()->{System.out.println("任务1开始");try{Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务1结束");return2;});CompletableFuturecf2=CompletableFuture。supplyAsync(()->{System.out.println("任务2开始");try{Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务2结束");return3;});CompletableFuturecompletableFuture=cf1.thenCombine(cf2,(result1,result2)->result1*result2);System.out.println("CompletableFuture:"+completableFuture.get());}}输出:你可以看到这两个任务确实是一样的当然,熟练之后可以直接使用链式操作,代码如下:40:44*/publicclassCase6{publicstaticvoidmain(String[]args)throwsException{CompletableFuturecompletableFuture=CompletableFuture.supplyAsync(()->{System.out.println("任务1开始");try{Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务1结束");return2;}).thenCombine(CompletableFuture.supplyAsync(()->{System.out.println("任务2开始");try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务2结束");return3;}),(result1,result2)->result1*result2);System.out.println("计算结果:"+completableFuture.get());}}5.任务同时执行,只公开第一个完成的任务CompletableFutureapplyToEither(CompletionStageother,函数fn);publicCompletableFuture;acceptEither(CompletionStageother,Consumeraction);publicCompletableFuturerunAfterEither(CompletionStageother,Runnableaction);publicstaticCompletableFutureanyOf(CompletableFuture...cfs);applyToEither,执行最新的任务,执行结果为Function操作,其中T为第一个执行任务的结果类型,U为最后一个输出类型acceptEither,最新执行的任务,消费操作runAfterEither执行于结果,anytask执行完成后,执行Runnable操作anyOf。多个任务中,返回第一个完成的CompletableFuture5.1例子:同时执行两个任务,打印第一个完成的任务的结果packagecom.qcy.testCompleteableFuture;importjava.util.concurrent.CompletableFuture;/***@authorqcy*@create2020/09/0717:40:44*/publicclassCase7{publicstaticvoidmain(String[]args)throwsException{CompletableFuturecompletableFuture=CompletableFuture.supplyAsync(()->{System.out.println("任务1开始");try{Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务1结束");return2;}).acceptEither(CompletableFuture.supplyAsync(()->{系统.out.println("任务2开始");try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务2结束");return3;}),result->System.out.println(result));//等待CompletableFuture返回,防止主线程退出completableFuture.join();}}输出:可以看到任务2结束后,任务1不会直接执行剩余代码5.2例子:同时执行多个任务,打印第一个完成的任务的结果*@create2020/09/0717:40:44*/publicclassCase8{publicstaticvoidmain(String[]args)throwsException{CompletableFuturecf1=CompletableFuture.supplyAsync(()->{System.out.println("任务1开始");try{Thread.sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务1结束");return2;});CompletableFuture<整数>cf2=CompletableFuture。supplyAsync(()->{System.out.println("任务2开始");try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务2结束");return3;});CompletableFuturecf3=CompletableFuture.supplyAsync(()->{System.out.println("任务3开始");try{Thread.sleep(4000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("任务3结束");return4;});CompletableFuturefirstCf=CompletableFuture.anyOf(cf1,cf2,cf3);System.out.println(firstCf.get());}}输出: