场景回顾今天回宿舍,小叶唱道:快删我~,吓得赶紧查清楚情况,我以为我有找女朋友麻烦,原来今天又被面试官虐了,应该删掉面试官。面试官:我看到简历上写着你精通并发编程,那么线程池在你的日常工作中肯定会用到吧?你通常在什么场景下使用它?小叶:嗯,一般都是用线程池。我一般用在爬虫场景。通过线程池可以并行处理多个网络请求,可以提高系统的吞吐量。面试官:嗯,线程池在爬虫场景中用的很普遍,那么线程池是怎么创建的呢?小叶:我一般使用JDK自带的工厂方法提供的直接创建线程池的方法。我想不出具体的名字。面试官:你说的是Executors类。一般我们会使用单线程线程池、固定线程线程池等,那么它到底是如何通过参数创建不同线程池的呢?小野:emm……你说的是ThreadPoolExecutor,它好像是由核心线程数、最大线程数、空闲时间长度、空闲时间单位、任务队列等7个参数组成的,线程工厂和拒绝策略。面试官:嗯,看来你对线程池还是比较了解的。如果我设置核心线程数为10,最大线程数为20,任务队列为无界队列,那么我一次有30个任务。那么当前线程池的线程数是多少,任务会不会被拒绝呢?小叶:这个简单。最大线程数为20,一次可处理30个任务。当然线程数是20。因为任务队列是无界的,任务不会被拒绝。面试官:嗯,如果我把任务队列改成一个20的队列,我最多可以接收多少个请求?小叶:emm……应该是核心线程数+最大线程数+任务队列大小=50。面试官:今天的面试就这些了。通过小叶惨惨的面试不难发现,线程池是面试官在面试时经常问到的问题。线程池的几个简单的参数可能不是我们预期的结果。而我们在平时的编码过程中为了方便可能会直接调用Executors类提供的便捷方法,但是可能会存在一些需要我们考虑的问题。在Executors类提供的线程池方法中,所有的任务队列都是无界的。这可能会导致内存泄漏。而如果我们想采用其他的任务拒绝策略,又该如何实现呢?带着这些疑问,下面小张将带大家由浅入深地分析源码。整体结构从上图我们可以看出ThreadPoolExecutor的整体继承关系。这里可以看出,设计线程池的作者按照职责分离设计了两个接口,通过模板方法添加了钩子函数。ExecutorinterfacepublicinterfaceExecutor{//任务如何执行取决于子类是如何实现的关闭();//立即停止线程池,并返回未执行的任务ListshutdownNow();//是否停止状态booleanisShutdown();//是否终止booleanisTerminated();//等待线程池停止booleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException;//提交一个有返回结果的任务,返回一个代理对象Futuresubmit(Callabletask);//提交任务不返回,任务执行成功返回resultFuturesubmit(Runnabletask,Tresult);//提交任务不返回Future>submit(Runnabletask);//执行完任务集合中的所有任务,并返回结果List>invokeAll(Collection>tasks)throwsInterruptedException;//在上一个方法的基础上增加超时时间List>invokeAll(Collection>tasks,longtimeout,TimeUnitunit)throwsInterruptedException;//在任务中的任何任务执行后返回TinvokeAny(Collection>tasks)throwsInterruptedException,ExecutionException;//添加了一个超时TinvokeAny(Collection>tasks,longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}ExecutorService描述了一个线程池应该有关闭线程等功能池、提交任务和任务执行策略。接下来,我们将使用AbstractExecutorService抽象类,看看线程池是如何通过模板方法来减少代码的。从类的结构图中可以看出,抽象类实现了submit、invokeAll、invokeAny方法。submitpublicFuture>submit(Runnabletask){if(task==null)thrownewNullPointerException();}//newTaskFor这个方法这里就不多说了,可以理解为包装任务RunnableFutureftask=newTaskFor(task,null);//调用Executor类中的execute(ftask)方法;returnftask;}从源码可以看出,这个方法包装了task,直接调用了execute方法。invokeAllpublicList>invokeAll(Collection>tasks,longtimeout,TimeUnitunit)throwsInterruptedException{if(tasks==null)thrownewNullPointerException();}//计算超时longnanos=unit.toNanos(timeout);//集合ArrayList>futures=newArrayList>(tasks.size());布尔完成=假;try{for(Callablet:tasks)futures.add(newTaskFor(t));//计算任务执行期限finallongdeadline=System.nanoTime()+nanos;finalintsize=futures.size();//循环执行集中的任务for(inti=0;if=futures.get(我);//获取任务的返回值proxy,判断是否完成if(!f.isDone()){//如果已经到了deadline,返回future集合if(nanos<=0L)returnfutures;try{//阻塞并等待未来返回f.get(nanos,TimeUnit.NANOSECONDS);}catch(CancellationExceptionignore){}catch(ExecutionExceptionignore){}catch(TimeoutExceptiontoe){//如果超时则返回futurecollectionreturnfutures;}//更新剩余时间nanos=deadline-System.nanoTime();}}//代码运行到这里表示所有任务都已经执行完毕,并且设置了完成标志完成=真;回报期货;}finally{/**如果done为false,则取消所有正在进行的任务。只有在提交任务或者获取某个执行结果的过程中出现超时,done才会为false。**/if(!done)for(inti=0,size=futures.size();iTdoInvokeAny(Collection>tasks,booleantimed,longnanos)throwsInterruptedException,ExecutionException,TimeoutException{//空检查if(tasks==null)thrownewNullPointerException();intntasks=tasks.size();如果(ntasks==0)抛出新的IllegalArgumentException();ArrayList>futures=newArrayList>(ntasks);/**这个类不是本文的重点。具体源码在JUC包中分析阅读。这里将概述这个类提供了哪些功能这里把线程池实例传给这个类,目的是可以调用线程池的execute方法,不需要自己维护池。该类实现了submit提交任务和返回furure,内部维护了一个阻塞队列。当任务执行成功后,future对象会被放入阻塞队列。元素**/ExecutorCompletionServiceecs=newExecutorCompletionService(this);try{//记录异常ExecutionExceptionee=null;//如果启用超时,则计算超时时间finallongdeadline=timed?System.nanoTime()+nanos:0L;迭代器>it=tasks.iterator();//添加第一个任务futures.add(ecs.submit(it.next()));//任务数量-1--ntasks;积极主动=1;for(;;){//非阻塞访问元素Futuref=ecs.poll();if(f==null){//如果还有任务可以执行,继续提交任务,活跃数加1if(ntasks>0){--ntasks;futures.add(ecs.submit(it.next()));++活跃;}//当活跃数为0时退出循环,如果活跃数为0,future.get()抛出异常时会执行到这里elseif(active==0)break;elseif(timed){//如果开启超时,则等待获取响应队列中的元素,超过超时会报错。f=ecs.poll(nanos,TimeUnit.NANOSECONDS);如果(f==null)抛出新的TimeoutException();nanos=deadline-System.nanoTime();}else//如果没有开启超时,则会进入无限等待块以获取响应f=ecs.take();}//如果响应队列不为空,说明此时已经完成了一个任务if(f!=null){//活跃数-1--active;/**通过future获取响应结果,如果成功则直接返回,如果结果失败则记录异常,继续为本进程从响应队列中遍历下一次成功的执行结果**/try{返回f.get();}catch(ExecutionExceptioneex){ee=eex;}catch(RuntimeExceptionrex){ee=newExecutionException(rex);}}}//这里的Execution是指所有的任务都执行完了,所有的future.get都失败了,会抛出异常if(ee==null)ee=newExecutionException();扔ee;}finally{//最后,所有任务将被执行并取消for(inti=0,size=futures.size();i