当前位置: 首页 > 科技观察

关于使用CompletableFuture过程中的线程等待问题

时间:2023-03-14 13:40:55 科技观察

在电商应用场景中,通过异步多线程获取服务端信息是很常见的。例如用户打开个人中心查看个人综合信息,可能会显示用户的账户余额、优惠券、积分、消费红包等信息,然后服务器会通过异步线程汇总需要的信息并返回它给用户。如果单线程一个一个的返回个人信息,用户的等待时间显然是无法接受的,而通过异步多线程的方式大大减少了请求的响应时间。jdk8简化了异步任务的编写,提供了很多异步任务的计算方法。1、示例(先做一个简单的测试示例代码,查看运行结果)util.List;importjava.util.Random;importjava.util.concurrent.CompletableFuture;@Slf4jpublicclassCompletableFutureApplicationTests{publicstaticvoidmain(String[]args){log.info("currentcpucorenumber:{}",Runtime.getRuntime().availableProcessors());即时启动=Instant.now();List>list=newArrayList<>();//模拟任务需要10次调用for(inti=0;i<10;i++){list.add(CompletableFuture.supplyAsync(()->info()));}list.forEach(e->{try{log.info("{}",e.get());}catch(Exceptionex){ex.printStackTrace();}});即时结束=Instant.now();log.info("执行任务完成,总耗时:{}ms",Duration.between(start,end).toMillis());}publicstaticIntegerinfo(){//模拟任务时间try{Thread.sleep(1000);log.info("{}",Thread.currentThread().getName());}最后{returnnewRandom().nextInt(1000);}}}执行此程序,运行结果如下:09:20:49.603[main]INFO*-当前cpu核数:409:20:50.672[ForkJoinPool.commonPool-worker-1]INFO*-ForkJoinPool.commonPool-worker-109:20:50.672[main]信息*-75009:20:50.673[ForkJoinPool.commonPool-worker-2]信息*-ForkJoinPool.commonPool-worker-209:20:50.673[ForkJoinPool.commonPool-worker-3]信息*-ForkJoinPool.commonPool-worker-309:20:50.673[main]INFO*-94109:20:50.673[main]INFO*-48009:20:51.672[ForkJoinPool.commonPool-worker-1]INFO*-ForkJoinPool.commonPool-worker-109:20:51.672[main]INFO*-93909:20:51.673[ForkJoinPool.commonPool-worker-2]INFO*-ForkJoinPool.commonPool-worker-209:20:51.673[ForkJoinPool.commonPool-worker-3]信息*-ForkJoinPool.commonPool-worker-309:20:51.673[主要]信息*-72209:20:51.673[主要]信息*-78109:20:52.673[ForkJoinPool.commonPool-worker-1]信息*-ForkJoinPool.commonPool-worker-109:20:52.673[主要]信息*-86809:20:52.674[ForkJoinPool.commonPool-worker-2]信息*-ForkJoinPool.commonPool-worker-209:20:52.674[主要]信息*-63209:20:52.674[ForkJoinPool.commonPool-worker-3]信息*-ForkJoinPool.commonPool-worker-309:20:52.678[main]信息*-47109:20:53.673[ForkJoinPool.commonPool-worker-1]信息*-ForkJoinPool.commonPool-worker-109:20:53.673[main]]INFO*-49809:20:53.687[main]INFO*-执行任务完成,总耗时:4066ms2,结果分析(dumpthreadwaiting),可以看到是运行在4核的机器上,其运行结果超过4秒,如果作为异步线程运行,应该需要1秒多一点才符合预期。从打印的日志信息可以看出,主线程在50515253秒返回,子线程ForkJoinPool.commonPool的最大-worker数量为3,只有3个子线程循环执行任务,说明当同时调用10个任务时会发生线程等待,导致意外结果。跟踪源方法分析。查看supplyAsync方法源码:publicstaticCompletableFuturesupplyAsync(Suppliersupplier){returnasyncSupplyStage(asyncPool,supplier);}这里主要查看asyncPool参数,貌似是异步线程池。privatestaticfinalExecutorasyncPool=useCommonPool?ForkJoinPool.commonPool():newThreadPerTaskExecutor();看到是变量,根据useCommonPool判断创建方式,继续看useCommonPool参数。privatestaticfinalbooleanuseCommonPool=(ForkJoinPool.getCommonPoolParallelism()>1);继续在方法getCommonPoolParallelism中查看commonParallelism的值,从静态代码段获取(只贴出关键部分)。common=java.security.AccessController.doPrivileged(newjava.security.PrivilegedAction(){publicForkJoinPoolrun(){returnmakeCommonPool();}});intpar=common.config&SMASK;//即使线程被禁用也报告1commonParallelism=par>0?标准杆:1;通过位操作获取par的值,继续跟踪参数common.config,然后关注makeCommonPool方法(只贴出关键部分)。int并行度=-1;try{//忽略访问/解析属性时的异常Stringpp=System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");...if(parallelism<0&&//default1lessthan#cores(parallelism=Runtime.getRuntime().availableProcessors()-1)<=0)parallelism=1;如果(并行度>MAX_CAP)并行度=MAX_CAP;参数parallelism初始赋值为-1,可以通过系统参数java.util.concurrent.ForkJoinPool.common.parallelism改变并发数,同时增加对并发数范围的控制并发性。需要特别注意if条件下parallelism的赋值。核数大于1后为false。例子:如果cpu只有1核,可以通过Runtime.getRuntime().availableProcessors()获取cpu核数,并发计算数只有1,按照上面的程序,需要10秒1核环境下(可在vmware1核环境安装系统上创建运行查看结果),若cpu核数大于1,则并发数为cpu核数减1。3.配置系统参数,设置并发数为10,增加启动参数-Djava.util.concurrent.ForkJoinPool。common.parallelism=10运行结果可以观察到可以并行启动10个线程执行任务,总耗时在1秒范围内;如果改为9个并发线程,则在2秒范围内。4.使用线程模式查看方法CompletableFuture.supplyAsync。可以传入参数Exec??utor。如果在线程池中只有一个线程的情况下传入一个线程,预期结果将在10秒内。代码如下,验证结果。publicstaticvoidmain(String[]args){log.info("当前cpu核心数:{}",Runtime.getRuntime().availableProcessors());即时启动=Instant.now();List>list=newArrayList<>();ExecutorService执行器=Executors.newFixedThreadPool(1);//模拟任务需要10次调用for(inti=0;i<10;i++){list.add(CompletableFuture.supplyAsync(()->info(),executor));}list.forEach(e->{try{log.info("{}",e.get());}catch(Exceptionex){ex.printStackTrace();}});即时结束=Instant.now();log.info("执行任务完成,总耗时:{}ms",Duration.between(start,end).toMillis());执行人。关闭();}调整线程数为10,即1秒内可以执行完毕。五、CompletableFuture常用方法基本方法名说明CompletableFuture.runAsync无返回值运行任务CompletableFuture.suppliAsync有返回值运行任务CompletableFuture.allOf(...).get()多个任务全部执行在继续处理CompletableFuture.anyOf(…).get()多个任务中的任意一个执行完之后,继续处理CompletableFuture.thenApply接收执行结果。如果有返回值,可以将返回值向下传递给CompletableFuture.thenAccept接收执行结果。没有返回值6.总结不同的场景需要使用不同的线程方式。对于cpu密集型的任务,建议直接使用cpu的核心数来创建线程,避免频繁的线程切换导致线程等待。对于io密集型任务,建议使用线程。一般io等待时间远远超过线程等待时间。

最新推荐
猜你喜欢