当前位置: 首页 > 科技观察

在Java中使用CompletableFuture处理异步超时

时间:2023-03-19 22:57:59 科技观察

有一天,我在改进多线程代码时被Future.get()困住了。publicvoidserve()throwsInterruptedException,ExecutionException,TimeoutException{finalFutureresponseFuture=asyncCode();}最终响应response=responseFuture.get(1,SECONDS);send(response);}privatevoidsend(Responseresponse){//...}这是一个用Java编写的Akka应用程序,使用1000个线程的线程池(确实如此!)-所有线程都阻塞在这个get中().系统的处理速度跟不上并发请求数。重构之后,我们把这些线程都杀掉了,只留了一个,大大减少了内存占用。让我们保持简单并通过Java8示例进行演示。第一步是使用CompletableFuture代替简单的Future(参见:技巧9)。通过控制任务提交给ExecutorService的方式:只需使用CompletableFuture.supplyAsync(…,executorService)而不是executorService.submit(…)来处理基于回调的API:否则使用promises(如果您已经使用阻塞API或Future)会导致许多线程被阻塞。这就是为什么这么多异步API令人讨厌的原因。所以,让我们重写之前的代码来接收CompletableFuture:最终响应response=responseFuture.get(1,SECONDS);send(response);}显然,这并没有解决任何问题,我们仍然需要使用新的样式进行编程:publicvoidserve(){finalCompletableFutureresponseFuture=asyncCode();responseFuture.thenAccept(this::send);}这在功能上是等效的,但serve()只会运行很短的时间(不会阻塞或等待)。请记住:this::send将在完成responseFuture的同一个线程中执行。如果你不想花太多时间来重载现有的线程池或send()方法,你可以考虑通过thenAcceptAsync(this::send,sendPool)很棒,但是我们失去了两个重要的属性:异常传播和超时。异常传播很难实现,因为我们更改了API。虽然serve()存在,但异步操作可能尚未完成。如果您关心异常,请考虑返回一个responseFutureor或其他替代机制。最起码应该有异常的日志,不然异常早就被吞掉了。最终CompletableFutureresponseFuture=asyncCode();responseFuture.exceptionally(throwable->{log.error("Unrecoverableerror",throwable);returnnull;});请注意上面的代码:exceptionally()尝试从失败中恢复要恢复,返回一个可选结果。这个地方可以正常工作,但是如果你使用链式调用exceptionally()和withthenAccept(),即使失败,你仍然会调用send()方法,返回一个空参数,或者exceptionally()的任何其他返回方法值。responseFuture.exceptionally(throwable->{log.error("Unrecoverableerror",throwable);returnnull;}).thenAccept(this::send);//可能不是你想的那样丢失一秒超时的问题很巧妙。在Future完成之前,我们的原始代码最多等待(阻塞)1秒,否则将抛出TimeoutException。我们失去了这个功能,更糟糕的是,单元测试超时不是很方便,经常跳过这个环节。为了在不破坏事件驱动原则的情况下维持超时机制,我们需要创建一个额外的模块:一个在给定时间后必须失败的Future。publicstaticCompletableFuturefailAfter(Durationduration){finalCompletableFuturepromise=newCompletableFuture<>();scheduler.schedule(()->{finalTimeoutExceptionex=newTimeoutException("Timeoutafter"+duration);returnpromise.completeExceptionally(ex);},duration.toMillis(),MILLISECONDS);returnpromise;}privatestaticfinalScheduledExecutorServicescheduler=Executors.newScheduledThreadPool(1,newThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());这个很简单:我们创建一个承诺(一个没有后台任务或线程池的Future),并在给定的java.time.Duration之后抛出一个TimeoutException。如果你在某处调用get()获取这个Future,阻塞时间到达指定时间后会抛出TimeoutException。其实就是包装了TimeoutException的ExecutionException,不用说了。请注意,我使用了固定一个线程的线程池。这不仅仅是为了教学目的:这是一个“1个线程应该做任何人需要的事情”的场景。failAfter()本身不是很有用,但如果与我们的responseFuture一起使用,我们可以解决这个问题。最终CompletableFutureresponseFuture=asyncCode();finalCompletableFutureoneSecondTimeout=failAfter(Duration.ofSeconds(1));responseFuture.acceptEither(oneSecondTimeout,this::send).exceptionally(throwable->{log.error(“问题”,可抛出);返回空;});许多其他的事情都是在这里完成的。当后台任务收到responseFuture时,我们还创建了一个“合成的”oneSecondTimeout未来,它永远不会在成功时执行,但会导致任务在1秒后失败。现在我们把这两个结合起来叫做acceptEither,这个操作会先执行完成Future的代码块,而直接忽略responseFuture或oneSecondTimeout中较慢的那一个。如果asyncCode()代码在1秒内执行完,就会调用this::send,不会抛出oneSecondTimeout异常。但是,如果asyncCode()真的很慢,则会首先抛出oneSecondTimeout异常。由于导致任务失败的异常,将调用exceptionalerror处理程序而不是this::send方法。您可以选择执行send()或异常执行,但不能同时执行两者。例如,如果我们有两个完成正常执行的“普通”Future,响应最好的一个将调用send()方法,而后者将被丢弃。这不是最干净的解决方案。更简洁的解决方案是包装原始Future,然后保证它将在给定时间内执行。这种操作可用于com.twitter.util.Future(Scala称之为within()),但不适用于scala.concurrent.Future(大概是为了鼓励使用前者)。暂且不讨论Scala背后是如何实现的,先实现类似于CompletableFuture的操作。它以Future作为输入并返回在后台任务完成时执行的Future。但是,如果底层Future的执行时间过长,可能会抛出异常:期间);returnfuture.applyToEither(timeout,Function.identity());}这将我们引向最终的、清晰的、灵活的方法:finalCompletableFutureresponseFuture=within(asyncCode(),Duration.ofSeconds(1));responseFuture。thenAccept(this::send).exceptionally(throwable->{log.error("Unrecoverableerror",throwable);returnnull;});希望您喜欢这篇文章,因为您已经知道在Java中,实现响应式编程不再是问题。