本文转载自微信公众号《Java极客技术》,作者鸭血范。转载本文请联系Java极客技术公众号。为什么要使用线程池你是否有这样的疑惑,为什么要使用线程池?也许你会说,我可以复用创建的线程;线程是一个重量级的对象,为了避免频繁的创建和销毁,最好使用线程池来管理。没问题,大家都很明白~不过使用线程池还有一个很重要的点:可以控制并发数。如果并发数过多,消耗的资源会增加,直接让服务器宕机。肯定不行。绕不开的几个参数提到ThreadPoolExecutor,那你的小脑袋肯定会想到那几个参数,我们来看看源码(我直接把7个参数的方法放上去):publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler)我们分别来看:corePoolSize:核心线程数,线程池中有两种线程,核心线程和非核心线程。线程池中的核心线程,即使什么都不做,也会一直在线程池中,除非设置了allowCoreThreadTimeOut参数。maximumPoolSize:线程池可以创建的最大线程数。这个值=核心线程数+非核心线程数keepAliveTime&unit:线程池可以撤销线程,那么什么时候撤销呢?如果一个线程在一段时间内没有执行任务,则说明该线程处于空闲状态。可以撤销吗?所以,如果一个线程不是核心线程,在keepAliveTime&unit期间一直没有工作,那么很抱歉,你只能离开核心线程了。它很空闲,不会从线程池中清除。没办法让它成为核心线程~workQueue:工作队列,这个队列维护了几个等待执行的Runnable任务对象常用的队列:LinkedBlockingQueue,ArrayBlockingQueue,SynchronousQueue,DelayQueue大厂编码规范,相信大家都知道,不推荐使用Executors,最重要的原因是:Executors提供的很多方法默认使用无界LinkedBlockingQueue,在高负载情况下,无界队列很容易造成OOM,OOM会导致所有请求无法处理,所以在使用的时候,强烈建议使用有界队列,因为如果使用有界队列,当线程数过多时,会遵循拒绝策略threadFactory:Afactoryforcreatingthreads,用于创建线程分批。如果未指定,将创建一个默认的线程工厂处理程序:拒绝处理策略。workQueue中提到,如果使用有界队列,那么当线程数大于最大线程数时,拒绝处理策略就会起作用。常用的处理策略有四种:-AbortPolicy:默认拒绝策略会丢弃任务并抛出RejectedExecutionException-CallerRunsPolicy:提交任务的线程自己执行任务-DiscardOldestPolicy:直接丢弃新任务,没有异常thrown-DiscardOldestPolicy:丢弃最旧的任务,然后添加新任务工作队列中默认的拒绝策略是AbortPolicy,会抛出RejectedExecutionException,但这是运行时异常,对于运行时异常,编译器不会强制捕获,所以更容易忽略错误。因此,如果线程池处理的任务很重要,尽量自定义自己的拒绝策略。线程池的几种状态在源码中可以清楚的看到。线程池有5种状态:privatestaticfinalintRUNNING=-1<=SHUTDOWN时,不接受新任务,直接返回false//如果rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()也不接受新任务,返回falseif(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;for(;;){intwc=workerCountOf(c);//wc>=CAPACITY表示线程数不够,所以返回falsecreated是一个核心线程,然后判断wc是否大于核心线程数,如果大于则返回false//如果core为false,说明要创建的线程是一个non-core线程,然后判断wc是否大于最大线程数,如果(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;//CAS操作增加workerCount的值,如果成功则跳出循环if(compareAndIncrementWorkerCount(c))breakretry;c=ctl.get();//重新读取ctl//判断线程池状态是否存在变化,如果有变化,则重试=false;booleanworkerAdded=false;Workerw=null;try{//创建一个worker对象w=newWorker(firstTask);//实例化一个Thread对象finalThreadt=w.thread;if(t!=null){//下一个操作需要锁定finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{//Recheckwhileholdinglock.//BackoutonThreadFactoryfailureorif//shutdownbeforelockacquired.intrs=runStateOf(ctl.get());if(rslargestPoolSize)largestPoolSize=s;workerAdded=true;}}finally{mainLock.unlock();}if(workerAdded){//启动任务线程,开始执行任务t.start();workerStarted=true;}}}finally{if(!workerStarted)//如果任务线程启动失败,调用addWorkerFailed//addWorkerFailed方法主要做了两件事:从线程池中移除线程;reduceworkerCount的值1addWorkerFailed(w);}returnworkerStarted;}Worker类在addWorker中,主要由Worker类做一些相应的处理,worker继承AQS,实现Runnable接口,线程池维护HashSet,一个HashSetprivatefinalHashSet由worker组成objectsworkers=newHashSet();worker继承AQS主要是利用AQS独占锁机制来识别线程是否空闲;另外,worker还实现了Runnable接口,所以它本身就是一个线程任务,而且是一个Thread,线程的任务就是自己的thisthread=getThreadFactory().newThread(this);咱们瞅瞅里面的源码:privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/***Thisclasswillneverbeserialized,butweprovidea*serialVersionUIDtosuppressajavacwarning.*/privatestaticfinallongserialVersionUID=6138294804551838833L;//处理任务的线程finalThreadthread;//worker传入的TaskRunnablefirstTask;/**Per-threadtaskcounter*/volatilelongcompletedTasks;/***CreateswithgivenfirsttaskandthreadfromThreadFactory.*@paramfirstTaskthefirsttask(nullifnone)*/Worker(RunnablefirstTask){//设置state为-1,避免worker在执行前被打断setState(-1);//inhibitinterruptsuntilrunWorkerthis.firstTask=firstTask;//创建线程执行任务this.thread=getThreadFactory().newThread(this);}/**DelegatesmainrunlooptoouterrunWorker*/publicvoidrun(){runWorker(this);}//Lockmethods////Thevalue0representstheunlockedstate.//Thevalue1representsthelockedstate.protectedbooleanisHeldExclusively(){returngetState()!=0;}protectedbooleantryAcquire(intunused){if(compareAndSetState(0,1))){setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}protectedbooleantryRelease(intunused){setExclusiveOwnerThread(null);setState(0);returntrue;}publicvoidlock(){acquire(1);}publicbooleantryLock(){returntryAcquire(1);}publicvoidunlock(){release(1);}publicbooleanisLocked(){returnisHeldExclusively();}voidinterruptIfStarted(){Threadt;if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()){try{t.interrupt();}catch(SecurityExceptionignore){}}}}runWorkerworker类在执行run方法时,实际上是调用了runWorker方法();Runnabletask=w.firstTask;w.firstTask=null;//允许中断w.unlock();//allowinterruptsbooleancompletedAbruptly=true;try{//判断任务是否为空,不为空则直接执行//如果task为Empty,调用getTask()方法,从workQueue中取出一个新的task执行while(task!=null||(task=getTask())!=null){//加锁防止被打断经过otherthreadsw.lock();//如果池正在停止,确保线程被中断;//如果没有,确保线程不被中断。这//需要在第二种情况下检查以处理with//shutdownNowracewhileclearinginterrupt//查看线程池的状态,如果线程池处于停止状态,则需要中断当前线程if((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())wt.interrupt();try{//执行beforeExecutebeforeExecute(wt,task);Throwablethrown=null;try{//执行任务task.run();}catch(RuntimeExceptionx){thrown=x;throwx;}catch(Errorx){thrown=x;throwx;}catch(Throwablex){thrown=x;thrownewError(x);}finally{//执行afterExecute方法afterExecute(task,thrown);}}finally{//设置task为null,循环运行task=null;w.completedTasks++;//释放锁w.unlock();}}completedAbruptly=false;}finally{processWorkerExit(w,completedAbruptly);}}在runWorker方法中,会先执行创建worker时存在的任务。任务执行完后,worker不会被销毁,但是在while循环中,worker会不断调用getTask方法从阻塞队列中获取任务,然后调用taskrun()执行任务,从而达到目的多路复用线程。通过循环条件while(task!=null||(task=getTask())!=null)可以看出,只要getTask方法的返回值不为null,就会继续循环,而线程将继续执行。从而达到线程复用getTask的目的下面看一下getTask方法的实现:;intrs=runStateOf(c);//Checkifqueueemptyonlyifnecessary.if(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){decrementWorkerCount();returnnull;}intwc=workerCountOf(c);//Areworkerssubjecttoculling?//allowCoreThreadTimeOut变量默认为false,即核心线程即使空闲也不会销毁。//如果为true,则核心线程在keepAliveTime内处于空闲状态,将被销毁。booleantimed=allowCoreThreadTimeOut||wc>corePoolSize;//如果线程运行数大于最大线程数,但缓存队列为空。此时减少worker数量//如果有设置允许线程超时或者线程数超过核心线程数,线程在指定时间内没有轮询到任务,队列为空,这时候worker的个数也减少了}try{//如果timed为true,会调用workQueue的poll方法//超时时间为keepAliveTime,如果超过keepAliveTime的长度,poll会返回null//如果返回为null,在runWorker//while(task!=null||(task=getTask())!=null)循环条件被打破,从而跳出循环,此时线程执行完成//iftimed为false(allowCoreThreadTimeOut为false,wc>corePoolSize为false)//workQueue的take方法会被调用阻塞到当前//当队列中有任务时,线程被唤醒,take方法返回任务,并开始执行Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!=null)returnr;timedOut=true;}catch(InterruptedExceptionretry){timedOut=false;}}}这里的源码分析差不多就清楚了主线程的复用体现在runWorker方法中的while循环中,在while循环中,worker会不断的调用getTask方法,而在getTask方法中,如果有任务队列中没有任务,如果此时线程是核心线程,会一直卡在workQueuetake方法中,会被阻塞并suspended,此时不会占用CPU资源,直到获取到任务返回true。这时runWorker拿到任务继续执行任务,从而实现了线程复用。没想到一不小心看完了