前言今天就来讨论一下ThreadPoolExecutor,一起来看看吧!ThreadPoolExecutor如何实现线程复用?我们知道,线程在创建时会指定一个线程任务,当线程任务执行完毕后,线程会自动销毁。但是线程池可以复用线程。一个线程执行完线程任务后并没有销毁,继续执行其他线程任务。那么它是如何做到的呢?这必须从addWorker()开始。addWorker()先看addWorker()的前半部分。privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//检查边界设置if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;对于(;;){intwc=workerCountOf(c);如果(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;如果(compareAndIncrementWorkerCount(c))中断重试;c=ctl.get();//重新读取ctlif(runStateOf(c)!=rs)continueretry;//否则CAS因workerCount更改而失败;retryinnerloop}}retry:可能有的同学没有用过,它只是一个标记,它的下一个标记是for循环,在for循环中调用continue/break,然后跟着retry标记,意思是continue/break操作就是从这个地方开始执行的,但这不是我们关注的重点。从上面的代码我们可以看出,ThreadPoolExecutor在创建线程时,会将线程封装为“工作线程worker”,放入“工作线程组”。然后worker反复从阻塞队列中取任务执行。这个addWorker在execute方法中被调用。让我们继续下半场。privatebooleanaddWorker(RunnablefirstTask,booleancore){//前半部分重试:for(;;){intc=ctl.get();intrs=runStateOf(c);//仅在必要时检查队列是否为空.if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;对于(;;){intwc=workerCountOf(c);//core为true,要创建的线程是核心线程,先判断当前线程是否大于核心线程//如果core为false,证明需要创建非核心线程,那么先判断当前线程数是否大于总线程数//如果不大于则返回falseif(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;如果(compareAndIncrementWorkerCount(c))中断重试;c=ctl.get();//重新读取ctlif(runStateOf(c)!=rs)继续重试;//否则CAS因workerCount更改而失败;重试内部循环}}//后半部分booleanworkerStarted=false;布尔workerAdded=false;工人w=null;try{//创建worker对象w=newWorker(firstTask);最终线程t=w.thread;if(t!=null){//获取线程全局锁finalReentrantLockmainLock=this.mainLock;主锁.lock();尝试{intrs=runStateOf(ctl.get());//判断线程池状态if(rslargestPoolSize)largestPoolSize=s;//添加成功workerAdded=true;}}最后{mainLock.unlock();}//添加成功后执行线程if(workerAdded){t.start();workerStarted=真;}}}finally{//添加失败后执行addWorkerFailedif(!workerStarted)addWorkerFailed(w);}返回workerStarted;再看addWorkerFailed(),和上面相反,相当于回滚操作,会移除失败的工作线程privatevoidaddWorkerFailed(Workerw){//还需要全局锁finalReentrantLockmainLock=this.mainLock;主锁.lock();尝试{如果(w!=null)workers.remove(w);递减工人计数();尝试终止();}最后{mainLock.unlock();}}Worker让我们看看Worker对象。privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/***这个类永远不会被序列化,但是我们提供一个*serialVersionUID来抑制javac警告。*/privatestaticfinallongserialVersionUID=6138294804551838833L;/**这个工人正在运行的线程。如果工厂失败则为空。*/finalThread线程;/**要运行的初始任务。可能为空。*/可运行的firstTask;/**每线程任务计数器*/volatilelongcompletedTasks;Worker(RunnablefirstTask){setState(-1);//禁止中断直到runWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}/**将主运行循环委托给外部runWorker*/publicvoidrun(){runWorker(this);}//.....//省略下边代码}Worker类实现了Runnable接口,所以WOrker也是一个线程任务。在构造方法中,创建了一个线程。回过头来应该就清楚为什么在addWorker()中可以使用t.start(),在构造方法中调用线程工厂创建线程了。例如,我们在上一节中谈到了线程工厂。其实这不是关注的重点,重点是这个runWorker()。finalvoidrunWorker(Workerw){//获取当前线程实例Threadwt=Thread.currentThread();//Runnabletask=w.firstTask直接来自第一个任务;//将worker的firstTask设置为null,防止下次获取w.firstTask=null;//线程启动后,通过unlock方法释放锁w.unlock();//允许中断//当线程异常退出时为真booleancompletedAbruptly=true;try{//Worker执行firstTask或从workQueue中获取任务,直到任务为空while(task!=null||(task=getTask())!=null){//获取锁以防止任务执行期间中断w.lock();//判断边界值如果线程池被中断,则线程会被中断if((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())wt.interrupt();try{//相当于钩子方法beforeExecute(wt,task);Throwable抛出=null;try{//执行任务task.run();}catch(RuntimeExceptionx){抛出=x;扔x;}赶上(错误x){抛出=x;扔x;}catch(Throwablex){抛出=x;抛出新错误(x);}最后{afterExecute(任务,抛出);}}最后{task=null;w.completedTasks++;w.解锁();}}completedAbruptly=false;processWorkerExit(w,completedAbruptly);}}首先执行创建worker时存在的任务。任务执行完毕后,worker的生命周期并没有结束。在while循环中,worker会不断调用getTask方法从“阻塞队列”中获取任务,然后调用task.run()执行任务,从而达到“复用线程”的目的。只要getTask方法不返回null,线程就不会退出。让我们继续getTask()。privateRunnablegetTask(){booleantimedOut=false;//最后一次poll()是否超时?对于(;;){intc=ctl.get();intrs=runStateOf(c);//仅在必要时检查队列是否为空。if(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){decrementWorkerCount();返回空值;}intwc=workerCountOf(c);布尔计时=allowCoreThreadTimeOut||wc>核心池大小;//如果运行线程数超过最大线程数,但缓存队列为空,则减少worker数。//如果有设置允许线程超时或者线程数超过核心线程数,并且线程在指定时间内没有轮询到任务且队列为空,则递减worker数if((wc>maximumPoolSize||(timed&&timedOut))&&(wc>1||workQueue.isEmpty())){if(compareAndDecrementWorkerCount(c))返回空;继续;}try{//如果timed为true,会调用workQueue的poll方法获取任务。//超时时间为keepAliveTime。如果超过了keepAliveTime的持续时间,//如果timed为false,当前会调用workQueue的take方法阻塞。//当有任务加入队列时,线程被唤醒,take方法返回任务并执行。可运行r=定时?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();如果(r!=null)返回r;超时=真;}catch(InterruptedException重试){timedOut=false;}}}有没有想过这里为什么要用take和poll?它们都是出列操作。这样做有什么好处?take&poll我们说take()方法会阻塞并挂起核心线程,使其不会占用过多的cpu资源,直到获取到Runnable再返回。如果“allowCoreThreadTimeOut”设置为true,那么核心线程会调用poll方法,因为poll可能返回null,所以当超时条件满足时,核心线程就会被销毁。非核心线程会workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),如果还没有得到超时,下一次循环判断“compareAndDecrementWorkerCount”会返回null,Worker的run()方法循环体的判断object为null,任务结束,线程被系统回收。回头看看runWorker()是不是设计得很巧妙。结论这部分的内容不是很容易理解。想继续讨论的同学可以继续阅读它的源码。理解这部分内容就好了。其实我们可以从源码中看到大量的线程状态检查。代码非常健壮,可以从中学习。