当前位置: 首页 > 后端技术 > Java

ThreadPoolExecutor八种拒绝策略,是的,不是4种!

时间:2023-04-01 16:55:22 Java

送你以下java学习资料,文末有获取方法。api,大大简化了多线程代码的开发。不管你用的是FixedThreadPool还是CachedThreadPool,背后的实现都是ThreadPoolExecutor。ThreadPoolExecutor是典型的缓存池化设计的产物。因为pool是有大小的,当pool的体积不足以承载时,就涉及到拒绝策略。JDK中预设了4种线程池拒绝策略。下面结合场景详细说一下这些策略的使用场景,以及我们可以扩展哪些拒绝策略。水池设计思维水池设计不应该是一个新名词。我们常见的如Java线程池、JDBC连接池、Redis连接池等都是这类设计的代表实现。这种设计一开始会预设资源,要解决的问题是抵消每次资源的消耗,比如创建线程的开销,获取远程连接的开销等等。就像你去食堂做饭,做饭的阿姨会先放好几份饭菜,你来的时候拿着饭盒加菜就可以了,这样就不用了边填饭边做菜,效率高。池化设计除了初始化资源外,还包括以下特征:池的初始值、池的活跃值、池的最大值等,这些特征可以直接映射到Java线程池的成员属性上和数据库连接池。线程池触发拒绝策略的时机与数据源连接池不同。线程池除了池的初始大小和最大值外,还有一个额外的阻塞队列用于缓冲。当数据源连接池请求的连接数超过连接池的最大值时,会触发拒绝策略。策略一般是阻塞等待设定的时间或者直接抛出异常。线程池的触发时机如下图所示:如图所示,如果想知道线程池什么时候触发拒绝,需要明确以上三个参数的具体含义,分别是这三个参数整体协调的结果,而不是简单地超过最大线程数就会触发线程粗略地拒绝。当提交的任务数大于corePoolSize时,会优先放入队列缓冲区。只有当缓冲区被填满后,才会判断当前运行的任务是否大于maxPoolSize。如果较小,则会创建一个新线程进行处理。当大于时,触发拒绝策略。总结就是:当当前提交的任务数大于(maxPoolSize+queueCapacity)时,会触发线程池的拒绝策略。JDK内置了4种线程池拒绝策略。拒绝策略接口定义在分析JDK自带的线程池拒绝策略之前,先看一下JDK定义的拒绝策略接口,如下:1publicinterfaceRejectedExecutionHandler{2voidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor);3}接口定义非常明确。当触发拒绝策略时,线程池会调用你设置的具体策略,将当前提交的任务和线程池实例本身传递给你处理。在不同的情况下,特定处理将有所不同。Runnabler,ThreadPoolExecutore){6if(!e.isShutdown()){7r.run();8}9}10}功能:当触发拒绝策略时,只要线程池没有关闭,就会由当前提交任务的线程处理。使用场景:一般用于不允许失败,性能要求不高,并发量小的场景,因为一般情况下线程池是不会关闭的,即提交的任务肯定会运行,但是因为是调用者线程自己执行的。当多次提交任务时,后续任务的执行就会被阻塞,性能和效率自然会变慢。(a(Apropolicy)1(a(1)公共静态类Apropolicy工具拒绝ExecutionHandler{23Publicabortpolicy(){}45公共voidrecultedExecution(runnabler,threadpoolexecutore,threadpoolexecutore)“被拒绝”+8这是一个特殊的情况,但是有一件事是正确处理抛出的例外。threadpoolexecutor中的默认策略是abortpolicy,而threadpoolexecutor系列executorServiceInterfacesinterfacesinterfacesinterfacesinterfacesinterfacesinsterfaceseriesseriveserivenotedisplay拒绝策略则是这样。.但是请注意,ExecutorService中的线程池实例队列是无界的,也就是说内存爆了不会触发拒绝策略,自己自定义线程池实例时,必须处理抛出的异常使用该策略时会触发该策略,因为它会中断当前的执行过程。DiscardPolicy(丢弃策略)1????public?static?class?DiscardPolicy?implements?RejectedExecutionHandler?{23????????public?DiscardPolicy()?{?}45????????public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{6????????}7????}功能:直接静悄悄的丢弃这个任务,不触发任何动作Usecase:Youcanuseitifthetaskyousubmitisirrelevant.Becauseit'sjustanemptyimplementation,itwillsilentlydevouryourtasks.所以这个策略基本上不用了DiscardOldestPolicy(弃老策略)1????public?static?class?DiscardOldestPolicy?implements?RejectedExecutionHandler?{23????????public?DiscardOldestPolicy()?{?}45????????public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{6????????????if?(!e.isShutdown()){7e.getQueue().poll();8e.execute(r);9}10}11}Function:Ifthethreadpoolisnotclosed,poptheelementattheheadofthequeue,andthentrytoexecutetheusagescenario:Thisstrategywillstilldiscardtasks,anditwillbesilentwhendiscarding,butthecharacteristicisthatthediscardedtasksareoldandunexecutedtasks,andtheyaretaskswithhigherprioritytobeexecuted.Basedonthisfeature,thescenarioIcanthinkofispublishingamessageandmodifyingamessage.Whenthemessageispublished,ithasnotyetbeenexecuted.Atthistime,theupdatedmessagecomesagain.Atthistime,theversionoftheunexecutedmessageishigherthanthecurrentsubmittedmessageIftheversionislower,itcanbediscarded.因为队列中可能有消息版本较低的消息会被排队等待执行,所以在实际处理消息时,需要将消息的版本与第三方实现的拒绝策略进行比较。Dubbo1publicclassAbortPolicyWithReport中线程拒绝策略扩展ThreadPoolExecutor。AbortPolicy{23protectedstaticfinalLoggerlogger=LoggerFactory.getLogger(AbortPolicyWithReport.class);45privatefinalStringthreadName;67私人最终URL网址;89privatestaticvolatilelonglastPrintTime=0;1011privatestatic信号量守卫=newSemaphore(1);1213publicAbortPolicyWithReport(StringthreadName,URLurl){14this.threadName=threadName;151urlurl;151url@Override19publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){20Stringmsg=String.format("线程池已用尽!"+21"线程名称:%s,池大小:%d(活动:%d,核心:%d,最大值:%d,最大:%d),任务:%d(已完成:%d),“+22”执行器状态:(isShutdown:%s,isTerminated:%s,isTerminating:%s),在%s://%s:%d!",23threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),24e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating(),25url.getProtocol(),url.getIp(),url.getPort());26logger.warn(msg);27dumpJStack();28thrownewRejectedExecutionException(msg);29}3031privatevoiddumpJStack(){32//实现省略原理是让用户知道触发线程拒绝策略的真正原因并输出警告级别的日志,日志的内容是线程池的详细设置参数,线程池当前的状态,当前拒绝的任务,一些详细的信息,可以说这个日志看多了有过生产运维经验的人在使用dubbo的时候还是比较少的,这个日志简单来说就是一个日志打印的模型,其他的日志打印模型还有spring。得益于如此详细的日志,很容易定位问题并输出当前线程栈的详细信息。这非常有用。当你通过上面的日志信息无法定位问题时,案发现场的dump线程上下文信息就是你找到的。问题的救命稻草,这个可以参考《dubbo线程池耗尽事件-"CyclicBarrier惹的祸"》继续抛出拒绝异常,让这个任务失败,这个继承了JDK默认拒绝策略ThreadpoolrejectionpolicyinNetty1privatestaticfinal的特性类NewThreadRunsPolicy实现RejectedExecutionHandler{2NewThreadRunsPolicy(){3super();4}56publicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){7尝试{8finalThreadt=newThread(r,"临时任务执行器");9t.start();10}Catch(ThrowableE){11ThrowNewRejeCTExEcutionException(12"FAILEDtoStartaNewThread",E);13}14}15}JDK中jdk的实现是勉强的。不同之处在于,CallerRunsPolicy是直接在调用者线程上执行的任务。而Netty会创建一个新的线程来处理它。因此,Netty的实现相比于调用者执行策略的使用,可以扩展到支持高效率、高性能的场景。但是也要注意,在Netty的实现中,创建线程的时候没有做判断约束,也就是说只要系统还有资源,就会创建新的线程来处理,直到new不产生新的线程为止。1个新拒绝ExeCutionHandler(){2@Override3公共voidrecupledExecution(最终运行r,最终threadpoolexecutorexecutor){4尝试{5executor.getQueue(r,r,60)。,TimeUnit.SECONDS);6}catch(InterruptedExceptione){7抛出新的RejectedExecutionException(“中断等待BrokerService.worker”);8}910抛出新的RejectedExecutionException(“尝试排队任务时超时。”);11}12});activeMq中的策略属于best-effort任务执行类型。当触发拒绝策略时,任务将在一分钟内重新塞入任务队列。当一分钟超时失败时,将抛出异常PinPoint1publicclassRejectedExecutionHandlerChainimplementsRejectedExecutionHandler{2privatefinalRejectedExecutionHandler[]handlerChain;中的线程池拒绝策略;34公共静态拒绝ectedExecutionHandlerbuild(Listchain){5Objects.requireNonNull(chain,“handlerChain不得为null”);6RejectedExecutionHandler[]handlerChain=chain.toArray(newRejectedExecutionHandler[0]);7返回newRejectedExecutionHandlerChain(handlerChain);8????}910????private?RejectedExecutionHandlerChain(RejectedExecutionHandler[]?handlerChain)?{11????????this.handlerChain?=?Objects.requireNonNull(handlerChain,?"handlerChain?must?not?be?null");12????}1314????@Override15????public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor)?{16for(RejectedExecutionHandlerrejectedExecutionHandler:handlerChain){17rejectedExecutionHandler.rejectedExecution(r,executor);18}19}20}Pinpoint的拒绝策略实现很有特点,它定义了拒绝策略链,这与其他实现不同。拒绝策略列表。当一个拒绝策略被触发时,策略链中的rejectedExecution会被一一执行。结语上一篇文章从线程池的设计思想和线程池触发拒绝策略的时机引出了java线程池拒绝策略接口的定义。辅以四种内置JDK和四种第三方开源软件的拒绝策略定义,描述了线程池拒绝策略实现的各种思路和使用场景。希望阅读本文后,您对java线程池拒绝策略有更深入的了解,能够根据不同的使用场景更加灵活的应用。