前言说到Java线程池,大家最熟悉的就是ExecutorService接口了。jdk1.5新加入的java.util.concurrent包下的这个API大大简化了多线程代码的开发。不管你用的是FixedThreadPool还是CachedThreadPool,背后的实现都是ThreadPoolExecutor。ThreadPoolExecutor是典型的缓存池化设计的产物。因为pool是有大小的,当pool的体积不足以承载时,就涉及到拒绝策略。JDK中预设了四种线程池拒绝策略。下面结合场景详细说一下这些策略的使用场景,以及我们可以扩展哪些拒绝策略。水池设计思维水池设计不应该是一个新名词。我们常见的如java线程池、jdbc连接池、redis连接池等都是这类设计的代表实现。这种设计一开始会预设资源,要解决的问题是抵消每次资源的消耗,比如创建线程的开销,获取远程连接的开销等等。就像你去食堂做饭,做饭的阿姨会先放好几份饭菜,你来的时候拿着饭盒加菜就可以了,这样就不用了边填饭边做菜,效率高。池化设计除了初始化资源外,还包括以下特征:池的初始值、池的活跃值、池的最大值等,这些特征可以直接映射到java线程池的成员属性上和数据库连接池。线程池触发拒绝策略的时机与数据源连接池不同。线程池除了池的初始大小和最大值外,还有一个额外的阻塞队列用于缓冲。当数据源连接池请求的连接数超过连接池的最大值时,会触发拒绝策略。策略一般是阻塞等待设定的时间或者直接抛出异常。如图所示,如果想知道线程池什么时候触发粗略拒绝,需要弄清楚上面三个参数的具体含义。是这三个参数整体协调的结果,而不是简单的超过最大线程数就会触发粗略的线程拒绝。当提交的任务数大于corePoolSize时,会优先放入队列缓冲区。只有当缓冲区被填满后,才会判断当前运行的任务是否大于maxPoolSize。如果小于maxPoolSize,则会创建一个新的线程进行处理。当大于时,触发拒绝策略。总结就是:当当前提交的任务数大于(maxPoolSize+queueCapacity)时,会触发线程池的拒绝策略。JDK内置4种线程池拒绝策略拒绝策略接口定义在分析JDK自带的线程池拒绝策略之前,先来看一下JDK定义的拒绝策略接口,如下:}接口定义很明确,当触发拒绝策略时,线程池会调用你设置的具体策略,将当前提交的任务和线程池实例本身传递给你处理。如何处理,不同的场景会有不同的考虑。下面我们看下JDK我们内置了哪些实现:CallerRunsPolicy(调用者运行策略)publicstaticclassCallerRunsPolicyimplementsRejectedExecutionHandler{publicCallerRunsPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){r.run();}}}功能:当触发拒绝策略时,只要线程池没有关闭,就会由当前提交任务的线程处理。使用场景:一般用于不允许失败,性能要求不高,并发量小的场景,因为一般情况下线程池是不会关闭的,即提交的任务肯定会运行,但是因为是调用者线程自己执行的。当多次提交任务时,后续任务的执行就会被阻塞,性能和效率自然会变慢。AbortPolicy(中止策略)publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{publicAbortPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){thrownewRejectedExecutionException("Task"+r.toString()+"rejectedfrom"+e.toString():当拒绝策略函数时);}},直接抛出拒绝执行的异常。abort策略是指打断当前的执行过程。使用场景:这个没有特殊的场景,但是正确处理抛出的异常是一回事。ThreadPoolExecutor中的默认策略是AbortPolicy。ThreadPoolExecutor系列的ExecutorService接口没有显示拒绝策略,所以默认就是这个。但是需要注意的是,ExecutorService中的线程池实例队列是无界的,也就是说内存爆了不会触发拒绝策略。当你自己自定义线程池实例时,在使用该策略时必须处理策略触发时抛出的异常,因为它会打断当前的执行过程。DiscardPolicy(丢弃策略)publicstaticclassDiscardPolicyimplementsRejectedExecutionHandler{publicDiscardPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){}}功能:直接安静地丢弃这个任务,不触发任何动作使用场景:如果你提交的任务无关紧要,你可以使用它。因为它只是一个空的实现,它会默默地吞噬你的任务。所以这个policy基本不用DiscardOldestPolicy(放弃旧的policy)e.exe(r);}}}功能:如果线程池没有关闭,弹出队头元素,然后尝试执行使用场景:该策略仍然会丢弃任务,它discarding时会静默,但特点是discarded是旧的未执行的任务,是优先级高的待执行任务。基于这个特性,我能想到的场景就是发布消息和修改消息。消息发布时,尚未执行。这时,更新的消息又来了。此时未执行消息的版本高于当前提交的消息,如果版本较低,则可以丢弃。因为队列中可能会有消息版本较低的消息会被排队执行,所以在实际处理消息的时候需要做好消息版本的比较。第三方实现的拒绝策略dubbo中的线程拒绝策略publicclassAbortPolicyWithReportextendsThreadPoolExecutor.AbortPolicy{protectedstaticfinalLoggerlogger=LoggerFactory.getLogger(AbortPolicyWithReport.class);privatefinalStringthreadName;privatefinalURLurl;privatestaticvolatilelonglastPrintTime=0;privatestaticSemaphoreguard=newSemaphore(1);publicAbortPolicyWithReport(StringthreadName,URLurl){this.threadName=threadName;this.url=url;}@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){Stringmsg=String.format("ThreadpoolisEXHAUSTED!"+"ThreadName:%s,PoolSize:%d(active:%d,core:%d,max:%d,largest:%d),Task:%d(completed:%d),"+"Executorstatus:(isShutdown:%s,isTerminated:%s,isTerminating:%s),in%s://%s:%d!",threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(),e.getCompletedTaskCount()、e.isShutdown()、e.isTerminated()、e.isTerminating()、url.getProtocol()、url.getIp()、url。getPort());logger.warn(msg);dumpJStack();thrownewRejectedExecutionException(msg);}privatevoiddumpJStack(){//省略实现}}可以看到dubbo工作线程在触发线程拒绝时,主要是做三件事,原理是尽量让用户知道触发线程拒绝策略的真正原因1)输出警告级别的日志,日志内容是线程池的详细设置参数,线程池当前状态,以及当前拒绝任务的一些细节。可以说,这个日志,用过dubbo,有过生产运维经验的人,或多或少都看过。这个日志简单来说就是一个日志打印的模型,其他的日志打印模型还有spring。得益于如此详细的日志,很容易定位问题。2)输出当前线程栈的详细信息,很有用。当您无法通过上述日志信息定位问题时,案发现场的转储线程上下文信息就是您查找问题的救命稻草。3)不断抛出拒绝异常,使本次任务失败。该特性继承了JDK默认的拒绝策略。延伸阅读:Dubbo面试18道题,你能搞定吗?Netty中的线程池拒绝策略"Failedtostartanewthread",e);}}}Netty中的实现和JDK中的CallerRunsPolicy很相似,都是舍不得丢弃任务。不同的是CallerRunsPolicy是直接在调用者线程上执行的任务。而Netty创建了一个newthread来处理,因此相对于调用者执行策略的使用,Netty的实现可以扩展到支持高效、高性能的场景。但是也要注意,在Netty的实现中,创建线程的时候是没有判断约束的,也就是说,只要系统还有资源,就会创建新的线程来处理,直到new不产生新的线程。抛出创建线程失败的异常。推荐:什么是Netty?activeMq中的线程池拒绝策略newRejectedExecutionHandler(){@OverridepublicvoidrejectedExecution(finalRunnabler,finalThreadPoolExecutorexecutor){try{executor.getQueue().offer(r,60,TimeUnit.SECONDS);}catch(InterruptedExceptione){thrownewRejectedExecutionExecutionException("for"");}thrownewRejectedExecutionException("TimedOutwhileattemptedtoenqueueTask.");}});activeMq中的策略属于尽力而为执行任务类型,当触发拒绝策略时,任务会被重新标记到任务队列中一分钟,一分钟,一分钟超时还没成功成功成功;RejectedExecutionHandler[]handlerChain=chain.toArray(newRejectedExecutionHandler[0]);returnnewRejectedExecutionHandlerChain(handlerChain);}privateRejectedExecutionHandlerChain(RejectedEXecutionHandler[]HandlerChain){this.handlerChain=Objects.requirenonNull(HandlerChain,“HandlerChainMustNotBenull”);}@overridePubliCementPublReexteDexecution(runnabler,runnabler,runnabler,runnnabler,rundereextriiteexectectereecterexecterexterlerexterryextrelyexterryexterryextryreyre;拒绝策略的实现很有特点,与其他实现不同。它定义了一个拒绝策略链并包装了一个拒绝策略列表。当触发拒绝策略时,会依次执行策略链中的rejectedExecution。结语上一篇文章从线程池的设计思想和线程池触发拒绝策略的时机引出了java线程池拒绝策略接口的定义。辅以四种内置JDK和四种第三方开源软件的拒绝策略定义,描述了线程池拒绝策略实现的各种思路和使用场景。希望阅读本文后,您对java线程池拒绝策略有更深入的了解,能够根据不同的使用场景更加灵活的应用。
