大家好,本文将介绍动态线程池框架(DynamicTp)的适配模块,在之前也有介绍文章,该模块主要用于适配部分第三方组件的线程池管理,使第三方组件内置的线程池也能享受到动态参数调整和监控告警等增强功能。DynamicTp项目地址目前有500多颗star,感谢大家的star,欢迎pr,业务后贡献开源gitee地址:https://gitee.com/yanhom/dynamic-tpgithub地址:https://github.com/lyh200/dynamic-tp系列文章美团动态线程池实践思路,开源动态线程池框架(DynamicTp),监控与源码分析篇适配器已连接组件适配器模块现已连接内置的三个WebServerSpringBoot(Tomcat、Jetty、Undertow的线程池管理)在实现层面也与核心模块解耦,利用spring的事件机制进行通知监听处理。可以看到,当两个监听器在配置中心监听配置变化时,我们项目内部线程池更新后会发出一个RefreshEvent事件。DtpWebRefreshListener监听事件后会更新对应的WebServer线程池参数。监控告警也是如此。当DtpMonitor中执行监控任务时,会释放一个CollectEvent事件。DtpWebCollectListener监听事件后,会收集对应WebServer的线程池指标数据。如果要管理第三方组件的线程池,首先要对这些组件有一定的了解,了解整个请求的一个处理过程,找到处理请求对应的线程池。这些线程池不一定是JUC包下的ThreadPoolExecutor。类,或者说是组件自己实现的线程池,但是基本原理是类似的。Tomcat、Jetty、Undertow都是这样。他们没有直接使用JUC提供的线程池,而是自己实现了一套,或者扩展了JUC的实现;翻遍源码找到对应的线程池后,再看看有没有暴露出来的公共方法供我们调用获取?如果没有,我们需要考虑通过反射来获取。Tomcat内部线程池的实现Tomcat内部线程池并没有直接使用JUC下的ThreadPoolExecutor,而是选择继承JUC下的Executor系统类,然后重写exe??cute()等方法。不同版本有差异。1.继承JUC的原生ThreadPoolExecutor(9.0.50及以下版本),并覆盖一些方法,主要是execute()和afterExecute()2.继承JUC的AbstractExecutorService(9.0.51及以上版本),代码基本是JUC的翻版Tomcat的ThreadPoolExecutor也相应地微调了execute()方法。注意Tomcat实现的线程池类名也叫ThreadPoolExecutor,和JUC下的一样。Tomcat的ThreadPoolExecutor类的execute()方法如下:publicvoidexecute(Runnablecommand,longtimeout,TimeUnitunit){submittedCount.incrementAndGet();尝试{super.execute(command);}catch(RejectedExecutionExceptionrx){if(super.getQueue()instanceofTaskQueue){finalTaskQueuequeue=(TaskQueue)super.getQueue();try{if(!queue.force(command,timeout,unit)){submittedCount.decrementAndGet();抛出新的RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));}}catch(InterruptedExceptionx){submittedCount.decrementAndGet();抛出新的RejectedExecutionException(x);}}else{submittedCount.decrementAndGet();抛出RX;}}}可以看出,他首先调用了父类的execute()方法,然后捕获了RejectedExecutionException异常,然后判断任务队列类型是否为TaskQueue,尝试将任务添加到任务队列中。如果添加失败,则证明队列已满,然后执行拒绝策略。这里submittedCount是一个原子变量。记录被提交到这个线程池但是还没有被执行。任务数(主要用在下面提到的TaskQueue队列的offer()方法中),为什么要这样设计呢?继续阅读!Tomcat定义了一个阻塞队列TaskQueue继承自LinkedBlockingQueue,主要重写了offer()方法@Overridepublicbooleanoffer(Runnableo){//如果(parent==null)returnsuper.offer(o),我们不能做任何检查;//我们已经用完了线程,只需将对象排队if(parent.getPoolSize()==parent.getMaximumPoolSize())returnsuper.offer(o);//我们有空闲线程,只需将其添加到队列中即可if(parent.getSubmittedCount()<=(parent.getPoolSize()))returnsuper.offer(o);//如果我们的线程少于创建新线程的最大强制if(parent.getPoolSize()
