当前位置: 首页 > 后端技术 > Java

使用线程池

时间:2023-04-01 14:48:52 Java

线程池的处理流程,通过ThreadPoolExecutor创建线程池的方法如下:publicThreadPoolExecutor(intcorePoolSize,//核心线程数intmaximumPoolSize,//最大线程池数longkeepAliveTime,//线程生存时间TimeUnitunit,//时间单位BlockingQueuerunnableTaskQueue,//任务队列ThreadFactorythreadFactory,//线程工厂RejectedExecutionHandlerhandler//拒绝策略);参数详解corePoolSize:可以同时运行的最小线程数。maximumPoolSize:线程池允许创建的最大线程数。keepAliveTime:除核心线程外的其他线程空闲的最长时间,超过就会销毁。单位:时间单位。runnableTaskQueue:一个阻塞队列,用于存放等待执行的任务。ArrayBlockingQueue:基于数组结构的有界阻塞队列,按照先进先出的原则对元素进行排序。LinkedBlockingQueue:基于链表结构的阻塞队列。该队列按照先进先出的原则对元素进行排序,其吞吐量通常高于ArrayBlockingQueue。SynchronousQueue:不存储元素的阻塞队列。每次插入操作必须等到另一个线程调用移除操作,否则插入操作一直阻塞,吞吐量通常高于Linked-BlockingQueue。PriorityBlockingQueue:具有优先级的无限阻塞队列。threadFactory:线程工厂,用于创建新的线程。handler:当线程池饱和时,处理新任务的策略。AbortPolicy:直接抛出异常。CallerRunsPolicy:只使用调用者的线程来运行任务。DiscardOldestPolicy:丢弃队列中最新的任务,执行当前任务。DiscardPolicy:不处理,丢弃。如何合理设置参数线程池的合理配置需要从任务的角度来分析:任务的性质:对于CPU密集型任务,核心线程数应该设置为Ncpu+1。对于IO密集型任务,核心线程数应设置为2^Ncpu。任务优先级:考虑使用优先级队列PriorityBlockingQueue进行处理。任务的执行时间:可以使用不同大小的线程池或者优先级队列来对短任务进行优先处理。任务依赖:如果依赖数据库连接,由于线程提交SQL需要等待数据库返回结果,所以线程数应该设置的大一些,这样可以更好的减少CPU等待时间,更好的利用CPU.向线程池提交任务提交不需要返回值的任务:execute()方法用于提交不需要返回值的任务,因此无法判断线程是否成功执行任务水池。提交需要返回值的任务:submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,返回值可以通过future的get()方法获取。get()方法将阻塞当前线程,直到任务完成。关闭线程池可以通过调用线程池的shutdown方法来关闭线程池。原理是遍历线程池中的工作线程,然后一个一个调用线程的interrupt方法来中断线程,这样无法响应中断的任务可能永远不会终止。使用场景实例后台接收Kafka中的数据写入数据库。分析kafka队列中的数据。只有当前游标指向的数据消费完了,才能消费下一个。监听方法会被反复调用,所以可以采用多线程来加速消费。同时考虑到线程提交SQL需要等待数据库响应,所以将多线程的核心线程数设置为2的CPU数次方。伪代码#ymlthread:corePoolSize:16maximumPoolSize:16epAliveTime:3pacity:200@Congiguration@ConfigurationProperties(prefix="thead")@DatapublicclassThreadConfig{privateIntegercorePoolSize;私有整数maximumPoolSize;私人长保持活动时间;私有整数容量;}公共类ExecutorConfig{@Bean(name="executor")publicThreadPoolExecutorexecutor(ThreadConfigconfig){returnnewThreadPoolExecutor(config.getCorePoolSize(),config.getMaximumPoolSize(),config.getKeepAliveTime(),TimeUnit.SECONDS,新的LinkedBlockingDeque<>(config.getCapacity()),新的ThreadPoolExecutor.CallerRunsPolicy());}}@AutowriedprivateThreadPoolExecutor执行器;@kafaListener(topics="",groupId="",containerFactory="")publicvoidaddData(List&g吨;records){//使用CountDownLatch控制多线程intcountDownLatchSize=6;CountDownLatchcountDownLatch=newCountDownLatch(countDownLatchSize);//提取数据ListdtoList=parseToDTO(records);for(inti=0;i{//拆分数据并添加到数据库service.saveBath(dtoList.subList(start,end));});}catch(Exceptione){log.error("");}最后{countDownLatch.countDown();}}countDownLatch.await();}效果单线程向ClickHouse插入100万条数据耗时27分钟,多线程向ClickHouse插入100万条数据耗时33秒。