当前位置: 首页 > 后端技术 > Java

【JAVA并发编程】Future接口和FutureTask实现解析

时间:2023-04-01 21:45:44 Java

1.Future接口Future接口代表一个异步任务的结果,提供cancel取消,isDone是否完成,get无限等待或者等待一个等操作超时获取结果,源码如下:publicinterfaceFuture{//尝试取消任务的执行//如果mayInterruptIfRunning为真,尝试中断线程booleancancel(booleanmayInterruptIfRunning);//完成前判断是否取消booleanisCancelled();//判断是否完成booleanisDone();//无限等待结果,支持中断Vget()throwsInterruptedException,ExecutionException;//超时等待结果,支持中断Vget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}其次,FutureTaskFutureTask实现了Runnable和Future接口,所以它既是任务又是异步任务的结果.在下面的类图中,ThreadPoolExecutor线程池执行器提交任务时,会返回一个Future(如果调用了execute,则不会返回Future),那么这个Future其实就是一个FutureTask对象,会封装传入的RunnabletaskFutureTask,然后将封装好的FutureTask传入execute()方法,直接返回FutureTask,源码如下:publicFuturesubmit(Runnabletask){if(task==null)thrownewNullPointerException();RunnableFutureftask=newTaskFor(task,null);//调用具体子类的execute方法execute(ftask);返回任务;}protectedRunnableFuturenewTaskFor(Runnablerunnable,Tvalue){returnnewFutureTask(runnable,value);}我们提交任务,拿到FutureTask之后,就可以调用Future接口提供的各种操作(比如cancel取消任务,isDone是否完成,无限等待或者超时等待得到结果)。那么FutureTask是如何实现这些操作的呢?2.1关键属性FutureTask中有几个关键属性,如下:Callablecallable:对应的任务Objectoutcome:任务完成或抛出异常的结果Threadrunner:运行任务的线程WaitNodewaiters:等待的线程供FutureTask完成(可以有多个,以单向链表的形式存储)2.2状态管理FutureTask通过对任务状态的管理来实现各种操作。状态如下:privatestaticfinalintNEW=0;//初始状态privatestaticfinalintCOMPLETING=1;//完成privatestaticfinalintNORMAL=2;//正常完成privatestaticfinalintEXCEPTIONAL=3;//异常完成privatestaticfinalintCANCELED=4;//取消privatestaticfinalintINTERRUPTING=5;//privatestaticfinalint中断=6;//中断可能的状态转换,有以下几种:NEW->COMPLETING->NORMALNEW->COMPLETING->EXCEPTIONALNEW->CANCELLEDNEW->INTERRUPTING->INTERRUPTED2.3任务的运行过程当我们将任务提交给ThreadPoolExecutor时,execute(FutureTask)方法在内部执行。如何切换任务状态?大体流程如下:判断任务状态是否为NEW,如果不是,说明任务已经执行过,那么这次就不执行了执行任务,否则直接返回调用callable.call()执行任务如果任务正常完成,将结果outcome设置为call()方法的返回值,修改任务状态为NORMAL,并唤醒waitingthreadwaiters如果任务执行过程中出现异常,将结果outcome设置为抛出的异常,修改任务状态为EXCEPTIONAL,从源码中唤醒waitingthreadwaiters,如下:publicvoidrun(){//当任务状态不是NEW时,直接返回if(state!=NEW||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;尝试{Callablec=callable;if(c!=null&&state==NEW){V结果;布尔值;尝试{结果=c.call();//如果正常完成则ran为真ran=true;}catch(Throwableex){结果=null;//如果有异常则ran为falseran=false;//设置结果为ex,修改任务状态为EXCEPTIONAL,唤醒等待线程setException(ex);}//如果正常完成,将结果设置为result,修改任务状态为NORMAL,唤醒等待线程if(ran)set(result);}}最后{runner=null;ints=状态;如果(s>=INTERRUPTING)handlePossibleCancellationInterrupt(s);}}2.4取消任务通过调用cancel(mayInterruptIfRunning)方法,尝试取消任务的执行。如果任务状态不是初始状态NEW,直接返回false,表示取消失败。否则进一步判断mayInterruptIfRunning。如果为真,则将任务状态更改为INTERRUPTING。如果为flase,则将任务状态更改为CANCELLED。判断mayInterruptIfRunning是否为真,如果是,则调用runner线程的interrupt()中断方法,并将任务状态变为INTERRUPTED,最后唤醒等待的线程waiters并返回true,表示取消成功。源码如下:publicbooleancancel(booleanmayInterruptIfRunning){//判断state是否不是NEW,直接返回false,否则尝试通过CAS修改state的值if(!(state==NEW&&UNSAFE.compareAndSwapInt(this,stateOffset,NEW,mayInterruptIfRunning?INTERRUPTING:CANCELLED)))returnfalse;try{//如果mayInterruptIfRunning为真,则调用线程的interrupt()方法if(mayInterruptIfRunning){try{Threadt=runner;如果(t!=null)t.interrupt();}finally{//finalstate//更新状态为INTERRUPTED,putOrderedInt不保证对其他线程立即可见UNSAFE.putOrderedInt(this,stateOffset,INTERRUPTED);}}}finally{//唤醒等待线程finishCompletion();}返回真;}2.5无限等待或者超时等待的任务结果可以通过get()中断,如下:publicVget()throwsInterruptedException,ExecutionException{ints=state;//如果状态<=COMPLETING,则无限期等待直到被唤醒或中断if(s<=COMPLETING)s=awaitDone(false,0L);退货报告;}publicVget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{if(unit==null)thrownewNullPointerException();ints=状态;//Ifstate<=COMPLETING,等待超时//如果等待超时后任务仍未完成,抛出TimeoutExceptionif(s<=COMPLETING&&(s=awaitDone(true,unit.toNanos(timeout)))<=完成)抛出新的TimeoutException();退货报告;}然后我们看一下awaitDone()方法的具体实现,源码如下:privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{//计算deadlinedeadlinefinallongdeadline=定时?System.nanoTime()+nanos:0L;等待节点q=null;布尔排队=假;for(;;){//如果线程被中断,则将其从等待者中移除,并直接抛出InterruptedExceptionif(Thread.interrupted()){removeWaiter(q);抛出新的中断异常();}ints=状态;//判断任务状态>COMPLETING,表示任务已经完成,直接返回任务状态if(s>COMPLETING){if(q!=null)q.thread=null;返回s;}//判断任务状态==COMPLETING,表示任务还没有完成,但是即将完成//所以如果任务状态一直是COMPLETING,就一直死Loopelseif(s==COMPLETING)线程.yield();elseif(q==null)//如果来到这里,说明任务状态为NEW,则创建等待节点q=newWaitNode();elseif(!queued)//来到这里,表示等待节点还没有入队,则通过CAS将等待节点插入到链表头部queued=UNSAFE.compareAndSwapObject(this,waitersOffset,q.next=waiters,q);elseif(timed){//如果nanos超时=deadline-System.nanoTime();//如果nanos<=0,则表示超时并从waiters中移除,并返回任务状态//否则表示未超时,将通过LockSupport.parkNanos挂起限时if(nanos<=0L){removeWaiter(q);返回状态;}LockSupport.parkNanos(this,nanos);}else//挂起当前线程LockSupport.park(this);可以看到,awaitDone()内部是通过LockSupport.park()或者parkNanos()方法来挂起线程或者限时挂起的