当前位置: 首页 > 科技观察

想要实现异步编程,就得掌握这个工具类!

时间:2023-03-17 19:04:18 科技观察

本文转载自微信公众号“月亮与飞鱼”,作者日常加油站。转载本文请联系月版飞语公众号。前言最近看公司代码,多线程编程用的比较多,包括CompletableFuture的使用,所以想写一篇文章总结一下,在日常的Java8项目开发中,CompletableFuture是一个非常强大的并行开发工具,其语法接近Java8。语法风格,和stream一起使用也能大大增加代码的简洁性。您可以将其应用到您的工作中,提高界面性能,优化代码。基本介绍CompletableFuture是Java8中新增的一个类,用于异步编程,继承了Future和CompletionStage,一个Future,主要有独立处理请求结果的功能。CompletionStage用于实现流处理,实现异步请求各个阶段的组合或链式处理。因此,completableFuture可以实现整个异步调用接口的扁平化和流式处理,解决了原始Future在处理一系列链式异步请求时编码复杂的问题Future的局限性1.Future的结果无法进行进一步操作的情况非阻塞我们知道,在使用Future时,只能通过isDone()方法判断任务是否完成,或者线程被get()方法阻塞等待结果返回,无法进一步执行非阻塞情况下的操作。2.多个期货的结果不能合并。假设你有多个Future异步任务。您想在执行最快的任务时或执行完所有任务后执行一些其他操作。3、多个Future不能形成链式调用当异步任务之间存在依赖关系时,Future不能将一个任务的结果传递给另一个异步任务,多个Future不能形成链式工作流。4.没有异常处理现在使用CompletableFuture可以帮助我们完成上面的事情,让我们写出更加强大优雅的异步程序。创建异步任务的基本使用通常可以使用CompletableFuture的以下静态方法来创建异步任务;//无返回值,可以指定一个线程池(默认使用forkJoinPool.commonPool)publicstaticCompletableFuturesupplyAsync(Suppliersupplier);//创建一个有返回值的异步任务publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor);//有返回值,可以指定线程池用法举例:Executorexecutor=Executors.newFixedThreadPool(10);CompletableFuturefuture=CompletableFuture.runAsync(()->{//dosomething},executor);intpoiId=111;CompletableFuturefuture=CompletableFuture.supplyAsync(()->{PoiDTOpoi=poiService.loadById(poiId);returnpoi.getName();});//阻塞并得到结果oftheFutureStringpoiName=future.get();使用回调方法通过future.get()方法获取异步任务的结果,否则会阻塞等待任务完成CompletableFuture提供了几个回调方法,不阻塞主线程,异步任务完成后自动执行回调方法中的代码publicCompletableFuturethenRun(Runnablerunnable);//无参数,无返回值publicCompletableFuturethenAccept(Consumeraction);//接受参数,无返回值publicCompletableFuturethenApply(Functionfn);//接受参数T,有返回值U用法举例:CompletableFuturefuture=CompletableFuture.supplyAsync(()->"Hello").thenRun(()->System.out.println("dootherthings.比如异步打印日志或者发送消息"));//如果你只想执行一个CompletableFuturetask,做一些后续处理,不需要返回值,那么可以使用thenRun回调方法完成//如果主线程不依赖thenRun中代码的执行来完成,有无需使用get()方法来阻塞主线程。CompletableFuturefuture=CompletableFuture.supplyAsync(()->"Hello").thenAccept((s)->System.out.println(s+"world"));//输出:Helloworld//回调方法要使用异步任务的结果不需要返回值,所以可以使用thenAccept方法CompletableFuturefuture=CompletableFuture.supplyAsync(()->{PoiDTOpoi=poiService.loadById(poiId);returnpoi.getMainCategory();}).thenApply((s)->isMainPoi(s));//booleanisMainPoi(intpoiId);future.get();//如果要进一步处理异步任务的结果,需要返回一个值,使用thenApply方法。//如果主线程要获取回调方法的返回,还是需要使用get()方法阻塞来组合两个异步任务//thenCompose方法中的异步任务依赖于调用this的异步任务methodpublicCompletableFuturethenCompose(Function>fn);//两个独立的异步任务完成时使用publicCompletableFuturethenCombine(CompletionStageother,BiFunctionfn);使用示例:CompletableFuture>poiFuture=CompletableFuture.supplyAsync(()->poiService.queryPoiIds(cityId,poiId));//第二个任务是返回CompletableFuture的异步方法CompletableFuture>getDeal(ListpoiIds){returnCompletableFuture.supplyAsync(()->poiService.queryPoiIds(poiIds));}//然后ComposeCompletableFuture>resultFuture=poiFuture.thenCompose(poiIds->getDeal(poiIds));resultFuture.get();ThenCompose和thenApply有类似的功能,它们的区别在于thenCompose接受一个返回CompletableFuture的Function,当你想直接从CompletableFuture中获取结果U时回调方法返回,使用thenCompose如果使用thenApply,返回结果如果resultFuture的类型是CompletableFuture>>,不是CompletableFuture>CompletableFuturefuture=CompletableFuture.supplyAsync(()->"Hello").thenCombine(CompletableFuture.supplyAsync(()->"world"),(s1,s2)->s1+s2);//future.get()组合了多个CompletableFutures。当需要完成多个异步任务,再进行后续处理时,可以使用allOf方法CompletableFuturepoiIDTOFuture=CompletableFuture.supplyAsync(()->poiService.loadPoi(poiId)).thenAccept(poi->{model.setModelTitle(poi.getShopName());//domorething});CompletableFutureproductFuture=CompletableFuture.supplyAsync(()->productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId)).thenAccept(list->{model.setDefaultCount(list.size());model.setMoreDesc("more");});//future3和更多的异步任务,这里就不写CompletableFuture.allOf(poiIDTOFuture,productFuture,future3,...).join()出来了;//allOf结合了所有的异步任务,使用join得到结果。这种方式非常适合C端业务,比如异步从多个服务中取出store信息,然后组装成自己需要的模型。最后,所有的店铺信息填完后,返回这里,使用join方法获取结果。和get方法一样阻塞等待任务完成多个异步任务。有任何意思一个完成时就返回结果,可以使用任何方法CompletableFuturefuture1=CompletableFuture.supplyAsync(()->{try{TimeUnit.SECONDS.sleep(2);}catch(InterruptedExceptione){thrownewIllegalStateException(e);}return"ResultofFuture1";});CompletableFuturefuture2=CompletableFuture.supplyAsync(()->{try{TimeUnit.SECONDS.sleep(1);}catch(InterruptedExceptione){thrownewIllegalStateException(e);}return"ResultofFuture2";});CompletableFuturefuture3=CompletableFuture.supplyAsync(()->{try{TimeUnit.SECONDS.sleep(3);}catch(InterruptedExceptione){thrownewIllegalStateException(e);返回"ResultofFuture3";});CompletableFutureanyOfFuture=CompletableFuture.anyOf(future1,future2,future3);System.out.println(anyOfFuture.get());//ResultofFuture2异常处理Integerage=-1;CompletableFuturematurityFuture=CompletableFuture.supplyAsync(()->{if(age<0){thrownewIllegalArgumentException("Agecannotbenegative");}if(age>18){return"Adult";}else{return"Child";}}).exceptionally(ex->{System.out.println("Oops!Wehaveanexception-"+ex.getMessage());return"Unknown!";}).thenAccept(s->System.out.print(s));//Unkown!exceptionally方法可以处理异步任务的异常,当异常发生时,给异步任务链一个从错误中恢复的机会,你可以在这里记录异常或者使用handler方法返回一个默认值也可以处理异常,无论是否发生异常都会被调用Integerage=-1;CompletableFuturema??turityFuture=CompletableFuture.supplyAsync(()->{if(age<0){thrownewIllegalArgumentException("Agecannotbenegative");}if(age>18){return"Adult";}else{return"Child";}}).handle((res,ex)->{if(ex!=null){System.out.println("Oops!Wehaveanexception-"+ex.getMessage());return"Unknown!";}returnres;});分片处理fragmentationandparallelprocessing:分片是借助于stream,然后通过CompletableFuture实现并行执行,最后进行数据聚合(其实也是stream的一种方法)。CompletableFuture没有提供单独的分片API,但是可以借助stream的分片聚合功能来实现。示例://当请求项过多时,分批进行异步处理List>skuBaseIdsList=ListUtils.partition(skuIdList,10);//分片//并行List>>futureList=Lists.newArrayList();for(ListskuId:skuBaseIdsList){CompletableFuture>tmpFuture=getSkuSales(skuId);futureList.add(tmpFuture);}//聚合futureList.stream().map(CompletalbleFuture::join)。collent(Collectors.toList());举个例子给大家看看CompletableFuture异步编程的优点这里我们使用CompletableFuture来实现泡水泡茶程序。首先,我们还需要完成分工计划。在下面的程序中,我们分了3个任务:任务1负责洗水壶和烧水。Task2负责洗茶壶,洗茶杯和茶task3负责泡茶其中,任务3只有在任务1和任务2完成后才能开始。下面是代码实现。您首先跳过不熟悉的方法,例如runAsync()、supplyAsync()和thenCombine()。从全局来看,你会发现:不需要手动维护线程,没有手动维护线程的繁琐工作,也不需要我们去关注分配线程给任务的工作;语义更清晰,例如f3=f1.thenCombine(f2,()->{})可以明确表示task3在task1和task2都完成后开始;代码更加简洁,专注于业务逻辑,几乎所有代码都与业务逻辑相关//任务1:洗水壶->烧水CompletableFuturef1=CompletableFuture.runAsync(()->{System.out.println("T1:洗水壶...");sleep(1,TimeUnit.SECONDS);System.out.println("T1:烧水...");sleep(15,TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->取茶CompletableFuturef2=CompletableFuture.supplyAsync(()->{System.out.println("T2:Washtheteapot..".");sleep(1,TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2,TimeUnit.SECONDS);System.out.println("T2:喝茶...");sleep(1,TimeUnit.SECONDS);return"Longjing";});//任务三:任务一、任务二完成后执行:泡茶CompletableFuturef3=f1.thenCombine(f2,(__,tf)->{System.out.println("T1:取茶:"+tf);System.out.println("T1:泡茶...");return"Tea:"+tf;});//等待任务3的执行结果System.out.println(f3.join());voidsleep(intt,TimeUnitu){try{u.sleep(t);}catch(InterruptedExceptione){}}注解1.CompletableFuturedefaults线程池是否适合使用上面提到的静态方法runAsync和supplyAsync创建CompletableFuture异步任务等,可以指定使用的线程池,不指定则使用CompletableFuture默认的线程池privatestaticfinalExecutorasyncPool=useCommonPool?ForkJoinPool.commonPool():newThreadPerTaskExecutor();可以看出CompletableFuture默认的线程池是调用ForkJoinPool的commonPool()方法创建的。此默认线程池中的核心线程数取决于CPU核心数。公式为Runtime.getRuntime().availableProcessors()-1,以4核为例,以双槽CPU为例。核心线程数为4*2-1=7。这样的设置满足了CPU密集型应用,但是对于IO密集型应用来说就存在风险了。当qps很高的时候,线程数可能设置的太小,会导致上线失败。所以可以根据业务情况自定义线程池。2.get设置超时时间不能序列化,否则会导致接口延迟线程数*超时时间