当前位置: 首页 > 后端技术 > Java

一劳永逸的优化!并发RPC调用小工具

时间:2023-04-01 19:20:34 Java

前言系统性能优化是每个程序员的必经之路,但也可能是最深的套路。不仅需要对各种工具有深入的了解,有时还需要结合具体业务场景定制优化方案。当然你也可以悄悄在代码里隐藏一个Thread.sleep,在需要优化的时候少睡几毫秒(手动狗头)。性能优化这个话题范围很广,目前市面上还没有一本可以全面概括这个话题的优质书籍。不仅如此,即便深入到各个细分领域,性能优化的手段也非常丰富,让人眼花缭乱。本文不会涵盖所有的优化套路,只是针对近期项目开发过程中遇到的并发调用场景,给出自己的通用解决方案。可以直接打包,也可以复制粘贴到工程中使用。也欢迎大家多提意见,优化场景。背景不知道大家在开发过程中有没有遇到过这样的场景。我们会先调用服务A,再调用服务B,组装数据,再调用服务C(如果你在微服务系统开发中没有遇到过这样的场景,我想说要么你系统的拆分粒度太粗了,或者这是一个幸运的底层系统,没有下游服务依赖~)这个环节的耗时是duration(A)+duration(B)+duration(C)+其他操作。从经验来看,大部分时间消耗来自于下游服务和网络IO的处理时间,应用内部CPU运算的时间消耗基本可以忽略不计。但是,当我们知道对服务A和B的调用之??间没有依赖关系时,是否可以通过同时并发调用A和B来减少同步调用的等待时间,这样理想情况下,耗时环节就可以得到优化tomax(duration(A),duration(B))+duration(C)+其他操作再举个例子,有时候我们可能需要批量调用下游服务,比如批量查询用户信息。下游查询接口为了业务保护,往往会限制一次查询的数量,比如一次只能查询一百条用户信息。因此,我们需要拆分多个请求进行多次查询,这样耗时就变成了n*duration(A)+其他操作。同样,采用并发请求的优化方式,理想情况下,耗时可以减少到max(duration(A))+其他操作。这两种场景的代码实现基本类似。本文将提供第二个场景的思路和完整实现。小规模并发RPC调用的整体实现类图如下:首先,我们需要创建一个线程池,用于并发执行。因为程序中通常还有其他场景使用线程池,而我们希望RPC调用可以使用单独的线程池,所以这里使用工厂方法进行封装。@ConfigurationpublicclassThreadPoolExecutorFactory{@ResourceprivateMapexecutorMap;/***默认线程池*/@Bean(name=ThreadPoolName.DEFAULT_EXECUTOR)publicAsyncTaskExecutorbaseExecutorService(){//后续支持自定义各个服务的一些参数ThreadPoolTask??ExecutortaskExecutor=newThreadPoolTask??Executor();//设置线程池参数信息taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(50);taskExecutor.setQueueCapacity(200);taskExecutor.setKeepAliveSeconds(60);.DEFAULT_EXECUTOR+"--");taskExecutor.setWaitForTasksToCompleteOnShutdown(true);taskExecutor.setAwaitTerminationSeconds(60);taskExecutor.setDaemon(Boolean.TRUE);//修改拒绝策略,使用当前线程执行taskExecutor.setRejectedExecutionHandler(newThreadPoolRunec());//初始化线程池taskExecutor.initialize();r返回任务执行器;}/***并发调用一个单独的线程池*/@Bean(name=ThreadPoolName.RPC_EXECUTOR)publicAsyncTaskExecutorrpcExecutorService(){//后续支持对每个服务自定义这部分参数ThreadPoolTask??ExecutortaskExecutor=newThreadPoolTask??Executor();//设置线程池参数信息taskExecutor.setCorePoolSize(20);taskExecutor.setMaxPoolSize(100);taskExecutor.setQueueCapacity(200);taskExecutor.setKeepAliveSeconds(60);taskExecutor.setThreadNamePrefix(ThreadPoolName.RPC_EXECUTOR+"--");.setWaitForTasksToCompleteOnShutdown(true);taskExecutor.setAwaitTerminationSeconds(60);taskExecutor.setDaemon(Boolean.TRUE);//修改拒绝策略使用当前线程执行taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());//初始化线程池taskExecutor.initialize();返回任务执行器;}/***根据线程池名称获取线程池*如果找不到对应的线程池会抛出异常*@paramname线程池名称*@returnthreadpool*@throwsRuntimeException如果找不到这个名字的线程池*/if(executor==null){thrownewRuntimeException("noexecutorname"+name);}返回执行者;}}publicclassThreadPoolName{/***默认线程池*/publicstaticfinalStringDEFAULT_EXECUTOR="defaultExecutor";/***并发调用使用的线程池*/publicstaticfinalStringRPC_EXECUTOR="rpcExecutor";}如代码所示,我们声明了两个Spring线程池AsyncTaskExecutor,分别是默认线程池和RPC调用线程池,并将它们加载到地图中。调用者可以使用fetchAsyncTaskExecutor方法并传入线程池的名称来指定线程池执行。这里还有一个细节。Rpc线程池的线程数明显多于其他线程池,因为Rpc调用不是CPU密集型逻辑,往往伴随着大量的等待。因此,增加线程数可以有效提高并发效率。@ComponentpublicclassTracedExecutorService{@ResourceprivateThreadPoolExecutorFactorythreadPoolExecutorFactory;/***指定线程池提交异步任务并获取任务上下文*@paramexecutorName线程池名称*@paramtracedCallable异步任务*@param返回类型*@return线程上下文*/publicFuturesubmit(StringexecutorName,CallabletracedCallable){返回threadPoolExecutorFactory.fetchAsyncTaskExecutor(executorName).submit(tracedCallable);}}submit方法封装了获取线程池和提交异步任务的逻辑。这里使用Callable+Future的组合来获取异步线程的执行结果。线程池准备好了,接下来我们需要声明一个接口用于提交并发调用服务:publicinterfaceBatchOperateService{/***并发批量操作*@param函数执行逻辑*@paramrequestsrequest*@paramconfig配置*@return所有响应*/ListbatchOperate(Functionfunction,Listrequests,BatchOperateConfigconfig);}@DatapublicclassBatchOperateConfig{/***Timeout*/privateLongtimeout;/***超时单位*/privateTimeUnittimeoutUnit;/***是否需要全部执行成功*/privateBooleanneedAllSuccess;}batchOperate方法中传入函数对象,是需要并发执行的代码逻辑。requests就是请求,并发调用会递归这些请求,提交给异步线程。config对象可以对本次并发调用做一些配置,比如并发查询的超时时间,如果某些调用出现异常,整个批量查询是否继续执行。接下来看一下实现类:@Service@Slf4jpublicclassBatchOperateServiceImplimplementsBatchOperateService{@ResourceprivateTracedExecutorServicetracedExecutorService;@OverridepublicListbatchOperate(Functionfunction,Listrequests,BatchOperateConfigconfig){log.info("batchOperate启动函数:{}请求:{}配置:{}",函数,JSON.toJSONString(请求),JSON.toJSONString(配置));//当前时间longstartTime=System.currentTimeMillis();//初始化intnumberOfRequests=CollectionUtils.size(requests);//所有异步线程执行结果List>futures=Lists.newArrayListWithExpectedSize(numberOfRequests);//使用countDownLatch进行并发调用管理CountDownLatchcountDownLatch=newCountDownLatch(numberOfRequests);List>callables=Lists.newArrayListWithExpectedSize(numberOfRequests);//提交异步线程执行for(Trequest:requests){BatchOperateCallablebatchOperateCallable=newBatchOperateCallable<>(countDownLatch,function,request);callables.add(batchOperateCallable);//提交异步线程执行Futurefuture=tracedExecutorService.submit(ThreadPoolName.RPC_EXECUTOR,batchOperateCallable);futures.add(未来);}try{//等待所有执行完成,如果超时,要求所有调用成功,会抛出异常booleanallFinish=countDownLatch.await(config.getTimeout(),config.getTimeoutUnit());if(!allFinish&&config.getNeedAllSuccess()){thrownewRuntimeException("batchOperatetimeoutandneedallsuccess");}//遍历执行结果,如果部分执行失败,要求所有调用都成功,则抛出异常如果(!allSuccess&&config.getNeedAllSuccess()){thrownewRuntimeException("一些batchOperate失败了,需要全部成功");}//获取所有异步调用结果并返回Listresult=Lists.newArrayList();for(Futurefuture:futures){Rr=future.get();如果(Objects.nonNull(r)){结果.add(r);}}返回结果;}catch(Exceptione){thrownewRuntimeException(e.getMessage());}finally{doubleduration=(System.currentTimeMillis()-startTime)/1000.0;log.info("batchOperate完成持续时间:{}s函数:{}请求:{}配置:{}",持续时间,函数,JSON.toJSONString(requests),JSON.toJSONString(config));}}}通常情况下,提交到线程池后,直接遍历Future等待结果就可以了,但是这里我们使用CountDownLatch来进行更统一的超时管理。你可以看一下BatchOperateCallable的实现:publicclassBatchOperateCallableimplementsCallable{privatefinalFunction函数;私有最终T请求;/***线程处理是否成功*/privatebooleansuccess;publicBatchOperateCallable(CountDownLatchcountDownLatch,Functionfunction,Trequest){this.countDownLatch=countDownLatch;this.function=函数;this.request=请求;}@OverridepublicRcall(){try{success=false;Rresult=function.apply(request);成功=真;返回结果;}最后{countDownLatch.countDown();}}publicbooleanisSuccess(){返回成功;}}不管调用时成功还是异常,调用完我们都会将计数器减一。当计数器递减为0时,意味着所有并发调用都已执行。否则,如果计数器在规定的时间内没有归零,则说明并发调用超时,此时会抛出异常。潜在问题并发调用的一个问题是我们将下游接口的流量放大了,极端情况下甚至放大了数百倍。如果下游服务不采取限流等防御措施,我们极有可能把下游服务挂掉(这种原因导致的故障并不少见)。因此,需要对整个并发调用的流程进行控制。有两种流量控制方法。一是如果微服务采用mesh模式,可以在sidecar配置RPC调用的QPS,实现对下游服务访问的全局控制(这里选择单机限流还是集群限流,流量取决于Sidecar是否支持模式和服务的流量,一般来说,如果平均流量较小,建议选择单机限流,因为集群限流的波动往往比单机限流,流量过小造成误判)。如果没有开启mesh,需要自己在代码中实现限流器。这里推荐使用Guava的RateLimiter类,但是只支持单机限流。如果要实现集群限流,方案的复杂度会进一步增加。总结会是我们每个开发者对项目开发中遇到的场景进行抽象并尽可能给出通用解决方案的重要途径,也是提高代码复用性和稳定性的利器。并发Rpc调用是一种常见的解决方案。希望本文中的实现能对您有所帮助。