作者:京东科技张天赐前言JDK8是一次大版本升级,加入了很多新特性,其中之一就是CompletableFuture。从此,真正从JDK层面支持了基于事件的异步编程范式,弥补了Future的不足。在我们日常的优化中,最常用的方法就是多线程并行执行。这将涉及CompletableFuture的使用。常见用法下面是一个常见场景的示例。如果我们有两个RPC远程调用服务,那么在进行后续的逻辑处理之前,我们需要获取这两个RPC的结果。publicstaticvoidmain(String[]args){//任务A需要2秒intresultA=compute(1);//任务B需要2秒intresultB=compute(2);//后续业务逻辑ProcessSystem.out.println(resultA+resultB);}可以估计串行执行至少需要4秒,任务B不依赖于任务A的结果。对于这种场景,我们通常会选择并行优化。演示代码如下:publicstaticvoidmain(String[]args){//只是一个简单的例子,生产代码不要这样写!//统计耗时函数time(()->{CompletableFutureresult=Stream.of(1,2)//创建一个异步任务map(x->CompletableFuture.supplyAsync(()->compute(x),executor))//aggregate.reduce(CompletableFuture.completedFuture(0),(x,y)->x.thenCombineAsync(y,Integer::sum,executor));//等待结果try{System.out.println("Result:"+result.get());}catch(ExecutionException|InterruptedExceptione){System.err.println("任务执行异常");}});}输出:[async-1]:任务执行开始:1[async-2]:任务执行开始:2[async-1]:任务执行完成:1[async-2]:任务执行完成:2结果:3耗时:2秒即可看到耗时变成了2秒。存在问题分析CompletableFuture现有的功能似乎可以满足我们的需求。但是当我们介绍一些真实的常见情况时,一些潜在的不足就暴露出来了。compute(x)如果是根据入参查询用户某类优惠券列表的任务,我们需要查询两张优惠券,并组合起来返回上游。假设上游要求我们在2秒内完成处理并返回结果,但是compute(x)的耗时从0.5秒到无穷大波动。这时候我们需要放弃compute(x)任务耗时过长的结果,只处理指定时间内完成的任务,尽可能保证服务可用。那么上述代码的耗时是由耗时最长的服务决定的,不能满足现有的需求。通常我们会使用get(longtimeout,TimeUnitunit)来指定获取结果的超时时间,我们会为compute(x)设置一个超时时间,达到超时时间后会自动抛出异常中断任务。publicstaticvoidmain(String[]args){//只是一个简单的例子,不要在生产代码中写这个!//统计耗时函数time(()->{List>result=Stream.of(1,2)//创建一个异步任务,compute(x)超时抛出异常map(x->CompletableFuture.supplyAsync(()->compute(x),executor)).toList();//等待结果intres=0;for(CompletableFuturefuture:result){try{res+=future.get(2,SECONDS);}catch(ExecutionException|InterruptedException|TimeoutExceptione){System.err.println("任务执行异常或超时");}}System.out.println("Result:"+res);});}输出:[async-2]:任务执行开始:2[async-1]:任务执行开始:1[async-1]:任务执行完成:1任务执行异常或超时结果:1耗时:2秒可见只要我们可以给compute(x)设置一个超时时间来中断任务,结合get、getNow等获取结果的方法可以很好的管理整体的耗时,那么问题就来了,如何给任务设置一个异步超时时间呢?已有实践当异步任务是RPC请求时,我们可以设置一个JSF超时时间来达到异步超时的效果。当请求是R2M请求时,我们还可以控制R2M连接的最大超时时间来达到效果。貌似我们都是靠第三方中间件的能力来管理任务超时?那么就有一个问题,中间件的超时控制能力是有限的,如果异步任务是中间件IO操作+本地计算操作呢?以JSF超时为例,反编译后的JSF代码如下://剩余等待时间longremaintime=timeout-(this.sentTime-this.genTime);if(remaintime<=0L){if(this.isDone()){//反序列化得到结果returnthis.getNow();}}elseif(this.await(remaintime,TimeUnit.MILLISECONDS)){//等待任务在时间内完成,反序列化得到结果returnthis.getNow();}this.setDoneTime();//超时抛出this时抛出异常。clientTimeoutException(false);}当任务刚好卡在超时边缘时,这个任务的耗时时间就是超时时间+获取结果的时间。但是获取结果(反序列化)是一个纯本地的计算操作,消耗的时间长短受CPU的影响很大。在某些CPU占用率高的情况下,异步任务触发失败抛出异常中断,导致我们无法准确控制超时时间。对于上游,这次所有的请求都失败了。解决方案JDK9问题非常普遍。比如在大促场景下,当服务器的CPU瞬间上升时,就会出现上述问题。那么如何解决呢?其实JDK开发的大佬们早就研究过了。在JDK9中,CompletableFuture官方提供了orTimeout和completeTimeout方法来准确实现异步超时控制。publicCompletableFutureorTimeout(longtimeout,TimeUnitunit){if(unit==null)thrownewNullPointerException();if(result==null)whenComplete(newCanceller(Delayer.delay(newTimeout(this),timeout,unit)));returnthis;}JDK9orTimeout其实现原理是通过定时任务在给定时间后抛出异常。如果任务在指定时间内完成,则取消引发异常的操作。我们按照执行顺序来看上面的代码:先执行newTimeout(this)。staticfinalclassTimeoutimplementsRunnable{finalCompletableFuture>f;超时(CompletableFuture>f){this.f=f;}publicvoidrun(){if(f!=null&&!f.??isDone())//抛出超时异常f.completeExceptionally(newTimeoutException());}}从源码可以看出Timeout是一个实现了Runnable的类,run()方法负责通过completeExceptionallyCAS为传入的异步任务分配异常。任务被标记为异常完成。那么谁来触发这个run()方法呢?我们看一下Delayer的实现。staticfinalclassDelayer{staticSc??heduledFuture>delay(Runnablecommand,longdelay,TimeUnitunit){//触发命令任务的时间returndelayer.schedule(command,delay,unit);}staticfinalclassDaemonThreadFactory实现ThreadFactory{publicThreadnewThread(Runnabler){Threadt=newThread(r);t.setDaemon(真);t.setName("CompletableFutureDelayScheduler");返回吨;}}staticfinalScheduledThreadPoolExecutor延迟器;static{(delayer=newScheduledThreadPoolExecutor(1,newDaemonThreadFactory())).setRemoveOnCancelPolicy(true);}}Delayer其实是一个单例定时调度器,Delayer.delay(newTimeout(this),timeout,unit)通过ScheduledThreadPoolExecutor()方法在指定时间后触发Timeout的运行。至此,超时抛异常的操作就实现了。但是当任务完成后,就不需要触发Timeout了。所以我们还需要实现一个取消逻辑。staticfinalclassCancelerimplementsBiConsumer