前言说到java线程池,大家最熟悉的就是ExecutorService接口了。jdk1.5新增的java.util.concurrent包下的API大大简化了多线程代码的开发。不管你用的是FixedThreadPool还是CachedThreadPool,背后的实现都是ThreadPoolExecutor。ThreadPoolExecutor是典型的缓存池化设计的产物。因为pool是有大小的,当pool的体积不足以承载时,就涉及到拒绝策略。JDK中预设了四种线程池拒绝策略。下面结合场景详细说一下这些策略的使用场景,以及我们可以扩展哪些拒绝策略。水池设计思维水池设计不应该是一个新名词。我们常见的如java线程池、jdbc连接池、redis连接池等都是这类设计的代表实现。这种设计一开始会预设资源,要解决的问题是抵消每次资源的消耗,比如创建线程的开销,获取远程连接的开销等等。就像你去食堂做饭,做饭的阿姨会先放好几份饭菜,你来的时候拿着饭盒加菜就可以了,这样就不用了边填饭边做菜,效率高。池化设计除了初始化资源外,还包括以下特征:池的初始值、池的活跃值、池的最大值等,这些特征可以直接映射到java线程池的成员属性上和数据库连接池。线程池触发拒绝策略的时机与数据源连接池不同。线程池除了池的初始大小和最大值外,还有一个额外的阻塞队列用于缓冲。当数据源连接池请求的连接数超过连接池的最大值时,会触发拒绝策略。策略一般是阻塞等待设定的时间或者直接抛出异常。线程池的触发时机如下图所示:如图所示,如果想知道线程池什么时候触发拒绝,需要明确以上三个参数的具体含义,分别是这三个参数整体协调的结果,而不是简单地超过最大线程数就会触发线程粗略地拒绝。当提交的任务数大于corePoolSize时,会优先放入队列缓冲区。只有当缓冲区被填满后,才会判断当前运行的任务是否大于maxPoolSize,小于则创建新的线程进行处理。当大于时,触发拒绝策略。总结就是:当当前提交的任务数大于(maxPoolSize+queueCapacity)时,会触发线程池的拒绝策略。JDK内置4种线程池拒绝策略拒绝策略接口定义在分析JDK自带的线程池拒绝策略之前,先来看一下JDK定义的拒绝策略接口,如下:}接口定义很明确,当触发拒绝策略时,线程池会调用你设置的具体策略,将当前提交的任务和线程池实例本身传递给你处理。如何处理,不同的场景会有不同的考虑。下面我们看下JDK我们内置了哪些实现:CallerRunsPolicy(调用者运行策略)publicstaticclassCallerRunsPolicyimplementsRejectedExecutionHandler{publicCallerRunsPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){r.run();}}}功能:当触发拒绝策略时,只要线程池没有关闭,就会由当前提交任务的线程处理。使用场景:一般用于不允许失败,性能要求不高,并发量小的场景,因为一般情况下线程池是不会关闭的,即提交的任务肯定会运行,但是因为是调用者线程自己执行的。当多次提交任务时,后续任务的执行就会被阻塞,性能和效率自然会变慢。AbortPolicy(中止策略)publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{publicAbortPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){thrownewRejectedExecutionException("Task"+r.toString()+"rejectedfrom"+e.toString());}},直接抛出异常拒绝执行。abort策略是指打断当前的执行过程。使用场景:这个没有特殊的场景,但是正确处理抛出的异常是一回事。ThreadPoolExecutor中的默认策略是AbortPolicy。ThreadPoolExecutor系列的ExecutorService接口没有显示拒绝策略,所以默认就是这个。但是需要注意的是,ExecutorService中的线程池实例队列是无界的,也就是说内存爆了不会触发拒绝策略。当你自己自定义线程池实例时,在使用该策略时必须处理策略触发时抛出的异常,因为它会打断当前的执行过程。DiscardPolicy(丢弃策略)publicstaticclassDiscardPolicyimplementsRejectedExecutionHandler{publicDiscardPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){}}功能:直接静默丢弃这个任务,不触发任何动作使用场景:如果你提交的任务无关紧要,你可以使用它。因为它只是一个空的实现,它会默默地吞噬你的任务。所以这个policy基本不用DiscardOldestPolicy(放弃旧的policy)e.exe(r);}}}功能:如果线程池没有关闭,弹出队头元素,然后尝试执行使用场景:该策略仍然会丢弃任务,它discarding时会静默,但特点是discarded是旧的未执行的任务,是优先级高的待执行任务。基于这个特性,我能想到的场景就是发布消息和修改消息。消息发布时,尚未执行。这时,更新的消息又来了。此时未执行消息的版本高于当前提交的消息,如果版本较低,则可以丢弃。因为队列中可能有消息版本较低的消息会被排队执行,所以在实际处理消息的时候需要比较消息的版本。第三方实现的拒绝策略dubbo中的线程拒绝策略publicclassAbortPolicyWithReporttextendsThreadPoolExecutor.AbortPolicy{protectedstaticfinalLoggerlogger=LoggerFactory.getLogger(AbortPolicyWithReport.class);privatefinalStringthreadName;privatefinalURLurl;privatestaticvolatilelonglastPrintTime=0;privatestaticSemaphoreguard=newSemaphorePublicSemaphore(1);{this.threadName=threadName;this.url=url;}@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){Stringmsg=String.format("ThreadpoolisEXHAUSTED!"+"ThreadName:%s,PoolSize:%d(active:%d,core:%d,max:%d,largest:%d),Task:%d(completed:%d),"+"Executorstatus:(isShutdown:%s,isTerminated:%s,isTerminating:%s),in%s://%s:%d!",threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(),e.getCompletedTaskCount()、e.isShutdown()、e.isTerminated()、e.isTerminating(),url.getProtocol(),url.getIp(),url.getPort());logger.warn(msg);dumpJStack();thrownewRejectedExecutionException(msg);}privatevoiddumpJStack(){//省略实现}}可以看出,dubboworker线程在触发线程拒绝时,主要做了三件事。原理是尽量让用户知道触发线程拒绝策略的真正原因,并输出警告级别的日志。日志内容是线程池。详细设置参数,还有当前线程池的状态,以及当前拒绝任务的一些详细信息。可以说,这个日志,用过dubbo,有过生产运维经验的人,或多或少都看过。这个日志简直就是日志打印的一个模型,其他的日志打印模型还有spring。得益于如此详细的日志,很容易定位问题并输出当前线程栈的详细信息。这非常有用。当你通过上面的日志信息无法定位问题时,案发现场的dump线程上下文信息就是你找到的。问题的救星。继续抛出异常拒绝执行,使得这个任务失败。这继承了JDK默认拒绝策略的特点。Netty中的线程池拒绝策略thrownewRejectedExecutionException("Failedtostartanewthread",e);}}}Netty中的实现很像JDK中的CallerRunsPolicy,就是舍不得丢弃任务。不同之处在于,CallerRunsPolicy是直接在调用者线程上执行的任务。而Netty会创建一个新的线程来处理它。因此,Netty的实现相比于调用者执行策略的使用,可以扩展到支持高效率、高性能的场景。但是也要注意,在Netty的实现中,创建线程的时候是没有判断约束的,也就是说,只要系统还有资源,就会创建新的线程来处理,直到new不产生新的线程。抛出创建线程失败的异常。activeMq中的线程池拒绝策略newRejectedExecutionHandler(){@OverridepublicvoidrejectedExecution(finalRunnabler,finalThreadPoolExecutorexecutor){try{executor.getQueue().offer(r,60,TimeUnit.SECONDS);}catch(InterruptedExceptione){thrownewRejectedExecutionException("InterruptedwaitingforBrokerService.worker");}thrownewRejectedExecutionException("TimedOutwhileattempttingtoenqueueTask.");}});activeMq中的策略属于best-effort执行任务类型。当触发拒绝策略时,尝试在一分钟内重新安装任务,当,当分钟还没成功时成功成功成功,就
