当前位置: 首页 > 后端技术 > Java

线程池相关的一些知识

时间:2023-04-01 14:34:58 Java

基本上所有的程序员都直接或间接的使用过线程池。这是一把双刃剑。如果用得好,会事半功倍。如果使用不好,可能会直接毁掉服务器。所以我们需要对线程池有所了解,才能更好的使用它。当然主要是基于Java和SpringBoot。1线程池1.1为什么要使用线程池线程的创建和销毁是很耗时的,使用池化技术可以减少这些消耗。线程池一般维护几个核心线程等待任务执行,可以提高响应速度。同时,使用线程池也可以更方便的管理线程,比如控制线程数,监控执行状态。1.2如何创建线程池在Java的juc包中,提供了Executors类。一般情况下,为了使用方便,可以直接使用该类的静态方法来创建。在这些静态方法中,都是基于ThreadPoolExecutor类构建的。让我们来看看它们中的每一个。1.2.1ThreadPoolExecutor是最重要的线程池,我们需要非常熟悉它的参数和整体流程。关于源码分析,我们后面会单独用一篇博客来说明。我们来看一个参数最全的构造方法。publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler)参数说明:corePoolSize:核心线程数。即使空闲也保留在线程池中的线程数。除非设置了allowCoreThreadTimeOut,否则不会减少。您可以调用prestartCoreThread和prestartAllCoreThreads来预创建一个或所有核心线程。maximumPoolSize:最大线程数。线程池中允许的最大线程数。keepAliveTime,单位:keepAliveTime。当线程数大于核心线程数时,如果线程空闲时间大于这个值,就会被终止。workQueue:工作队列。阻塞队列,存放尚未执行的任务。ArrayBlockingQueue:基于数组的有界阻塞队列。LinkedBlockingQueue:基于链表的阻塞队列,吞吐量通常高于ArrayBlockingQueue。SynchronousQueue:不存储元素的阻塞队列,每次插入都必须等待另一个线程调用移除操作PriorityBlockingQueue:具有优先级的无限阻塞队列。threadFactory:线程工厂。用于生成线程。主要设置的是线程名称前缀,用于区分。该块一般使用Google提供的guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder构建。处理程序:拒绝策略。当当前线程池达到最大处理能力时,执行拒绝策略。这实际上是标准的策略模式。CallerRunsPolicy:当前exector未关闭时,被拒绝的任务由调用线程执行。AbortPolicy:拒绝执行当前任务并抛出RejectedExecutionException。DiscardPolicy:什么都不做,直接丢弃当前任务。DiscardOldestPolicy:如果当前exector没有关闭,则丢弃队列中最早的任务。整体执行流程:1.2.2Executors内置静态方法FixedThreadPoolpublicstaticExecutorServicenewFixedThreadPool(intnThreads,ThreadFactorythreadFactory){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueueactory,thread})特点:核心线程数与最大线程数相同生存时间:0。因为核心线程数与最大线程数相同,所以该参数无意义。阻塞队列就是LinkedBlockingQueue,可以认为是一个无界队列。当线程池中的线程数达到最大线程数时,后续任务将被推入阻塞队列。可能会触发OOM异常。SingleThreadExecutorpublicstaticExecutorServicenewSingleThreadExecutor(ThreadFactorythreadFactory){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(),threadFactory1)返回与实际几乎相同;}FinalizableDelegatedExecutorService的特性privatestaticclassFinalizableDelegatedExecutorServiceextendsDelegatedExecutorService{FinalizableDelegatedExecutorService(ExecutorServiceexecutor){super(executor);}@SuppressWarnings("deprecation")protectedvoidfinalize(){super.shutdown()内部方法activesuper.shutdown()被调用。也就是说,当executor被垃圾回收后,会主动关闭实际的executor,不过还是建议在不用的时候及时关闭,因为使用LinkedBlockingQueue可能会触发OOM异常。newCachedThreadPoolpublicstaticExecutorServicenewCachedThreadPool(ThreadFactorythreadFactory){returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue(),threadFactory);}特点:核心线程数为0,最大线程数∞(可以认为是无穷大)生存时间为60秒,阻塞队列为SynchronousQueue,容量为0。每个提交的任务都会立即得到一个线程去执行,可能会造成线程数量过多,从而触发OOM异常。ScheduledThreadPool,SingleThreadScheduledExecutorpublicstaticSc??heduledExecutorServicenewScheduledThreadPool(intcorePoolSize,ThreadFactorythreadFactory){returnnewScheduledThreadPoolExecutor(corePoolSize,threadFactory);}publicstaticSc??heduledExecutorServicenewSingleThreadScheduledExecutor(ThreadFactorythreadFactory){returnnewDelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1,threadFactory));}继承于ThreadPoolExecutor其功能得到了扩展,提交的任务可以延迟一定时间执行,也可以周期性执行。WorkStealingPoolpublicstaticExecutorServicenewWorkStealingPool(intparallelism){returnnewForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true);}在1.8中引入,使用ForkJoinPool作为实现。线程池维护足够的线程来支持给定的并行度,并且可以使用多个队列来减少争用。实际的线程数可以动态增加或减少。不保证提交任务的执行顺序。1.2.3线程池提交任务execute:常用的提交方法,没有返回值。submit:适合需要有返回值的任务提交。会返回一个Future对象,用于判断任务是否执行,并获取任务执行结果。1.5引入了Future,1.8引入了CompletableFuture。关于CompletableFuture我会在后面单独的一篇文章中讲到。1.2.4线程池的关闭shutdownNow:将当前线程池设置为STOP状态,然后停止所有当前正在执行和挂起的任务,并返回待执行任务列表。shutdown:将当前线程设置为SHUTDOWN状态,中断不在执行任务的线程,不再接受新的任务,但之前提交的所有任务都会被执行。1.3线程池的监控线程池本身提供了很多属性供获取:getPoolSize:线程池当前线程数getCorePoolSize:线程池核心线程数getActiveCount:执行任务的线程数状态可能会发生变化在计算过程中)getTaskCount:所有任务的大概总数getQueue:当前线程池的任务队列getLargestPoolSize:线程池中的最大线程数getMaximumPoolSize:线程池中允许的最大线程数getKeepAliveTime:线程池线程存活时间isShutdown:线程池是否关闭(SHUTDOWN状态)isTerminated:线程池是否处于TERMINATED状态。同时,线程池还提供了几种保护空的实现方法。可以重写beforeExecute(Threadt,Runnabler)、afterExecute(Runnabler,Throwablet)、terminated(),通过继承获取扩展点的线程池状态。1.4配合Prometheus暴露监控指标。本例中线程池的主要目的是提高并发性,而不是延迟任务。如果没有空闲线程执行任务,就会自己执行。整体逻辑非常简单。内部启用SingleThreadScheduledExecutor以定期收集线程池数据并通过io.micrometer.core.instrument.Metrics将其公开。@Slf4j@EnableAsync@ConfigurationpublicclassExecutorConfigimplementsInitializingBean,DisposableBean{publicstaticfinalStringTHREAD_POOL_NAME_FUTURE="futureExecutor";privatestaticfinalScheduledExecutorServicescheduledExecutor=Executors.newSingleThreadScheduledExecutor();privatestaticfinalIterableTAG=Collections.singletonList(Tag.of("thread.pool.name",THREAD_POOL_NAME_FUTURE));@Bean(name=THREAD_POOL_NAME_FUTURE)publicExecutorfutureExecutor(){returnnewThreadPoolExecutor(20,80,10,TimeUnit.MINUTES,newSynchronousQueue<>(),newThreadFactoryBuilder().setNameFormat(THREAD_POOL_NAME_FUTURE+"-%d").build(),newThreadPoolExecutor.CallerRunsPolicy());}@OverridepublicvoidafterPropertiesSet(){scheduledExecutor.sscheduleAtFixedRate(doCollect(),30L,5L,TimeUnit.SECONDS);}@Overridepublicvoiddestroy(){try{scheduledExecutor.shutdown();}catch(忽略异常){}}privateRunnabledoCollect(){return()->{try{ThreadPoolTask??Executorexec=(ThreadPoolTask??Executor)futureExecutor();Metrics.gauge("thread.pool.core.size",TAG,exec,ThreadPoolTask??Executor::getCorePoolSize);Metrics.gauge("thread.pool.max.size",TAG,exec,ThreadPoolTask??Executor::getMaxPoolSize);Metrics.gauge("thread.pool.keepalive.seconds",TAG,exec,ThreadPoolTask??Executor::getKeepAliveSeconds);//Metrics.gauge("thread.pool.active.size",TAG,exec,ThreadPoolTask??Executor::getActiveCount);Metrics.gauge("thread.pool.thread.count",TAG,exec,ThreadPoolTask??Executor::getPoolSize);//Metrics.gauge("thread.pool.queue.size",TAG,exec,e->e.getThreadPoolExecutor().getQueue().size());//Metrics.gauge("thread.pool.task.count",TAG,exec,e->e.getThreadPoolExecutor().getTaskCount());Metrics.gauge("thread.pool.task.completed.count",TAG,exec,e->e.getThreadPoolExecutor().getCompletedTaskCount());}catch(Exceptionex){log.warn("doCollectex=>{}",ex.getLocalizedMessage());}};}}1.5SpringBoot中的线程池在Spring项目中,@EnableAsync和@Async常用于实现异步任务。默认情况下,使用ThreadPoolTask??Executor。公共类ThreadPoolTask??ExecutorextendsExecutorConfigurationSupportimplementsAsyncListenableTaskExecutor,SchedulingTaskExecutor{privatefinalObjectpoolSizeMonitor=newObject();私人诠释corePoolSize=1;privateintmaxPoolSize=Integer.MAX_VALUE;私人intkeepAliveSeconds=60;privateintqueueCapacity=Integer.MAX_VALUE;私有布尔值allowCoreThreadTimeOut=false;privatebooleanprestartAllCoreThreads=false;默认的拒绝策略是ThreadPoolExecutor.AbortPolicy()。所以一般来说,我们都需要自定义线程池。2注意事项2.1阿里巴巴Java开发手册【强制】创建线程或线程池时请指定一个有意义的线程名,方便出错时回溯。这很容易理解。像http-nio-8103-exec-1,一看就知道是httpweb相关的线程【强制】线程资源必须通过线程池提供,不允许显式创建线程应用程序。参考线程池的好处【强制】线程池不允许使用Executors创建,只能通过ThreadPoolExecutor创建。这种处理方式可以让写的同学更加清楚线程池的运行规律,避免资源耗尽的风险。2.2线程池其他,核心线程数,最大线程数,生存时间,阻塞队列的选择和容量。没有固定的公式。一般情况下,一些CPU密集型任务会根据任务类型进行区分。一般线程数设置的比较小。如果线程太多,上下文切换可能会占用比较高的时间。IO密集型任务。在这种情况下,请设置尽可能多的线程。可以粗略估计请求等待时间(WT)和服务时间(ST)之间的比率。线程池大小设置为N*(1+WT/ST)。但还是做好指标监控,根据业务场景不断调整优化。初始值可以通过在测试环境中做一些压力测试来确定。3后记合理使用,事半功倍。时刻保持敬畏之心,防患于未然,做好指标监测。留下两个坑:[]ThreadPoolExecutor源码[]CompletableFuture相关echo'5Y6f5Yib5paH56ugOiDmjpjph5Eo5L2g5oCO5LmI5Zad5aW26Iy25ZWKWzkyMzI0NTQ5NzU1NTA4MF0pL+aAneWQpihscGUyMz4Qp'|base6