当前位置: 首页 > 后端技术 > Java

面试官说说线程池(下)

时间:2023-04-02 01:45:03 Java

前瞻回顾大家好,我是小张,我们在上一篇面试官:说说线程池(下),通过通用API的以线程池为切入点,分析了execute方法的源码,其中了解了DougLea老师将一个变量拆分为两个变量的操作,无锁开发(CAS),如何控制状态线程池并发等,最后通过源码阅读。每个人都带来了我认为是高效阅读源代码的方法论。如果您还没有阅读上一篇文章,请先阅读上一篇文章。本文将重点介绍以下API的源码阅读。processWorkerExitshutdownshutdownNowawaitTerminationworker线程退出我们继续上一篇worker的runWorker()的worker线程。让我们从工作线程的结尾开始。接下来我会简化代码中的其他流程,只展示核心代码。finalvoidrunWorker(Workerw){booleancompletedAbruptly=true;try{while(task!=null||(task=getTask())!=null){try{beforeExecute(wt,task);Throwable抛出=null;尝试{task.run();}catch(RuntimeExceptionx){抛出=x;扔x;}赶上(错误x){抛出=x;扔x;}catch(Throwablex){抛出=x;抛出新错误(x);}最后{afterExecute(任务,抛出);}}最后{task=null;w.completedTasks++;w.解锁();}}completedAbruptly=false;}最后{processWorkerExit(w,completedAbrup时间);}}在上一篇文章中,我们分析了只有当getTask()方法返回null时,while循环才会结束,进入processWorkerExit()方法,其中getTask()会随着线程池状态的变化(变成non-Running),或者当fetch任务超时,当前工作线程数大于核心线程数时,返回null。那么合理的processWorkerExit方法就是执行工作线程的清理工作。下面将通过源码向大家展示如何清理工作线程。privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){//如果异常结束,则工作线程数为-1if(completedAbruptly)decrementWorkerCount();//获取锁,主要用于操作worker线程集合finalReentrantLockmainLock=this.mainLock;主锁.lock();try{//汇总工作线程完成的任务数completedTaskCount+=w.completedTasks;//从worker集合中删除worker.remove(w);}最后{mainLock.unlock();}//因为线程池的状态不知道什么时候会被修改,所以需要尝试结束线程池tryTerminate();intc=ctl.get();//当处于Running或SHUTDOWN状态时if(runStateLessThan(c,STOP)){//如果不是异常中断,则计算最小存活工作线程数if(!completedAbruptly){//如果允许核心线程超时,最小值为0,否则为核心线程数intmin=allowCoreThreadTimeOut?0:核心池大小;//当最小值为0时,当任务队列中还有任务时,至少需要一个线程来处理if(min==0&&!workQueue.isEmpty())最小值=1;//判断当前工作线程数是否小于最小值,如果小于则直接返回if(workerCountOf(c)>=min)return;//不需要替换}//如果异常中断或者当前工作线程数小于最小值,需要重新添加工作线程addWorker(null,false);通过上面的分析,我们可以推断出worker线程清理过程是先清理worker集合,然后执行结束线程池的尝试,然后如果线程池状态是Running或者SHUTDOWN,就会计算最小工作线程数min,以确保始终有min个当前工作线程存活。有的朋友可能会疑惑为什么要尝试结束线程池。其实原因很简单。线程池什么时候结束对程序来说非常重要。未知,所以需要在每个线程经过的地方判断状态是否发生变化。关闭线程池在实际场景中,关闭线程池有两种方式,一种是调用shutdown,另一种是调用shutdownNow。两者的区别是前者线程池可以继续处理线程池中的任务,后者会中断所有任务,返回未执行的任务。下面我就用一个对比的方式,让大家更清楚的读懂其中的区别。通过上图中的红框,首先,两者要改变的状态是不一样的,一个是变成SHUTDOWN,一个是变成STOP状态。第二个中断空闲线程,另一个中断所有线程。如果这里有不了解线程池现状的小伙伴,请到上一篇文章阅读。那么线程池是怎么改的,工作线程是怎么中断的,看了下面的源码就会豁然开朗。advanceRunState//提前线程池状态privatevoidadvanceRunState(inttargetState){for(;;){intc=ctl.get();if(//如果当前状态大于或等于目标状态则breakrunStateAtLeast(c,targetState)||//如果当前状态小于目标状态,使用CAS尝试修改它。如果修改成功,breakctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c))))break;在中等并发期间,线程池的状态可能随时发生变化。所以需要死循环,通过CAS修改线程池的状态,保证原子性。interruptIdleWorkersprivatevoidinterruptIdleWorkers(){interruptIdleWorkers(false);}privatevoidinterruptIdleWorkers(booleanonlyOne){//我们需要操作worker集合,所以锁定最后的ReentrantLockmainLock=this.mainLock;主锁.lock();try{//遍历Worker集合for(Workerw:workers){Threadt=w.thread;if(//线程未中断!t.isInterrupted()//尝试获取worker的锁,判断worker是否空闲&&w.tryLock()){try{//空闲worker,直接使用中断t.打断();}catch(SecurityExceptionignore){}finally{//释放worker的锁w.unlock();}}//如果只有一个被打断,直接breakif(onlyOne)break;}}finally{//释放操作worker集合的锁mainLock.unlock();看完了,这个方法不难读,但是在worker空闲的地方,通过tryLock方法来判断worker是否空闲。小伙伴们可能会疑惑为什么tryLock成功是空闲的工作线程呢?这里我们需要结合runWorker()方法来看上图,会发现worker线程池要么在等待①返回task任务,要么等待③执行任务。我们会发现任务返回时会进入②。这里对AQS不熟悉的朋友可以把②看做是工作线程被占用的标志。任务执行完毕后会调用④方法,标记并释放工作线程。所以我们可以知道,只要尝试占用成功的就是还没有拿到task任务的工作线程,换句话说,这些线程就是空闲线程。按照interruptWorkers的字面意思,这个方法的作用是中断所有的worker线程池,不管任务是否正在执行。privatevoidinterruptWorkers(){//获取占用worker集合的锁finalReentrantLockmainLock=this.mainLock;主锁.lock();try{//遍历worker集合for(Workerw:workers)//直接中断w.interruptIfStarted();}finally{//释放锁mainLock.unlock();}}voidinterruptIfStarted(){线程t;if(//0表示未被占用,1表示被占用getState()>=0&&//线程不会为空,不会被标记为中断,则调用线程的中断(t=thread)!=null&&!t.isInterrupted()){try{//线程中断t.interrupt();}catch(SecurityExceptionignore){}}}我们会发现,在中断所有线程和中断空闲线程的方法中,唯一的区别是该方法使用了大于等于0的getState()来判断线程状态。那么为什么大于等于0呢?让我们回到worker中的lock和unlock方法。publicvoidlock(){获取(1);}publicbooleantryLock(){返回tryAcquire(1);}publicvoidunlock(){释放(1);}Worker(RunnablefirstTask){setState(-1);.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}熟悉AQS的朋友肯定会明白,调用lock方法会把状态变成1,unlock方法会把状态变成0。再回到worker类的构造函数,我们会发现,第一个是首先将状态设置为-1。这里是不是突然就清楚了?这就解释了为什么使用大于等于0来判断线程状态,是为了刚好创建Finishedworker线程不需要中断。tryTerminate我们会发现在shutdown和shutdownNow方法的最后调用了tryTerminate方法,那么这里面做了什么呢?finalvoidtryTerminate(){for(;;){//获取线程池状态intc=ctl.get();//检查是否可以终止if(//StatusisRunningisRunning(c)||//大于或等于TIDYING=TIDYING,TERMINATEDrunStateAtLeast(c,TIDYING)||//处于SHUTDOWN状态和任务队列不为空(runStateOf(c)==SHUTDOWN&&!workQueue.isEmpty()))返回;//①执行到这个描述状态处于SHUTDOWN,任务队列为空,说明线程池可以终止if(workerCountOf(c)!=0){//如果当前工作线程不为0,中断1workerthread,具体为什么是一个,下面会讲interruptIdleWorkers(ONLY_ONE);有解释返回;}//至此,工作线程数为0,状态为SHUTDOWN,任务队列为空finalReentrantLockmainLock=this.mainLock;主锁.lock();try{//尝试CAS将状态更改为TIDYINGif(ctl.compareAndSet(c,ctlOf(TIDYING,0))){try{//hook方法,子类实现terminated();}finally{//hook方法执行成功,状态设置为TERMINATEDctl.set(ctlOf(TERMINATED,0));//唤醒所有等待TERMINATED的线程,下面会有解释termination.signalAll();}返回;}}最后{mainLock.unlock();}//elseretryonfailedCAS}}小伙伴们看完这个方法一定不明白为什么①处的代码判断工作线程不等于0,只打断一个工作线程,而不是打断所有工作线程。我们先总结一下。之所以不打断所有线程是为了有时候至少需要一个线程来执行后面①的代码,也就是要求线程修改线程状态,否则没有线程也会修改状态。听到这个结论,朋友一定是一头雾水。看到这段源码的时候,我也是一头雾水。不知道老哥为什么要这样设计。直到我在IDE中点开方法查看其引用,一切才明了。如上图所示,我们会??发现processWorkerExit()方法中引用了tryTerminate()方法。在前面对runWorker方法的分析中,我们会发现这个方法是在worker线程run的finally块中,也就是所有的worker线程执行完成的时候都会调用这个方法。至此我们知道tryTerminate方法会在w??orker线程结束时执行,但是还是不能解释为什么worker线程数是0而不是1,是不是保证至少有一个worker线程存活并执行?我们的条件是worker线程数等于0,所以一定有地方可以减少,这个方法必须在runWorker方法中。顺着这个思路,我们找到了getTask()方法,我们会发现在①点,当工作队列为空或者状态为STOP、TIDYING、TERMINATED的时候,在SHUTDOWN时,工作线程的数量会是减一。其次,我们会发现在②处工作线程等待获取任务时,会在③处捕获中断异常,然后返回①减少工作线程数,也就是说肯定会有最后一个线程结束。工作线程数减少为0,然后修改线程池的状态为最终状态。一图胜千言,下面我通过流程图向大家展示一下它的内部工作原理。我们把所有的进程串联起来,左右两边是同时运行的,所以这也解释了为什么每次中断一个,工作线程等于0的时候,它还会继续执行。等待线程池结束经过上面的长篇大论,我们终于来到了最后一个方法,也是一个很简单的方法。相信小伙伴们有了上面的基础,再来看这段代码。大女巫。publicbooleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException{longnanos=unit.toNanos(timeout);finalReentrantLockmainLock=this.mainLock;主锁.lock();try{for(;;){//如果当前状态为TERMINATED,则返回trueif(runStateAtLeast(ctl.get(),TERMINATED))returntrue;//超时,返回falseif(nanos<=0)returnfalse;//没有超时,继续等待nanos=终止。等待纳米(纳米);}}最后{mainLock.unlock();}}很容易理解,状态为TERMINATED直接返回,超时直接返回。如果我在等待,线程状态将变为TERMINATED。需要有人把我从等待中叫醒吗?我们发现在tryTerminate方法成功设置TERMINATED后,所有等待结束的线程都会立即得到通知,所以这是一个字符串。对于看到最后的各位,小张非常感谢,感谢你们坚持看完。其次,你也要感谢你自己,感谢你能坚持看到这里,看完一个线程池核心源码。相信在这个过程中,你也学到了很多知识。在实际业务中,你可能不会写这么复杂的代码,代码中充斥着各种状态,但是老哥的很多思路是值得我们借鉴的。比如我们第一篇写了抽象模板方法,第二篇写了CAS无锁编程,第三篇写了如何中断正在执行的线程等等。