当前位置: 首页 > 科技观察

新手都能看懂的SpringBoot异步编程指南

时间:2023-03-17 11:14:44 科技观察

通过这篇文章,你可以学到以下知识点:容量代表;ThreadPoolTask??Executor饱和策略;SpringBoot异步编程实践,理解代码的执行逻辑。Future模式异步编程在处理耗时操作和多任务场景时非常有用。我们可以更好的让我们的系统利用机器的CPU和内存,提高它们的利用率。有许多多线程设计模式。Future模式是多线程开发中非常常见的设计模式。本文也是基于这个模式来说明SpringBoot的异步编程知识。在实战之前,先简单介绍一下Future模式的核心思想!。Future模式的核心思想是异步调用。当我们执行一个方法时,如果这个方法中有多个耗时的任务需要同时完成,又不急于等待结果,可以让客户端立即返回,然后慢慢返回在后台计算任务。当然你也可以选择等这些任务执行完再返回客户端。这在Java中是很好支持的,后面我会在示例程序中详细比较这两种方式的区别。SpringBoot异步编程实践如果我们需要在SpringBoot中实现异步编程,Spring提供的两个注解会让这件事情变得非常简单。@EnableAsync:通过在配置类或Main类中添加@EnableAsync来启用对异步方法的支持。@Async可以作用于一个类也可以作用于一个方法,作用于一个类就是这个类的所有方法都是异步方法。1.自定义TaskExecutor很多人对TaskExecutor不是很了解,所以我们先花一点篇幅来介绍一下这个东西。从名字就可以看出是任务的执行者。它带领并执行线程来处理任务,就像指挥官一样,而我们的线程就像一支军队。这些军队可以异步攻击敌人👊。Spring提供了TaskExecutor接口作为任务执行器的抽象,与java.util.concurrent包下的Executor接口非常相似。略有不同的TaskExecutor接口使用Java8语法@FunctionalInterface来声明此接口是功能接口。org.springframework.core.task.TaskExecutor@FunctionalInterfacepublicinterfaceTaskExecutorextendsExecutor{voidexecute(Runnablevar1);}如果没有自定义Executor,Spring会创建一个SimpleAsyncTaskExecutor并使用它。importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.scheduling.annotation.AsyncConfigurer;importorg.springframework.scheduling.annotation.EnableAsync;importorg.springframework.readscheduling.concurrent。importjava.util.concurrent.Executor;/**@authorshuang.kou*/@Configuration@EnableAsyncpublicclassAsyncConfigimplementsAsyncConfigurer{privatestaticfinalintCORE_POOL_SIZE=6;privatestaticfinalintMAX_POOL_SIZE=10;privatestaticfinalintQUEUE_CAPACITY=100;@BeanpublicExecutortaskExecutor(){//Spring默认庞泛心是装载1.最大线程容量是无限的,队列容量也是无限的。ThreadPoolTask??Executorexecutor=newThreadPoolTask??Executor();//核心线程数executor.setCorePoolSize(CORE_POOL_SIZE);//最大线程数executor.setMaxPoolSize(MAX_POOL_SIZE);//队列大小executor.setQueueCapacity(QUEUE_CAPACITY);//当最大池为full,此策略保证不会丢失任何任务请求,但可能会影响整体应用程序性能。executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.setThreadNamePrefix("MyThreadPoolTask??Executor-");executor.initialize();returnexecutor;}}ThreadPoolTask??Executor常用概念:CorePoolSize:核心线程数定义最小线程数即可以同时运行的线程数。QueueCapacity:当有新任务到来时,会先判断当前运行的线程数是否达到核心线程数。如果是这样,信任将存储在队列中。MaximumPoolSize:当队列中存储的任务达到队列容量时,当前可以同时运行的线程数成为最大线程数。一般情况下,队列大小不会设置为:Integer.MAX_VALUE,核心线程数和最大线程数也不会设置为相同大小。在这种情况下,最大线程数的设置是没有意义的,无法确定当前的CPU。以及内存利用率的具体情况。如果队列满了,当前同时运行的线程数达到了最大线程数,如果有新的任务来了会怎样?Spring默认使用ThreadPoolExecutor.AbortPolicy。默认情况下,在Spring中,ThreadPoolExecutor会抛出RejectedExecutionException来拒绝传入的任务,这意味着你将失去对这个任务的处理。对于可扩展的应用程序,建议使用ThreadPoolExecutor.CallerRunsPolicy。当最大池已满时,此策略为我们提供了一个可扩展的队列。ThreadPoolTask??Executor饱和策略定义:如果当前同时运行的线程数达到最大线程数,ThreadPoolTask??Executor定义了一些策略:ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。你不会任务请求。但是这种策略会降低提交新任务的速度,影响程序的整体性能。此外,此策略喜欢增加队列容量。如果您的应用程序可以容忍此延迟并且您不能任务丢弃任何任务请求,则可以选择此策略。ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃。ThreadPoolExecutor.DiscardOldestPolicy:该策略将丢弃最早的未完成任务请求。2、写一个异步方法,模拟一个查找电影对应字符开头的方法。我们给这个方法加上了@Async注解,告诉Spring这是一个异步方法。另外,这个方法CompletableFuture.completedFuture(results)的返回值意味着我们需要返回结果,也就是说程序必须完成任务才能返回给用户。请注意completableFutureTask方法中第一行打印log的代码,后面分析程序中会用到,非常重要!importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.scheduling.annotation.Async;importorg。springframework.stereotype.Service;importjava.util.ArrayList;importjava.util.Arrays;importjava.util.List;importjava.util.concurrent.CompletableFuture;importjava.util.stream.Collectors;/**@authorshuang.kou*/@ServicepublicclassAsyncService{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(AsyncService.class);privateListmovies=newArrayList<>(Arrays.asList("阿甘正传","泰坦尼克号","千与千寻","肖申克的救赎","疯狂动物城","告别","Joker","Crawl"));/**演示使用:查找以特定字符/字符串开头的电影*/@AsyncpublicCompletableFuture>completableFutureTask(Stringstart){//打印日志logger.warn(Thread.currentThread().getName()+"startthistask!");//查找以特定字符/字符串开头的电影Listresults=movies.stream().filter(movie->movie.startsWith(start)).collect(Collectors.toList());//模拟这是一个耗时任务try{Thread.sleep(1000L);}catch(InterruptedExceptione){e.printStackTrace();}//返回一个新的CompletableFuture,它已经完成了给定的值returnCompletableFuture.completedFuture(results);}}3.试写的异步方法/**@authorshuang.kou*/@RestController@RequestMapping("/async")publicclassAsyncController{@AutowiredAsyncServiceasyncService;@GetMapping("/movies")publicStringcompletableFutureTask()throwsExecutionException,InterruptedException{//开始时间longstart=System.currentTimeMillis();//开始执行大量异步任务Listwords=Arrays.asList("F","T","S","Z","J","C");List>>completableFutureList=words.stream().map(word->asyncService.completableFutureTask(word)).collect(Collectors.toList());//CompletableFuture.join()方法可以得到他们的结果并加入他们List>results=completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());//打印结果和运行程序所花费的时间System.out.println("Elapsedtime:"+(System.currentTimeMillis()-start));返回结果.toString();}}请求该接口时,控制台打印出如下内容:2019-10-0113:50:17.007WARN18793---[lTask??Executor-1]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-1startthistask!2019-10-0113:50:17.007WARN18793---[lTask??Executor-6]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-6startthistask!2019-10-0113:50:17.007WARN18793---[lTask??Executor-5]g.j.a.service.AsyncService:MyThreadPoolTask??Executor50:20start17.007WARN18793---[lTask??Executor-4]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-4startthistask!2019-10-0113:50:17.007WARN18793---[lTask??Executor-3]g.j.a.service.AsyncService:MyThreadskPoolTask??00task13:startthis0task1-350:17.007WARN18793---[lTask??Executor-2]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-2startthistask!Elapsedtime:1010首先我们可以看到处理完所有任务大约需要1s,这和我们自定义的关于ThreadPoolTask??Executor不一样,我们配置核心线程数为6,然后赋值6个任务通过模拟以下代码交给系统执行。这样每个线程都会被分配一个任务,每个任务的执行时间为1s,所以处理6个任务的总耗时为1s。Listwords=Arrays.asList("F","T","S","Z","J","C");List>>completableFutureList=words.stream().map(word->asyncService.completableFutureTask(word)).collect(Collectors.toList());大家可以自己验证一下,尝试将核心线程数改为3,再次请求这个接口,会发现处理所有任务的时间大约是2s。另外,从上面的运行结果可以看出,所有任务都执行完了才返回结果。这种情况对应的是我们需要将结果返回给客户端请求的情况,如果我们不需要将任务执行结果返回给客户端呢?比如我们向系统上传一个大文件,上传之后,只要大文件格式符合要求我们就上传成功了。一般情况下,我们需要等待文件上传完毕,然后再给用户返回消息,但是这样会很慢。如果使用异步,当用户上传时,会立即返回消息给用户,然后系统会静默处理上传任务。这样也会增加一点麻烦,因为文件可能会上传失败,所以系统还需要一种机制来弥补这个问题,比如上传遇到问题时发送消息通知用户。下面将演示客户端不需要返回结果的情况:将completableFutureTask方法改为void类型@AsyncpublicvoidcompletableFutureTask(Stringstart){...//这可能是系统对任务执行结果的处理,比如存储它在数据库中Wait...//doSomeThingWithResults(results);}Controller代码修改如下:@GetMapping("/movies")publicStringcompletableFutureTask()throwsExecutionException,InterruptedException{//Starttheclocklongstart=System.currentTimeMillis();//Kickofmultiple,asynchronouslookupsListwords=Arrays.asList("F","T","S","Z","J","C");words.stream().forEach(word->asyncService.completableFutureTask(word));//Waituntiltheyaarealldone//打印结果,包括elapsedtimeSystem.out.println("Elapsedtime:"+(System.currentTimeMillis()-start));return"Done";}请求这个接口,控制台打印出来以下内容:Elapsedtime:02019-10-0114:02:44.052WARN19051---[lTask??Executor-4]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-4startthistask!2019-10-0114:02:44.052WARN19051---[lTask??Executorr-3]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-3startthistask!2019-10-0114:02:44.052WARN19051---[lTask??Executor-2]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-2startthistask!2019-10-0114:02:5WAR---[lTask??Executor-1]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-1startthistask!2019-10-0114:02:44.052WARN19051---[lTask??Executor-6]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-6startthistask!2019-10:00244.052WARN19051---[lTask??Executor-5]g.j.a.service.AsyncService:MyThreadPoolTask??Executor-5startthistask!可以看到系统会直接将结果返回给用户,然后系统才会真正开始执行要做的任务Futurevs.CompletableFuture源码分析参考https://spring.io/guides/gs/async-方法/https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d