当前位置: 首页 > 科技观察

关于并发编程,你一定要知道Future机制!

时间:2023-03-13 04:32:23 科技观察

本文转载自微信公众号“月亮与飞鱼”,作者日常加油站。转载本文请联系月版飞语公众号。前言Java5在concurrency包中引入了java.util.concurrent.Callable接口,它与Runnable接口非常相似,但它可以返回一个对象或抛出异常。Callable接口使用泛型来定义其返回类型。Executors类提供了一些有用的方法来在线程池中的Callable中执行任务。由于Callable任务是并行的,我们必须等待它返回的结果。线程属于异步计算模型,无法直接从其他线程获取函数返回值。java.util.concurrent.Future对象为我们处理了这个。线程池提交Callable任务后,返回一个Future对象,可以通过该对象获知Callable任务的状态,获取Callable返回的执行结果。Future提供了get()方法,让我们可以等待Callable结束,获取它的执行结果。Future的作用在做某项计算的时候,计算过程可能比较耗时,有时候我们会去查数据库,或者进行一些繁重的计算,比如压缩,加密等。这种情况下,如果我们一直在等待返回的方法,显然是不明智的,整个程序的运行效率会大大降低。我们可以将计算过程放到子线程中去执行,然后使用Future来控制子线程执行的计算过程,最终得到计算结果。这样可以提高整个程序的运行效率,属于异步思想。同时,在JDK1.8的文档中,对Future的描述是这样的:Future表示一个异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法。大概意思就是Future是异步计算的接口。举个例子:比如你去吃早餐的时候点了馒头和凉菜。馒头要等3分钟,凉菜只需要1分钟。包子的时候可以同时准备凉菜,等3分钟就可以了。Future是后一种执行方式。创建Future线程池classTaskimplementsCallable{publicStringcall()throwsException{returnlongTimeCalculation();}}ExecutorServiceexecutor=Executors.newFixedThreadPool(4);//定义任务:Callabletask=newTask();//提交taskandgetFuture:Futurefuture=executor.submit(task);//从Future中获取异步执行返回的结果:Stringresult=future.get();//可能会阻塞当我们提交一个Callable任务时,我们会同时获取一个Future对象,然后我们在主线程的某个时刻调用Future对象的get()方法来获取异步执行的结果。调用get()时,如果异步任务已经完成,我们直接得到结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果未来课程和任务的结果。我们看一下FutureTask的代码实现:publicclassFutureTaskimplementsRunnableFuture{...}可以看到它实现了一个叫做RunnableFuture的接口。我们看一下RunnableFuture接口的代码实现:publicinterfaceRunnableFutureextendsRunnable,Future{voidrun();}由于RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口,所以可以使用FutureTask作为Runnable由一个线程执行,它可以作为一个Future来获取Callable的返回值。一个典型的用法是将Callable实例作为FutureTask构造函数的参数,生成一个FutureTask对象,然后把这个对象当作一个Runnable对象,放入线程池或者启动另一个线程执行,最后通过获取任务执行FutureTask的结果。下面用代码来演示一下:println("任务运行结果:"+integerFutureTask.get());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}}classTaskimplementsCallable{@OverridepublicIntegercall()throwsException{System.out.println("子线程正在计算");intsum=0;for(inti=0;i<100;i++){sum+=i;}returnsum;}}这一段可以从代码中可以看出,首先创建了一个实现了Callable接口的Task,然后将这个Task实例传入FutureTask的构造函数中,创建了一个FutureTask实例,并将该实例作为Runnable()放入newThread中去执行,最后使用FutureTask的get得到结果打印出来。未来常用方法方法名称返回值输入参数备注摘要cancelboolean(booleanmayInterruptIfRunning)用于取消任务。如果任务取消成功,则返回true,如果任务失败,则返回false。也就是说,Future提供了三个功能:判断任务是否完成,能够中断任务,能够获取任务执行结果。isDoneboolean没有指示任务是否已完成的方法。如果任务完成,则返回true;getV没有获取执行结果的方法。该方法会阻塞,等待任务完成后返回getV(longtimeout,TimeUnitunit)用于获取执行结果。如果在指定时间内没有得到结果,则直接返回null。String[]args){ExecutorServiceservice=Executors.newFixedThreadPool(10);Futurefuture=service.submit(newCallableTask());try{System.out.println(future.get());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}service.shutdown();}staticclassCallableTaskimplementsCallable{@OverridepublicIntegercall()throwsException{Thread.sleep(3000);returnnewRandom().nextInt();}}}在此代码中,main方法创建了一个包含10个线程的新线程池,并使用submit方法将一个任务提交到这个任务中。它的作用是休眠三秒钟,然后返回一个随机数。接下来我们直接打印出future.get的结果,这个结果是正常打印出一个随机数,比如9527等。isDone()方法该方法用于判断当前任务是否已经执行。需要注意的是,如果该方法返回true,则表示执行完成;如果它返回false,则意味着它还没有完成。但是,如果这里返回true,并不代表任务执行成功。例如,任务执行到一半抛出异常。那么在这种情况下,对于这个isDone方法来说,它其实是会返回true的,因为对于它来说,虽然发生了异常,但是这个任务以后不会再执行了,它确实已经执行完了。所以当isDone方法返回true时,并不代表任务执行成功,只代表执行完毕。我们来看一个代码示例,代码如下:{for(inti=0;i<5;i++){System.out.println(i);Thread.sleep(500);}System.out.println(future.isDone());future.get();}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}staticclassCallableTaskimplementsCallable{@OverridepublicIntegercall()throwsException{thrownewIllegalArgumentException("Callable抛出异常");}}}在这段代码中,可以看到有一个线程池,向线程池提交了一个任务,这个任务会直接抛出异常。然后我们再用一个for循环休眠,让它慢慢打印出0到4这5个数字,这样做的目的是起到一定的延时作用。执行完成后调用isDone()方法并打印结果,然后调用future.get()cancel方法。如果不想执行某个任务,可以使用cancel方法。会有以下三种第一种情况:第一种情况最简单,就是任务还没有开始执行的时候,一旦调用cancel,任务就会正常取消,以后不会再执行了,那么cancel方法返回true。第二种情况也更简单。如果任务已经完成,或者之前已经取消,那么执行cancel方法就意味着取消失败,返回false。因为任务无论是完成还是取消都不能再取消了。第三种情况是任务正在执行中。这时会根据我们传入的参数mayInterruptIfRunning进行判断,如果传入的参数为true,则执行任务的线程会收到中断信号,可能会中断正在执行的任务。有一些处理中断的逻辑,然后停止。如果传入false,表示不会中断正在运行的任务。isCancelled()方法判断是否取消。与cancel方法配合使用,比较简单。应用场景目前对于Future方法,我们经常使用的有以下几种类型:GuavaListenableFuture,通过添加监听器,计算完成后立即得到结果,不需要不断查询CompletableFutureJava8的CompletableFuture,使用thenApply,thenApplyAsync即可类似于Guava链式调用的效果。不同的是,对于Java8,如果不将thenApplyAsync传递给线程池,则会使用ForkJoinPools线程池来执行相应的方法,以免影响其他线程。NettyNetty解决问题:原生Future的isDone()方法判断一个异步操作是否完成,但定义模糊:正常终止、抛出异常、用户取消都会使isDone方法返回true。对于一个异步操作,我们有时更关注的是异步操作被触发或完成后是否可以执行一系列动作。与JDK相比,增加了完成状态的细分,增加了监听器,可以在异步线程结束后触发一系列动作。注意事项添加超时机制假设有四个任务需要执行,我们将它们放到线程池中,然后按照从1到4的顺序获取,即执行get()方法获取到的代码为如下所示:publicclassFutureDemo{publicstaticvoidmain(String[]args){//创建线程池ExecutorServiceservice=Executors.newFixedThreadPool(10);//提交任务,使用Future接收返回结果ArrayListallFutures=newArrayList<>();for(inti=0;i<4;i++){Futurefuture;if(i==0||i==1){future=service.submit(newSlowTask());}else{未来=服务。提交(newFastTask());}allFutures.add(future);}for(inti=0;i<4;i++){Futurefuture=allFutures.get(i);try{Stringresult=future.get();System.out.println(result);}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}service.shutdown();}staticclassSlowTaskimplementsCallable{@OverridepublicStringcall()throwsException{Thread.sleep(5000);return"slowtask";}}staticclassFastTaskimplementsCallable{@OverridepublicStringcall()throwsException{return"fasttask";}}可以看到我们在代码中新建了一个线程池,用一个list保存了4个Futures。其中,前两个Future对应的任务是慢任务,即代码下面的SlowTask,后两个Future对应的任务是Task,是快任务。执行慢的任务需要5秒,而执行快的任务可以很快,几乎不需要时间。提交这4个任务后,我们使用for循环依次对它们执行get方法,获取它们的执行结果,然后打印出结果。实际上它在执行的时候会等待5秒,然后快速打印出这4行语句。那么问题来了:第三个任务比较小,可以很快返回结果,然后第四个任务也会返回结果。但是因为前两个任务很慢,所以我们在使用get方法执行的时候会卡在第一个任务上。也就是说,虽然此时第三个和第四个任务的结果已经很早就拿到了,但是我们仍然不能通过这种for循环的方式来及时获取到第三个和第四个任务的结果。任务的结果。5秒后,我们可以得到第一个任务的结果,然后我们可以得到第二个任务的结果,然后就轮到第三个和第四个任务了。假设由于网络原因,第一个任务可能最多1分钟无法返回结果,那么这个时候我们的主线程就会一直卡住,影响程序的运行效率。此时,我们可以使用Future的带超时参数的get(longtimeout,TimeUnitunit)方法来解决这个问题。这个方法的作用是如果在限定的时间内不能返回结果,会抛出TimeoutException异常,然后可以捕获或者抛出这个异常,这样就不会一直Stuck了。源码分析超时实现原理具体实现类:FutureTaskget()方法分为两步:判断当前任务的执行状态,如果不是COMPLETING,调用awaitDone()方法开始死循环,如果任务还没有执行完,它会使用nanos=deadline-System.nanoTime()检查它是否超时。如果该方法超时,它将返回。如果任务返回后状态还是<=COMPLETING,则抛出TimeoutException()。如果调用时任务没有完成,就会调用parkNanos(),调用线程会阻??塞在这里。接下来有两种情况:阻塞时间结束后,任务的执行状态还没有变为完成,进入下一个循环,直接返回。如果轮询时状态发生变化,任务完成,则死循环被打断,返回任务执行的返回值。