在观察上一篇ThreadPoolExecutor的submit方法时,发现它依赖于FutureTask来实现结果回调:publicFuturesubmit(Callabletask){if(task==null)throw新的NullPointerException();//##声明一个可调用任务,本质上是一个FutureTaskRunnableFutureftask=newTaskFor(task);//线程池文章分析execute(ftask);returnftask;}protectedRunnableFuturenewTaskFor(Callablecallable){returnnewFutureTask(callable);}1.FutureTask使用示例//1.声明一个可调用任务FutureTasktask=newFutureTask(()->"helloworld");ThreadthreadA=newThread(task);threadA.start();//2.阻塞方式获取任务执行结果:threadA未执行完毕,当前线程TreadB会被阻塞在这里System.out.println(task.get());在执行过程中,FutureTask实现了RunnableFuture接口,当RunnableFuture=Runnable接口+Future接口时,线程A在执行start方法时,会调用FutureTask(Runnable接口)的run()方法。run()方法会触发FutureTask内部状态的改变,并调用Callable的call()方法。这时线程B和其他线程调用FutureTask的get()方法(Future接口),这些线程会阻塞等待run()方法完成。源码层面会构建一个waiters单项链表,以LockSupport.part的形式阻塞节点上的线程call()方法执行完毕,state状态最终变为NORMAL,同时释放阻塞线程的key属性//##stateprivatevolatileintstate;privatestaticfinalintNEW=0;私有静态最终整数完成=1;私有静态最终整数正常=2;私有静态最终整数例外=3;私有静态最终整数取消=4;私有静态最终整数中断=5;私有静态最终整数中断=6;//结果返回给接口privateCallablecallable;//线程执行方法的返回结果privateObjectoutcome;//非易失性,受状态读/写保护//执行由可调用接口发送的线程privatevolatileThreadrunner;//等待节点privatevolatileWaitNodewaiters;2.run()publicvoidrun(){if(state!=NEW//runner变量赋值||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;尝试{Callablec=callable;/在/NEW状态下执行if(c!=null&&state==NEW){Vresult;b奥利安跑了;try{//调用Callable的call方法获取返回值result=c.call();跑=真;}catch(Throwableex){结果=null;跑=假;设置异常(前);}if(ran)//==调用方法执行成功,设置结果集(result);}}}protectedvoidset(Vv){//statechangeNEW->COMPLETINGif(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){//执行结果赋给outcomeoutcome=v;//状态变为COMPLETING->NORMAL,表示执行完成UNSAFE.putOrderedInt(this,stateOffset,NORMAL);//最终状态//==释放等待队列}}privatevoidfinishCompletion(){//assertstate>COMPLETING;for(WaitNodeq;(q=waiters)!=null;){//cas方法将waiters变量设置为nullif(UNSAFE.compareAndSwapObject(this,waitersOffset,q,null)){//##遍历队列(单程链表)WaitNode节点,释放所有等待的线程for(;;){Threadt=q.thread;if(t!=null){q.thread=null;LockSupport.unpark(t);}WaitNodenext=q.next;如果(下一个==null)中断;q.next=null;//取消链接以帮助gcq=next;}休息;}}//提供监听方法,需要自定义实现done();可调用=空;//减少占用空间}3.get()publicVget()throwsInterruptedException,ExecutionException{ints=state;//NEW和COMPLETING状态触发等待if(s<=COMPLETING)//==waitingfinishs=awaitDone(false,0L);返回报告;}privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{finallongdeadline=timed?System.nanoTime()+nanos:0L;等待节点q=null;布尔排队=假;对于(;;){如果(Thread.in中断()){removeWaiter(q);抛出新的中断异常();}ints=状态;//--查看状态,如果此时已经变为NORMAL,则无需等待if(s>COMPLETING){if(q!=null)q.thread=null;返回s;}//--检查状态,如果此时是COMPLETING,则切换到其他线程执行elseif(s==COMPLETING)//还不能超时Thread.yield();//--创建一个新的等待节点elseif(q==null)q=newWaitNode();//--分配waiters变量elseif(!queued)//##insertqueued=UNSAFE.compareAndSwapObject(this,waitersOffset,q.next=waiters,q);//--有超时设置elseif(timed){nanos=deadline-System.nanoTime();//##如果超时,移除节点if(nanos<=0L){removeWaiter(q);返回状态;}//##如果没有超时,阻塞指定时间LockSupport.parkNanos(这个,纳米);}//--线程阻塞elseLockSupport.park(this);}}