1FutureFuture代表一个任务的生命周期,是一个可取消的异步操作。提供了相应的方法来判断任务的状态(完成或取消),获取任务的结果并取消任务等。适用于具有可取消性和执行时间长的异步任务。concurrency包中的很多异步任务类都继承自Future,最典型的就是FutureTask1.1。Future代表异步计算的结果。它提供了检查计算是否完成、等待计算完成、获取计算结果的方法。计算完成后,只能使用get方法获取结果。如有必要,可以在计算完成之前阻止此方法。取消是通过cancel方法执行的。提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就无法撤消。如果Future用于可取消性但不提供可用结果,则您可以声明Future形式类型并返回null作为基础任务的结果。也就是说,Future具有异步执行的特性,可以使用get方法获取执行结果。如果计算没有完成,get方法就会被阻塞。如果完成了,可以多次获取,立即得到结果。如果计算没有完成,可以取消计算并查询计算的执行状态2FutureTaskFutureTask提供了Future的基本实现,如获取任务执行结果(get)和取消任务(cancel)。如果任务还没有完成,在获取任务执行结果时会阻塞。一旦执行完成,任务将无法重新启动或取消(除非使用runAndReset来执行计算)。FutureTask常用来封装Callable和Runnable,也可以作为任务提交到线程池执行。这个类除了是一个独立的类之外,还提供了创建自定义任务类的功能。FutureTask的线程安全由CAS保证。FutureTask内部维护了一个被volatile修饰的int型变量——state,代表当前任务的运行状态。在这七种状态中,有四种任务终止状态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各种状态的转换如下:数据结构和核心参数//里面持有的可调用任务,运行后为空privateCallablecallable;//get()返回的结果或抛出的异常volatile,protectedbystatereads/writes//运行可调用线程,运行期间执行CAS操作privatevolatileThreadrunner;//使用Treiber栈保存等待线程privatevolatileWaitNodewaiters;FutureTask继承了Runnale和Future,本身也是作为线程运行的,可以提交给线程池执行。维护了一个内部类WaitNode,它是使用一个简单的Treiber栈(无锁并发栈)来实现的,用于存储等待线程。FutureTask只有一个自定义同步器Sync属性,所有方法都委托给这个同步器实现。这也是在JUC中使用AQS的一般模式。源码解析FutureTask的同步器由于Future在任务完成后可以自由多次获取结果,所以用于控制同步的AQS采用共享模式。FutureTask底层任务的执行状态保存在AQS的状态中。AQS是否允许线程获取(是否阻塞)取决于任务执行是否完成,而不是具体的状态值。privatefinalclassSyncextendsAbstractQueuedSynchronizer{//定义代表任务执行状态的常量。由于是用位运算来判断,所以状态值分别是2的次方。//表示任务准备就绪,可以执行privatestaticfinalintREADY=0;//表示任务正在执行privatestaticfinalintRUNNING=1;//表示任务已经执行privatestaticfinalintRAN=2;//表示任务已经执行canceledprivatestaticfinalintCANCELLED=4;//底层表示任务的可执行对象privatefinalCallablecallable;//表示任务执行结果,用于get方法返回。privateVresult;//表示任务执行过程中出现异常,在调用get方法时抛出。私有抛出异常;/**用于执行任务的线程。在set/cancel方法后设置为空,表示可以得到结果。*必须是易变的以确保完成后的可见性(结果和异常)。*(如果runner不是volatile,result和exception都必须是volatile)*/privatevolatileThreadrunner;/***完成或取消时获取成功*/protectedinttryAcquireShared(intignore){returninnerIsDone()?1:-1;}/***让AQS在设置最终完成状态后一直通知,通过设置runner线程为空。*该方法不会更新AQS的state属性,*所以通过写volatilerunner来保证可见性。*/protectedbooleantryReleaseShared(intignore){runner=null;returntrue;}//执行任务的方法voidinnerRun(){//用于保证任务不会重复执行if(!compareAndSetState(READY,RUNNING))return;//由于Future一般是异步执行的,所以runner一般是线程池中的一个线程。runner=Thread.currentThread();//设置执行线程后再次查看,执行前查看是否被异步取消//因为之前的CAS已经设置了状态为RUNNING,if(getState()==RUNNING){//recheckaftersettingthreadVresult;//try{result=callable.call();}catch(Throwableex){//捕获任务执行过程中抛出的所有异常setException(ex);return;}set(result);}else{//释放等待threadreleaseShared(0);//取消}}//设置结果voidinnerSet(Vv){//放入循环失败后重试。for(;;){//AQS初始化时,状态值默认为0,对应READY状态。ints=getState();//完成的任务无法设置结果if(s==RAN)return;//取消的任务无法设置结果if(s==CANCELLED){//releaseShared将设置runner为empty,//这是考虑到与其他取消请求线程竞争中断runnerreleaseShared(0);return;}//第一次设置已经完成,以免设置多次if(compareAndSetState(s,RAN)){result=v;releaseShared(0);//这个方法会更新runner,保证结果的可见性done();return;}}}//获取异步计算的结果VinnerGet()throwsInterruptedException,ExecutionException{acquireSharedInterruptibly(0);//获取共享,未完成会阻塞。//检查是否取消if(getState()==CANCELLED)thrownewCancellationException();//异步计算时出现异常if(exception!=null)thrownewExecutionException(exception);returnresult;}//取消执行任务booleaninnerCancel(booleanmayInterruptIfRunning){for(;;){ints=getState();//已完成或取消的任务不能再次取消if(ranOrCancelled(s))returnfalse;//任务处于READY或RUNNINGif(compareAndSetState(s,CANCELLED))break;}//任务取消后,中断执行线程if(mayInterruptIfRunning){Threadr=runner;if(r!=null)r.interrupt();}releaseShared(0);//线程即释放等待访问结果done();returntrue;}/***检查任务是否处于完成或取消状态*/privatebooleanranOrCancelled(intstate){return(state&(RAN|CANCELLED))!=0;}//Othermethodsareommitted}从innerCancel方法来看,Cancellation只是改变任务对象的状态,可能会中断线程的执行。如果任务的逻辑代码没有响应中断,会异步执行直到完成,但是最终的执行结果不会通过get方法返回,计算资源的开销依然存在。总的来说,Future是一个线程间协调的工具。AbstractExecutorService.submit(Callabletask)FutureTask的内部实现方法非常简单。我们先分析一下线程池的提交。submit方法默认在AbstractExecutorService中实现。几个实现的源码如下:}publicFuturesubmit(Runnabletask,Tresult){if(task==null)thrownewNullPointerException();RunnableFutureftask=newTaskFor(task,result);execute(ftask);returnftask;}publicFuturesubmit(Callabletask){if(task==null)thrownewNullPointerException();RunnableFutureftask=newTaskFor(task);execute(ftask);returnftask;}protectedRunnableFuturenewTaskFor(Runnablerunnable,Tvalue){returnnewFutureTask(runnable,value);}publicFutureTask(Runnablerunnable,Vresult){this.callable=Executors.callable(runnable,result);this.state=NEW;//ensurevisibilityofcallable}首先调用newTaskFor方法构造一个FutureTask,然后调用execute将任务放入线程池,返回FutureTaskFutureTask.run()publicvoidrun(){//创建一个新任务,CAS将runner替换为当前线程if(state!=NEW||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;try{Callablec=callable;if(c!=null&&state==NEW){Vresult;布尔值;try{result=c.call();ran=true;}catch(Throwableex){result=null;ran=false;setException(ex);}if(ran)set(result);//设置执行结果}}finally{//runnermustbenon-nulluntilstateissettledto//preventconcurrentcallstorun()runner=null;//statemustbere-readafternullingrunnertoprevent//leakedinterruptsints=state;if(s>=INTERRUPTING)handlePossibleCancellationInterrupt(s);//处理中断逻辑}}运行任务,如果任务状态为NEW,则使用CAS修改调用set(result)方法设置当前线程执行后的执行结果。set(result)的源码如下。首先使用cas修改状态state设置返回结果,然后使用lazySet(UNSAFE.putOrderedInt)设置状态state为结果。设置完成后调用finishCompletion()唤醒等待线程privatevoidfinishCompletion(){for(WaitNodeq;(q=waiters)!=null;){if(UNSAFE.compareAndSwapObject(this,waitersOffset,q,null)){//Removewaitingthreadfor(;;){//自旋遍历等待线程Threadt=q.thread;if(t!=null){q.thread=null;LockSupport.unpark(t);//唤醒等待线程}WaitNodeext=q.next;if(next==null)break;q.next=null;//unlinktohelpgcq=next;}break;}}//任务完成后调用函数,自定义扩展done();callable=null;//toreducefootprint}返回run方法,如果在运行过??程中被打断,需要调用handlePossibleCancellationInterrupt处理中断逻辑,保证任何中断(如cancel(true))只停留在当前runorrunAndResettaskprivatevoidhandlePossibleCancellationInterrupt(ints){//中断器中断线程之前可能会有延迟,所以我们只需要放弃CPU时间片自旋等待if(s==INTERRUPTING)while(state==INTERRUPTING)Thread.yield();//waitoutpendinginterrupt}FutureTask.runAndReset()runAndReset是FutureTask执行的另一种方法,不会返回执行结果,任务执行完成后,stat的状态会重置为NEW,这样任务就可以执行多次。runAndReset的一个典型应用是在ScheduledThreadPoolExecutor中周期性的执行任务。FutureTask.get()FutureTask通过get()获取任务执行结果。如果任务处于未完成状态(state<=COMPLETING),调用awaitDone等待任务完成。任务完成后,通过report获取执行结果或者执行过程中抛出异常。awaitDone(booleantimed,longnanos)privateintawaitDone(booleantitimed,longnanos)throwsInterruptedException{finallongdeadline=timed?System.nanoTime()+nanos:0L;WaitNodeq=null;booleanqueued=false;for(;;){//自旋如果(Thread.interrupted()){//获取并清除中断状态removeWaiter(q);//去除等待WaitNodethrownewInterruptedException();}ints=state;if(s>COMPLETING){if(q!=null)q.thread=null;//清空等待节点返回的线程;}elseif(s==COMPLETING)//cannottimeoutyetThread.yield();elseif(q==null)q=newWaitNode();elseif(!queued)//CAS修改waiterqueued=UNSAFE.compareAndSwapObject(this,waitersOffset,q.next=waiters,q);elseif(timed){nanos=deadline-System.nanoTime();if(nanos<=0L){removeWaiter(q);//超时,移除等待节点returnstate;}LockSupport.parkNanos(this,nanos);//阻塞当前线程}elseLockSupport.park(this);//阻塞当前线程}}awaitDone用于等待任务完成,或者任务被超时中断或终止。返回任务的完成状态。1.如果线程被中断,首先清除中断状态,调用removeWaiter移除等待节点,然后抛出InterruptedException。removeWaiter源码如下:privatevoidremoveWaiter(WaitNodenode){if(node!=null){node.thread=null;//先清空线程retry:for(;;){//restartonremoveWaiterrace//遍历查找依次for(WaitNodepred=null,q=waiters,s;q!=null;q=s){s=q.next;if(q.thread!=null)pred=q;elseif(pred!=null){pred.next=s;if(pred.thread==null)//checkforracecontinueretry;}elseif(!UNSAFE.compareAndSwapObject(this,waitersOffset,q,s))//casreplacecontinueretry;}break;}}}2.如果当前结束状态(state>COMPLETING),根据需要清空等待该节点的线程,返回Future状态3.如果当前正在完成(COMPLETING),说明Future此时无法做出超时动作,放弃任务4的CPU执行时间片。如果状态为NEW,首先创建一个WaitNode,然后CAS修改当前的waiters5。如果等待超时,调用removeWaiter移除等待节点,返回任务状态;如果设置了超时时间但还没有超时,park会阻塞当前thread6。其他情况直接阻塞当前线程FutureTask.cancel(booleanmayInterruptIfRunning)publicbooleancancel(booleanmayInterruptIfRunning){//如果当前Future状态为NEW,则根据参数修改Future状态为INTERRUPTING或CANCELEDif(!(state==NEW&&UNSAFE.compareAndSwapInt(this,stateOffset,NEW,mayInterruptIfRunning?INTERRUPTING:CANCELLED)))returnfalse;try{//incascalltointerruptthrowsexceptionif(mayInterruptIfRunning){//可以在运行时被中断finally{finishCompletion();//移除并唤醒所有等待的线程}returntrue;}描述:尝试取消任务如果任务已经完成或者已经被取消,这个操作将会失败。如果当前Future状态为NEW,则根据参数修改Future状态为INTERRUPTING或CANCELLED。如果当前状态不是NEW,则根据参数mayInterruptIfRunning判断任务在运行过程中是否可以中断。中断操作完成后,调用finishCompletion清除并唤醒所有等待线程。实例总结本章重点:FutureTask结果返回机制,以及内部运行状态转换