当前位置: 首页 > 后端技术 > Java

JavaCompletableFuture异步超时实现探索

时间:2023-04-01 23:52:38 Java

作者:京东科技张天赐前言JDK8是一次大版本升级,加入了很多新特性,其中之一就是CompletableFuture。从此,真正从JDK层面支持了基于事件的异步编程范式,弥补了Future的不足。在我们日常的优化中,最常用的方法就是多线程并行执行。这将涉及CompletableFuture的使用。常见用法下面是一个常见场景的示例。如果我们有两个RPC远程调用服务,那么在进行后续的逻辑处理之前,我们需要获取这两个RPC的结果。publicstaticvoidmain(String[]args){//任务A需要2秒intresultA=compute(1);//任务B需要2秒intresultB=compute(2);//后续业务逻辑ProcessSystem.out.println(resultA+resultB);}可以估计串行执行至少需要4秒,任务B不依赖于任务A的结果。对于这种场景,我们通常会选择并行优化。演示代码如下:publicstaticvoidmain(String[]args){//只是一个简单的例子,生产代码不要这样写!//统计耗时函数time(()->{CompletableFutureresult=Stream.of(1,2)//创建一个异步任务map(x->CompletableFuture.supplyAsync(()->compute(x),executor))//aggregate.reduce(CompletableFuture.completedFuture(0),(x,y)->x.thenCombineAsync(y,Integer::sum,executor));//等待结果try{System.out.println("Result:"+result.get());}catch(ExecutionException|InterruptedExceptione){System.err.println("任务执行异常");}});}输出:[async-1]:任务执行开始:1[async-2]:任务执行开始:2[async-1]:任务执行完成:1[async-2]:任务执行完成:2结果:3耗时:2秒即可看到耗时变成了2秒。存在问题分析CompletableFuture现有的功能似乎可以满足我们的需求。但是当我们介绍一些真实的常见情况时,一些潜在的不足就暴露出来了。compute(x)如果是根据入参查询用户某类优惠券列表的任务,我们需要查询两张优惠券,并组合起来返回上游。假设上游要求我们在2秒内完成处理并返回结果,但是compute(x)的耗时从0.5秒到无穷大波动。这时候我们需要放弃compute(x)任务耗时过长的结果,只处理指定时间内完成的任务,尽可能保证服务可用。那么上述代码的耗时是由耗时最长的服务决定的,不能满足现有的需求。通常我们会使用get(longtimeout,TimeUnitunit)来指定获取结果的超时时间,我们会为compute(x)设置一个超时时间,达到超时时间后会自动抛出异常中断任务。publicstaticvoidmain(String[]args){//只是一个简单的例子,不要在生产代码中写这个!//统计耗时函数time(()->{List>result=Stream.of(1,2)//创建一个异步任务,compute(x)超时抛出异常map(x->CompletableFuture.supplyAsync(()->compute(x),executor)).toList();//等待结果intres=0;for(CompletableFuturefuture:result){try{res+=future.get(2,SECONDS);}catch(ExecutionException|InterruptedException|TimeoutExceptione){System.err.println("任务执行异常或超时");}}System.out.println("Result:"+res);});}输出:[async-2]:任务执行开始:2[async-1]:任务执行开始:1[async-1]:任务执行完成:1任务执行异常或超时结果:1耗时:2秒可见只要我们可以给compute(x)设置一个超时时间来中断任务,结合get、getNow等获取结果的方法可以很好的管理整体的耗时,那么问题就来了,如何给任务设置一个异步超时时间呢?已有实践当异步任务是RPC请求时,我们可以设置一个JSF超时时间来达到异步超时的效果。当请求是R2M请求时,我们还可以控制R2M连接的最大超时时间来达到效果。貌似我们都是靠第三方中间件的能力来管理任务超时?那么就有一个问题,中间件的超时控制能力是有限的,如果异步任务是中间件IO操作+本地计算操作呢?以JSF超时为例,反编译后的JSF代码如下://剩余等待时间longremaintime=timeout-(this.sentTime-this.genTime);if(remaintime<=0L){if(this.isDone()){//反序列化得到结果returnthis.getNow();}}elseif(this.await(remaintime,TimeUnit.MILLISECONDS)){//等待任务在时间内完成,反序列化得到结果returnthis.getNow();}this.setDoneTime();//超时抛出this时抛出异常。clientTimeoutException(false);}当任务刚好卡在超时边缘时,这个任务的耗时时间就是超时时间+获取结果的时间。但是获取结果(反序列化)是一个纯本地的计算操作,消耗的时间长短受CPU的影响很大。在某些CPU占用率高的情况下,异步任务触发失败抛出异常中断,导致我们无法准确控制超时时间。对于上游,这次所有的请求都失败了。解决方案JDK9问题非常普遍。比如在大促场景下,当服务器的CPU瞬间上升时,就会出现上述问题。那么如何解决呢?其实JDK开发的大佬们早就研究过了。在JDK9中,CompletableFuture官方提供了orTimeout和completeTimeout方法来准确实现异步超时控制。publicCompletableFutureorTimeout(longtimeout,TimeUnitunit){if(unit==null)thrownewNullPointerException();if(result==null)whenComplete(newCanceller(Delayer.delay(newTimeout(this),timeout,unit)));returnthis;}JDK9orTimeout其实现原理是通过定时任务在给定时间后抛出异常。如果任务在指定时间内完成,则取消引发异常的操作。我们按照执行顺序来看上面的代码:先执行newTimeout(this)。staticfinalclassTimeoutimplementsRunnable{finalCompletableFuturef;超时(CompletableFuturef){this.f=f;}publicvoidrun(){if(f!=null&&!f.??isDone())//抛出超时异常f.completeExceptionally(newTimeoutException());}}从源码可以看出Timeout是一个实现了Runnable的类,run()方法负责通过completeExceptionallyCAS为传入的异步任务分配异常。任务被标记为异常完成。那么谁来触发这个run()方法呢?我们看一下Delayer的实现。staticfinalclassDelayer{staticSc??heduledFuturedelay(Runnablecommand,longdelay,TimeUnitunit){//触发命令任务的时间returndelayer.schedule(command,delay,unit);}staticfinalclassDaemonThreadFactory实现ThreadFactory{publicThreadnewThread(Runnabler){Threadt=newThread(r);t.setDaemon(真);t.setName("CompletableFutureDelayScheduler");返回吨;}}staticfinalScheduledThreadPoolExecutor延迟器;static{(delayer=newScheduledThreadPoolExecutor(1,newDaemonThreadFactory())).setRemoveOnCancelPolicy(true);}}Delayer其实是一个单例定时调度器,Delayer.delay(newTimeout(this),timeout,unit)通过ScheduledThreadPoolExecutor()方法在指定时间后触发Timeout的运行。至此,超时抛异常的操作就实现了。但是当任务完成后,就不需要触发Timeout了。所以我们还需要实现一个取消逻辑。staticfinalclassCancelerimplementsBiConsumer{finalFuturef;取消器(未来<?>f){this.f=f;}publicvoidaccept(Objectignore,Throwableex){if(ex==null&&f!=null&&!f.??isDone())//3如果抛出异常的任务没有触发,取消f.cancel(错误的);当任务执行完成,或者任务执行异常,我们不需要抛出超时异常。因此,我们可以取消delayer.schedule(command,delay,unit)返回的定时超时任务,不再触发Timeout。当我们的异步任务完成,定时超时任务还没有完成的时候,就是我们取消的时候了。所以我们可以通过whenComplete(BiConsumeraction)来完成。Canceller是BiConsumer的一个实现。它持有delayer.schedule(command,delay,unit)返回的定时超时任务,accept(Objectignore,Throwableex)实现cancel(booleanmayInterruptIfRunning)在定时超时任务未完成后取消任务。JDK8如果我们使用的是JDK9或以上版本,我们可以直接使用JDK的实现来完成异步超时操作。那么JDK8呢?其实我们也可以简单的根据上面的逻辑实现一个工具类来辅助。下面是我们自己营销的工具和用法,贴出来供大家参考,也可以自己写的更优雅~调用方法:CompletableFutureExpandUtils.orTimeout(asynchronoustask,ti??meout,timeunit);工具源码:packagecom.jd.jr.market.reduction.util;importcom.jdpay.market.common.exception.UncheckedException;importjava.util.concurrent.*;importjava.util.function.BiConsumer;/***CompletableFuture扩展工具**@authorzhangtianci7*/publicclassCompletableFutureExpandUtils{/***异常完成此CompletableFuture,如果在给定超时前未完成,则抛出{@linkTimeoutException}。**@paramtimeout在完成一个TimeoutException之前等待多长时间,在{@codeunit}中*@paramunit一个{@linkTimeUnit},结合{@codetimeout}参数,表示给定粒度单元的持续时间Time*@returnCompletableFuture*/publicstaticCompletableFutureorTimeout(CompletableFuturefuture,longtimeout,TimeUnitunit){if(null==unit){thrownewUncheckedException("超时指定粒度不能空的”);}if(null==future){thrownewUncheckedException("异步任务不能为空");}if(future.isDone()){返回未来;}returnfuture.whenComplete(newCanceller(Delayer.delay(newTimeout(future),timeout,unit)));}/***超时时操作异常完成*/staticfinalclassTimeoutimplementsRunnable{finalCompletableFuturefuture;超时(CompletableFuture未来){这个。未来=未来;}publicvoidrun(){if(null!=future&&!future.isDone()){future.completeExceptionally(newTimeoutException());}}}/***取消不必要的超时操作*/staticfinalclassCancellerimplementsBiConsumer{finalFuturefuture;取消器(未来<?>未来){this.future=future;}publicvoidaccept(Objectignore,Throwableex){if(null==ex&&null!=future&&!future.isDone()){future.cancel(false);}}}/***单例延迟调度器,只用于启动和取消任务,一个线程就够了*/staticfinalclassDelayer{staticSc??heduledFuturedelay(Runnablecommand,longdelay,TimeUnitunit){returndelayer.schedule(command,delay,unit);}staticfinal类DaemonThreadFactory实现ThreadFactory{publicThreadnewThread(Runnabler){}}staticfinalScheduledThreadPoolExecutor延迟器;static{delayer=newScheduledThreadPoolExecutor(1,newDaemonThreadFactory());delayer.setRemoveOnCancelPolicy(true);