当前位置: 首页 > 后端技术 > Java

面试官:线程池是如何实现线程复用的?有没有了解过,告诉我在

时间:2023-04-01 17:04:36 Java

ThreadPoolExecutor中如何实现线程复用?我们知道,线程在创建时会指定一个线程任务,当线程任务执行完毕后,线程会自动销毁。但是,线程池可以重用线程。一个线程执行完线程任务后不会销毁,而是继续执行另一个线程任务。那么它是如何做到的呢?这个得从addWorker()说起addWorker()先看前半部分addWorker()privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//检查边界设置if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;对于(;;){intwc=workerCountOf(c);如果(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))返回false;如果(compareAndIncrementWorkerCount(c))中断重试;c=ctl.get();//重新读取ctlif(runStateOf(c)!=rs)continueretry;//否则CAS因workerCount更改而失败;retryinnerloop}}retry:可能有些同学没有用过,它只是一个标记,它的下一个标记是for循环。当for循环中调用了continue/break,后面是retry标记,表示continue/break操作将从这里开始执行,但这不是我们关注的重点。从上面的代码我们可以看出,ThreadPoolExecutor在创建线程时,将线程封装成一个工作线程worker,放入工作线程组中,然后这个worker反复从阻塞队列中取任务执行。这个addWorker在excute方法中被调用。让我们看看后半部分privatebooleanaddWorker(RunnablefirstTask,booleancore){//前半部分重试:for(;;){intc=ctl.get();intrs=runStateOf(c);//仅在必要时检查队列是否为空。如果(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))返回false;对于(;;){intwc=workerCountOf(c);//core为true,则待创建线程为核心线程,首先判断当前线程是否大于核心线程//core为false,则证明待创建线程为非核心线程,然后先判断当前线程数是否大于线程总数//如果不大于则返回false如果(compareAndIncrementWorkerCount(c))中断重试;c=ctl.get();//重新读取ctlif(runStateOf(c)!=rs)继续重试;//否则CAS因workerCount更改而失败;重试内部循环}}//后半部分booleanworkerStarted=false;布尔workerAdded=false;工人w=null;try{//创建worker对象w=newWorker(firstTask);最终线程t=w.thread;if(t!=null){//获取线程全局锁finalReentrantLockmainLock=this.mainLock;主锁.lock();尝试{intrs=runStateOf(ctl.get());//判断线程池状态if(rslargestPoolSize)largestPoolSize=s;//添加成功workerAdded=true;}}最后{mainLock.unlock();}//添加成功If(workerAdded){t.start();workerStarted=真;}}}finally{//添加失败后执行addWorkerFailedif(!workerStarted)addWorkerFailed(w);}返回workerStarted;}再看addWorkerFailed(),和上面相反,相当于回滚操作,会移除失败的工作线程privatevoidaddWorkerFailed(Workerw){//还需要全局锁finalReentrantLockmainLock=this.mainLock;主锁.lock();尝试{if(w!=null)工人。删除(w);递减工人计数();尝试终止();}最后{mainLock.unlock();}}Worker我们继续看Worker对象privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/***这个类永远不会被序列化,但是我们提供一个*serialVersionUID来抑制javac警告。*/privatestaticfinallongserialVersionUID=6138294804551838833L;/**这个工人正在运行的线程。如果工厂失败则为空。*/finalThread线程;/**要运行的初始任务。可能为空。*/可运行的firstTask;/**每线程任务计数器*/volatilelongcompletedTasks;Worker(RunnablefirstTask){setState(-1);//禁止中断直到runWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}/**将主运行循环委托给外部runWorker*/publicvoidrun(){runWorker(this);}//.....//省略以下代码}Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程。回过头来看,应该很清楚为什么t.start()可以用在addWorker()中了。并且在构造方法中调用线程工厂创建线程实例。我们在上一节中谈到了线程工厂。其实这不是关注的重点,重点是thisrunWorker()finalvoidrunWorker(Workerw){//获取当前线程实例Threadwt=Thread.currentThread();//直接从第一个task开始执行Runnable任务=w.firstTask;//获取后设置worker的firstTask为null,防止下一次获取w.firstTask=null;//线程启动后,通过unlock方法释放锁w.unlock();//允许中断//线程异常退出时为真booleancompletedAbruptly=true;try{//Worker执行firstTask或从workQueue中获取任务,直到任务为空while(task!=null||(task=getTask())!=null){//获取锁以防止任务执行期间中断w.锁();//判断边界值,如果线程池中断则中断线程if((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())wt.interrupt();try{//相当于钩子方法beforeExecute(重量,任务);Throwable抛出=null;try{//执行任务task.run();}catch(RuntimeExceptionx){抛出=x;扔x;}赶上(错误x){抛出=x;扔x;}catch(Throwablex){抛出=x;抛出新错误(x);}最后{afterExecute(任务,抛出);}}最后{task=null;w.completedTasks++;w.解锁();}}completedAbruptly=false;}finally{processWorkerExit(w,completedAbruptly);首先,执行创建worker时存在的任务。调用getTask方法从阻塞队列中获取任务,然后调用task.run()执行任务,从而达到线程复用的目的。只要getTask方法不返回null,线程就不会退出。让我们看看getTask()privateRunnablegetTask(){booleantimedOut=false;//最后一次poll()是否超时?对于(;;){intc=ctl.get();intrs=runStateOf(c);//仅在必要时检查队列是否为空。if(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){decrementWorkerCount();返回空值;}intwc=workerCountOf(c);布尔计时=allowCoreThreadTimeOut||wc>核心池大小;//如果运行线程数超过最大线程数,但缓存队列为空,则减少worker数。//如果有设置允许线程超时或者线程数超过核心线程数,并且线程在指定时间内没有轮询到任务且队列为空,则递减worker数if((wc>maximumPoolSize||(timed&&timedOut))&&(wc>1||workQueue.isEmpty())){if(compareAndDecrementWorkerCount(c))返回空;继续;}try{//如果timed为true,会调用workQueue的poll方法获取任务。//超时时间为keepAliveTime。如果超过了keepAliveTime的持续时间,//如果timed为false,当前会调用workQueue的take方法阻塞。//当有任务加入队列时,线程被唤醒,take方法返回任务并执行。可运行r=定时?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();如果(r!=null)返回r;超时=真;}catch(InterruptedException重试){timedOut=false;}}}有没有想过这里为什么要用take和poll?它们都是出列操作。这样做有什么好处?take&poll我们说take()方法会阻塞并挂起核心线程,这样它就不会占用过多的cpu资源,直到获取到Runnable再返回。如果allowCoreThreadTimeOut设置为true,那么核心线程会调用poll方法,因为poll可能返回null,所以此时核心线程如果满足超时条件就会被销毁。非核心线程会workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),如果还没有得到超时,下一次循环判断compareAndDecrementWorkerCount会返回null,Worker对象的run()方法循环体判断为null,任务结束,然后线程被系统回收。我们回头看看runWorker()是不是设计得很巧妙。结论这部分的内容不是很容易理解。想继续讨论的同学可以继续阅读它的源码。理解这部分内容就好了。其实我们从源码中就可以看出。对大量的线程状态进行检查,代码很健壮,可以借鉴