前言在并发编程中,我们最常见的需求就是开启一个线程来执行一个函数来完成我们的需求,而在这个需求中,我们往往需要函数有返回值。比如我们需要对同一个非常大的数组中的数据求和,让每个线程在某个区间内求和,最后将这些和相加,那么每个线程都需要返回对应区间的和。在Java中,为我们提供了实现这种效果的机制——FutureTask。FutureTask在自己写FutureTask之前,我们先写一个例子来回顾一下FutureTask的编程步骤:写一个类实现Callable接口。@FunctionalInterfacepublicinterfaceCallable{/***计算结果,如果无法计算则抛出异常。**@returncomputedresult*@throwsExceptionifunabletocomputaresult*/Vcall()throwsException;}要实现接口,只需实现调用即可。可以看到这个函数是有返回值的,FutureTask返回给我们的值就是这个函数的返回值。new一个FutureTask对象,以及第一步写的newclass,newFutureTask<>(可调用实现类)。最后将刚刚获取到的FutureTask对象传入Thread类,然后启动线程到newThread(futureTask).start();。然后我们可以调用FutureTask的get方法来获取返回结果futureTask.get();。假设有一个数组数据,长度为100000,现在有10个线程,第i个线程求数组中所有数据的总和[i*10000,(i+1)*10000),然后相加这十个线程的结果。导入java.lang.reflect.Array;导入java.util.Arrays;导入java.util.Random;导入java.util.concurrent.Callable;导入java.util.concurrent.ExecutionException;导入java.util.concurrent.FutureTask;公共类FutureTaskDemo{publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{int[]data=newint[100000];随机random=newRandom();对于(inti=0;i<100000;i++){data[i]=random.nextInt(10000);}@SuppressWarnings("unchecked")FutureTask[]tasks=(FutureTask[])Array.newInstance(FutureTask.class,10);//设置10个futuretask任务计算数组中数据的总和for(inti=0;i<10;i++){intidx=i;tasks[i]=newFutureTask<>(()->{intsum=0;for(intk=idx*10000;k<(idx+1)*10000;k++){sum+=data[k];}返回总和;});}//启动线程执行futureTask任务for(FutureTaskfutureTask:tasks){newThread(futureTask).start();}intthreadSum=0;对于(FutureTaskfutureTask:tasks){threadSum+=futureTask.get();}intsum=Arrays.stream(data).sum();System.out.println(sum==threadSum);//结果永远为真}}可能你对FutureTask怎么用会比较迷茫,或者不是很清楚,现在我们来深入了解一下思路。首先启动一个线程或者继承自Thread类,然后重写Thread类的run方法,或者将一个实现了Runnable的类对象传递给Thread类。当然也可以用匿名内部类来实现。既然我们的FutureTask对象可以传给Thread类,那么就意味着FutureTask必须实现Runnable接口。我们来看看FutureTask的继承体系。可以发现FutureTask确实实现了Runnable接口,也实现了Future接口。这个Future接口主要提供我们后面使用FutureTask的get等一系列功能。看到这里,你应该能大致想象到,FutureTask中的run方法会调用Callable中实现的call方法,然后保存结果,调用get方法时返回结果。准备自己实现FutureTask工具。经过上面的分析,大家可能对FutureTask的执行过程有了一个大概的了解,但是需要注意的是,如果执行FutureTask的get方法,可能会阻塞,因为Callable的call方法可能不会执行了。结束。所以在get方法中需要阻塞线程的代码,但是这些线程需要在call方法执行完后才能唤醒。本文使用锁ReentrantLock和条件变量Condition来阻塞和唤醒线程。在我们自己实现FutureTask之前,先来熟悉一下以上两个工具的用法。ReentrantLock主要有两种方法:lock锁定临界区代码块。unlock解锁临界区代码。Condition主要有3个方法:await阻塞调用这个方法的线程,等待其他线程唤醒。signal唤醒被await方法阻塞的线程。signalAll唤醒所有被await方法阻塞的线程。导入java.util.concurrent.TimeUnit;导入java.util.concurrent.locks.Condition;导入java.util.concurrent.locks.ReentrantLock;公共类LockDemo{privateReentrantLock锁;私人条件条件;LockDemo(){lock=newReentrantLock();条件=lock.newCondition();}publicvoidblocking(){lock.lock();try{System.out.println(Thread.currentThread()+"准备好被另一个线程唤醒");条件.await();}catch(InterruptedExceptione){e.printStackTrace();}最后{lock.unlock();}}publicvoidinform()throwsInterruptedException{//休眠两秒,等待其他线程先阻塞TimeUnit.SECONDS.sleep(2);锁.锁();try{System.out.println(Thread.currentThread()+"准备唤醒其他线程");条件.信号();//唤醒被await方法阻塞的线程//condition.signalAll();//唤醒所有被await方法阻塞的线程}finally{lock.unlock();}}publicstaticvoidmain(String[]args){LockDemolockDemo=newLockDemo();Threadthread=newThread(()->{lockDemo.blocking();//执行阻塞线程的代码},"Blocking-Thread");Threadthread1=newThread(()->{try{lockDemo.inform();//执行唤醒线程的代码}catch(InterruptedExceptione){e.printStackTrace();}},"Inform-Thread");thread.start();thread1.start();}}上面代码的输出:Thread[Blocking-Thread,5,main]准备好等待被其他线程唤醒Thread[Inform-Thread,5,main]准备好唤醒其他线程FutureTask设计和implementation我们在上一篇已经讲过FutureTask的实现原理主要有以下几点:构造函数需要传入一个实现了Callable接口的类对象,在FutureTask的run方法中执行,然后会获取函数的返回值,并将返回值存储为一个thread调用get方法时,如果此时Callable中的调用已经执行完毕,直接返回call函数返回的结果即可。如果call函数还没有执行完,那么需要挂起调用get方法的线程。这里我们可以使用condition.await()挂起线程。call函数执行完成后,需要唤醒被get方法挂起的线程继续执行。这里,condition.signalAll()用于唤醒所有挂起的线程。因为我们自己实现了FutureTask,所以功能不会那么齐全,只满足我们主要的需求,主要是帮助大家理解FutureTask的原理。实现代码如下(分析在注释中):.locks.ReentrantLock;//这里需要实现Runnable接口,因为需要把这个对象放到Thread类中//而Thread需要传入的对象实现Runnable接口publicclassMyFutureTaskimplementsRunnable{privatefinalCallable可调用;私有对象返回值;//这代表我们最终的返回值privatefinalReentrantLocklock;私人最终条件条件;publicMyFutureTask(Callablecallable){//存储传入的可调用对象,方便后面的run方法调用this.callable=callable;锁=新的ReentrantLock();条件=lock.newCondition();}@SuppressWarnings("unchecked")publicVget(longtimeout,TimeUnitunit){if(returnVal!=null)//if如果满足条件,则说明调用函数已经执行,返回值不空返回(V)returnVal;//直接返回结果,提高程序执行效率,不争锁资源lock.lock();try{//这里需要进行二次判断(doublecheck)//因为如果一个线程第一次判断returnVal为空//那么此时可能因为获取了锁而被挂起//并且在挂起期间调用可能已经被执行//如果此时不做判断直接执行awaitmethod//thenlater这个线程将无法被唤醒if(returnVal==null)condition.await(timeout,unit);}catch(InterruptedExceptione){e.printStackTrace();}最后{lock.unlock();}返回(V)返回值;}@SuppressWarnings("unchecked")publicVget(){if(returnVal!=null)return(V)returnVal;锁.锁();try{//还需要仔细检查if(returnVal==null)condition.await();}catch(InterruptedExceptione){e.printStackTrace();}最后{lock.unlock();}返回(V)返回值;}@Overridepublicvoidrun(){if(returnVal!=null)return;try{//在Runnable的run方法中//执行Callable方法的调用得到返回结果returnVal=callable.call();}catch(Exceptione){e.printStackTrace();}锁.锁();try{//因为已经得到结果//因为这需要唤醒所有被await方法阻塞的线程//让他们从get方法中返回condition.signalAll();}最后{lock.unlock();}}//下面是测试代码publicstaticvoidmain(String[]args){MyFutureTaskft=newMyFutureTask<>(()->{TimeUnit.SECONDS.sleep(2);return101;});线程thread=newThread(ft);线。开始();System.out.println(ft.get(100,TimeUnit.MILLISECONDS));//输出为空System.out.println(ft.get());//输出为101}}我们现在使用自己的MyFutureTask来实现上篇文章中数组求和的例子:publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{int[]data=newint[100000];随机random=newRandom();对于(inti=0;i<100000;i++){data[i]=random.nextInt(10000);}@SuppressWarnings("unchecked")MyFutureTask[]tasks=(MyFutureTask[])Array.newInstance(MyFutureTask.class,10);对于(inti=0;i<10;i++){intidx=i;任务[i]=newMyFutureTask<>(()->{intsum=0;for(intk=idx*10000;k<(idx+1)*10000;k++){sum+=data[k];}returnsum;});}for(MyFutureTaskMyFutureTask:tasks){newThread(MyFutureTask).start();}intthreadSum=0;对于(MyFutureTaskMyFutureTask:tasks){threadSum+=MyFutureTask.get();}intsum=Arrays.stream(data).sum();System.out.println(sum==threadSum);//输出结果为true}总结本文主要介绍了FutureTask的内部原理,并且我们通过ReentrantLock和Condition实现了自己的FutureTask。本文主要内容如下:FutureTask的内部原理:FutureTask会先继承Runnable接口,这样就可以直接将FutureTask的对象作为Constructor参数放入Thread类中使用FutureTask时,我们需要传入Callable实现类的一个对象,在函数调用中实现我们需要执行的函数,执行完成后保存调用函数的返回值。当一个线程调用get方法时,返回保存的返回值时。我们使用条件变量来阻塞和唤醒线程。当线程调用get方法时,如果调用已经执行,可以直接返回结果,否则需要使用条件变量挂起线程。当call函数执行完成后,需要使用条件变量唤醒所有阻塞在get方法中的线程。双重检查:在get方法中,我们先判断returnVal是否为空,如果不为空,则直接返回结果,这样就不需要去竞争锁资源,提高了程序执行的效率。但是我们还是需要在锁保护的临界区判断returnVal是否为空,因为如果一个线程第一次判断returnVal为空,那么此时可能会因为获取到锁而被挂起。在这段时间里,调用可能已经被执行了。如果此时不加判断直接执行await方法,后续的线程将无法被唤醒,因为condition.signalAll()是在call函数执行完之后调用的。如果线程在执行完await方法后,以后就没有线程来唤醒这些线程了。更多精彩内容合集可以访问项目:https://github.com/Chang-LeHu...关注公众号:一个没用的研究僧,学习更多计算机知识(Java,Python,计算机系统基础,算法和数据结构)知识。