线程池是日常开发中常用的技术,使用起来非常简单,但是要用好线程池却不是一件容易的事。开发者需要不断探索底层实现原理,才能在不同的场景下选择合适的策略,最大限度发挥线程池的作用,避免踩坑。1、线程池工作流程下面是Java线程池的工作流程,涉及创建线程的参数和拒绝策略。如果读者对这部分了解不多,可以参考其他文档,本文不再赘述。二、高级线程池1、线程池的创建需要通过ThreadPoolExecutor手动创建。用户要非常清楚业务场景,自定义线程池,避免误操作可能导致的问题。以下是阿里巴巴Java开发手册中的描述:ThreadFactory:建议在Guava中使用ThreadFactoryBuilder创建:newThreadFactoryBuilder().setNameFormat("name-%d").build();2.线程池中阻塞队列的使用很多同学看到阻塞队列,自然会想到入口队列和出口队列都是阻塞的。使用的阻塞队列不需要关心拒绝策略。事实上,阻塞队列在任务提交和任务获取阶段采用了不同的策略。任务提交阶段:调用阻塞队列的offer方法。此方法是非阻塞的。如果插入队列失败,则直接返回false,触发拒绝策略;任务获取阶段:使用take方法,是阻塞的;3、保证提交阶段任务不丢失的三种方式:使用CallerRunsPolicy拒绝策略,自定义拒绝策略,使用MQ系统保证任务不丢失。(1)CallerRunsPolicy拒绝策略ThreadPoolExecutor.CallerRunsPolicy:这是提交任务的线程要处理的最简单的策略,但需要注意的是,如果任务耗时较长,会阻塞提交任务的线程,这可能成为系统瓶颈。(2)自定义拒绝策略由于Java线程默认使用offer提交任务,我们可以自定义拒绝策略,在任务提交失败时将其改为put阻塞提交。缺点是会阻塞提交线程,但是和CallerRunsPolicy策略相比,可以发挥多线程的优势。RejectedExecutionHandlerexecutionHandler=(r,executor)->{try{executor.getQueue().put(r);}catch(InterruptedExceptione){Thread.currentThread().interrupt();thrownewRejectedExecutionException("Producerthreadinterrupted",e);}};(3)配合MQ使用默认的ThreadPoolExecutor.AbortPolicy策略,保证任务不丢失。如果抛出RejectedExecutionException,则返回MQ消费失败,MQ保证自动重试。4.确保队列和未完成的任务不丢失。当服务停止时,队列中未完成的任务和线程池中的活跃线程可能会导致数据丢失。首先我们得出一个结论:无论采用什么策略,在Java层都不能100%保证不会丢失,比如机器突然断电。我们还是可以采取一定的措施,尽量避免任务丢失。(1)线程池关闭线程池关闭有两种方法:shutdownNow方法:线程池拒绝接受新提交的任务,同时立即关闭线程池,线程池中的任务不再执行,并抛出InterruptedException。关闭方法:线程池拒绝接受新提交的任务,等待线程池中的任务执行完再关闭线程池。(2)注册关闭钩子使用如下方法注册JVM进程关闭钩子,在钩子方法中执行线程池关闭、未处理和已完成任务的持久化存储等。runtime.getRuntime().addShutdownHook()需要注意的是,使用kill-9杀死进程时不会执行hook方法。杀死进程的一般方法是先执行kill,等待一段时间。如果进程还没有被杀死,再执行kill-9。为保证队列中的任务不丢失,需要消费队列中的数据并发送到外部MQ;为保证未完成的任务不丢失,需要保证抛出InterruptedException后任务参数传给MQ;需要注意的是:1)尽量不要将未完成的任务保存到本地磁盘,尤其是在频繁扩缩容的弹性集群中;2)捕捉到InterruptedException异常后,不要进行重试等耗时操作;3)所有需要监控的任务发送给MQ调整kill-9强制执行前的等待时间。(3)使用MQ保证任务必须执行。通过上面介绍的两种方法,可以处理大部分正常停止服务和丢失数据的任务。但是在极端情况下,比如停电、断网等,严格保证任务不丢失的场景仍然无法满足业务需求。这种情况下,就需要依赖MQ了。解决方案是使用线程池的submit方法提交任务,通过future获取任务执行完成,再返回给MQ消费完成。MQ中如何保证数据不丢失是另外一个比较复杂的话题,这里不做深入讨论。需要注意的是,如果采用这种方案,需要保证处理任务的幂等性,当操作步骤较多时,复杂度较高。5.ThreadLocal变量ThreadLocal中的变量作用域是当前线程。使用线程池后,由于跨线程,无法传输数据。如果业务中使用了ThreadLocal,这个场景还需要额外处理。(1)InheritableThreadLocalInheritableThreadLocal是在父子线程中自动传递参数,在线程池场景中不适用。(2)在提交任务前手动处理ThreadLocal中的值,在线程池执行时将其设置为线程池中线程的ThreadLocal,并在finally中清理数据。缺点是每个线程池都要重新处理。如果您不熟悉上下文,则存在泄漏的风险。(3)TransmittableThreadLocal阿里开源地址:TransmittableThreadLocal的原理是通过javaagent自动处理ThreadLocal跨线程池参数,业务开发者感知不到,也是推荐的方案。6、异常处理(1)异常感知execute方法:抛出异常会被提交任务线程感知;submit方法:抛出异常不会被提交任务线程感知到,但是在执行Future.get()时会感知到;(2)统一方案一:将catch统一在线程池执行逻辑最外层的异步任务中,将try和catch包裹起来,处理所有的异常。缺点是:1)不同的任务都需要trycatch,增加了代码量。2)没有checkedexception的地方,需要trycatched,代码难看。(3)统一处理方案二:重写统一异常处理方法该方案常见的实现有两种:1)自定义线程池,继承ThreadPoolExecutor,重写其afterExecute方法;2)创建线程池时自定义ThreadFactory,在实现中手动创建线程池并调用Thread.setUncaughtExceptionHandler注册统一的异常处理器。(4)统一方案3:Future任务使用submit提交,在Future.get()期间捕获所有异常。3.小结本文从创建线程池、队列注意事项、如何保证任务不丢失、ThreadLocal、异常等方面总结了作者的一些思考,读者可以对比一下自己的使用场景,看看是不是都提到了问题在这篇文章中被认为。就到这里了,或者大家有什么使用线程池的经验,欢迎交流分享。本文链接:Java线程池进阶作者简介:穆晓峰,美团Java技术专家,专注于分享软件开发实践和架构思维。欢迎关注公众号:Java研发更多精彩文章:从MVC到DDD的架构演进平台构建思路浅谈构建回滚应用和在线checklist实践Maven依赖冲突排查经验
