当前位置: 首页 > 科技观察

说说异步非阻塞的Java高并发系统

时间:2023-03-20 17:23:23 科技观察

在做电商系统的时候,首页、活动页、商品详情页等流量入口承载了整个网站的大部分流量网站,这些系统的主要职责包括聚合数据组装模板、热点统计、缓存、下游功能降级开关、后台数据等,其中聚合数据需要调用其他多个系统服务获取数据、组装数据/模板,然后返回到前端。聚合数据的来源主要取决于系统/服务、缓存、数据库等;而系统之间的调用可以通过http接口调用(如HttpClient)、SOA服务调用(如dubbo、thrift)等。在Java中,如果使用Tomcat,一个请求会分配一个线程来处理请求。该线程负责获取数据,组装数据或模板返回给前端;整个线程总是被占用和阻塞。如果有大量这样的请求,每个请求占用一个线程,但是线程一直处于阻塞状态,降低了系统的吞吐量,进而会导致应用程序的吞吐量下降;我们希望被依赖的服务响应比较慢,这时候应该释放线程和CPU去处理下一个请求,当被依赖的服务返回时,再分配相应的线程继续处理。对此应该有更好的解决方案:异步/协程。Java不支持协程(虽然有的Java框架说支持,但仍然是对高层API的封装),所以在Java中我们也可以使用异步来提高吞吐量。目前主要支持java的一些开源框架(HttpClient\HttpAsyncClient、dubbo、thrift等)。多个调用方法的同步阻塞调用为串行调用,响应时间为所有服务响应时间之和;半异步(asynchronousFuture)线程池,异步Future,使用场景:并发请求多个服务,总耗时为最长响应时间;提高总响应时间,但阻塞主请求线程。并发高的时候,还是会造成线程过多,CPU上下文切换;所有异步(Callback)回调方法调用,使用场景:不考虑回调时间,只能做简单的结果处理,如果依赖的服务是两个或多个服务,则不能合并两个服务的处理结果;主请求线程不阻塞,但使用场景有限。异步回调链编排异步回调链编排(JDK8CompletableFuture),使用场景:其实不是异步调用方式,只是将依赖多个服务的Callback调用结果进行整理,弥补Callback的不足,从而实现完全异步的链式传输。接下来我们看看如何使用全异步Callback调用和异步回调链编排处理结果来设计一个全异步系统设计。同步阻塞调用publicclassTest{publicstaticvoidmain(String[]args)throwsException{RpcServicerpcService=newRpcService();HttpServicehttpService=newHttpService();//耗时10msMapresult1=rpcService.getRpcResult();//耗时20msIntegerresult2=httpService.getHttpResult();//总共耗时30ms}staticclassRpcService{MapgetRpcResult()throwsException{//调用远程方法(远程方法耗时10ms左右,可以用Thread.sleep模拟)}}staticclassHttpService{IntegergetHttpResult()throwsException{//调用远程方法(远程方法耗时20ms左右,可以用Thread.sleep模拟)Thread.sleep(20);return0;}}}半异步(asynchronousFuture)publicclassTest{finalstaticExecutorServiceexecutor=Executors.newFixedThreadPool(2);publicstaticvoidmain(String[]args){RpcServicerpcService=newRpcService();HttpServicehttpService=newHttpService();Future>future1=null;Futurefuture2=null;try{future1=executor.submit(()->rpcService.getRpcResult());future2=executor.submit(()->httpService.getHttpResult());//耗时10msMapresult1=future1.get(300,TimeUnit.MILLISECONDS);//耗时20msIntegerresult2=future2.get(300,TimeUnit.MILLISECONDS);//总耗时20ms}catch(Exceptione){if(future1!=null){future1.cancel(true);}if(future2!=null){future2.cancel(true);}thrownewRuntimeException(e);}}staticclassRpcService{MapgetRpcResult()throwsException{//callremote方法(远程方法耗时约10ms,可通过Thread.sleep模拟)}}staticclassHttpService{IntegergetHttpResult()throwsException{//调用远程方法(远程方法耗时约20ms,可通过Thread模拟。睡眠)}}}完全异步(回调)publicclassAsyncTest{publicstaticHttpAsyncClienthttpAsyncClient;publicstaticCompletableFuturegetHttpData(Stringurl){CompletableFutureasyncFuture=newCompletableFuture();HttpPostpost=newHttpPost(url);HttpAsyncRequestProducerproducer=HttpAsyncMethods.create>ConsumerAsyncRequestProducer=newAsyncCharConsumer(){Http响应;protectedHttpResponsebuildResult(finalHttpContextcontext){returnresponse;}….。);returnasyncFuture;}publicstaticvoidmain(String[]args)throwsException{AsyncTest.getHttpData("http://www.jd.com");Thread.sleep(1000000);}}本示例使用HttpAsyncClient演示异步回调链式编排CompletableFuture提供了50多个API,可以满足各种场景需要的异步处理编排。下面是三种场景:场景一:三个服务并发异步调用,返回CompletableFuture,不阻塞主线程;methodtest1:publicstaticvoidtest1()throwsException{HelloClientDemoTestservice=newHelloClientDemoTest();/***场景一:同时异步??调用两个以上服务,返回CompletableFuture,不阻塞主线程*且两个服务也是异步非-阻塞调用*/CompletableFuturefuture1=service.getHttpData("http://www.jd.com");CompletableFuturefuture2=service.getHttpData("http://www.jd.com");CompletableFuturefuture3=service.getHttpData("http//www.jd.com");ListfutureList=Lists.newArrayList(future1,future2,future3);CompletableFutureallDoneFuture=CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()]));CompletableFuturefuture4=allDoneFuture.thenApply(v->{Listresult=futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());//注意命令Stringresult1=(String)result.get(0);Stringresult2=(String)result.get(1);Stringresult3=(String)result.get(2);//处理业务....returnresult1+result2+result3;}).exceptionally(e->{//e.printStackTrace();return"";});//return}场景2,两个服务并发异步调用,返回CompletableFuture,不阻塞主线程;methodtest2:publicvoidtest2()throwsException{HelloClientDemoTestservice=newHelloClientDemoTest();/***场景二,并发异步调用两个接口,返回CompletableFuture,不阻塞主线程*并且两个服务也是异步非阻塞调用*/CompletableFuturefuture1=service.getHttpData("http://www.jd.com");CompletableFuturefuture2=service.getHttpData("http://www.jd.com");CompletableFuturefuture3=future1.thenCombine(future2,(f1,f2)->{//处理业务....returnf1+","+f2;}).exceptionally(e->{return"";});//return}场景三,两个服务,调用两个服务并发异步,一个服务的结果返回后,再次调用另一个服务,然后将三个结果一起处理,返回CompletableFuture,整个处理过程不阻塞任何线程;methodtest3:publicvoidtest3()throwsException{HelloClientDemoTestservice=newHelloClientDemoTest();/***场景3两个请求依赖调用,然后结合另一个服务结果,返回CompletableFuture,不阻塞主线程*和两个服务也是异步非阻塞调用*/CompletableFuturefuture1=service.getHttpData("http://www.jd.com");CompletableFuturefuture2=service.getHttpData("http://www.jd.com");CompletableFuturefuture3=future1.thenApply((param)->{CompletableFuturefuture4=service.getHttpData("http://www.jd.com");returnfuture4;});CompletableFuturefuture5=future2.thenCombine(future3,(f2,f3)->{//....处理业务returnf2+","+f3;}).exceptionally(e->{return"";});//返回future5}全异步Web系统设计的主要技术:servlet3、JDK8CompletableFuture、支持异步Callback调用的RPC框架先看处理流程图:servlet3:Servlet收到请求后,可能首先需要对请求携带的数据进行一些预处理;然后,Servlet线程将请求转交给一个异步线程进行业务处理,而线程本身返回容器中业务处理耗时的情况,这样会大大减少对服务器资源的占用,提高处理速度f并发处理。servlet3可参考商品详情页系统的Servlet3异步化实践,结合其中讲解的servlet3整合:publicvoidsubmitFuture(finalHttpServletRequestreq,finalCallabletask)throwsException{finalStringuri=req.getRequestURI();finalMapparams=req.getParameterMap();finalAsyncContextasyncContext=req.startAsync();asyncContext.getRequest().setAttribute("uri",uri);asyncContext.getRequest().setAttribute("params",params);asyncContext.setTimeout(asyncTimeoutInSeconds*1000);if(asyncListener!=null){asyncContext.addListener(asyncListener);}CompletableFuturefuture=task.call();future.thenAccept(result->{HttpServletResponseresp=(HttpServletResponse)asyncContext.getResponse();try{if(resultinstanceofString){byte[]bytes=newbyte[0];if(StringUtils.isBlank(result)){resp.setContentType("text/html;charset=gbk");resp.setContentLength(0);}else{bytes=result.getBytes("GBK");}//resp.setBufferSize(bytes.length);resp.setContentType("text/html;charset=gbk");if(StringUtils.isNotBlank(localIp)){resp.setHeader("t.ser",localIp);}resp.setContentLength(bytes.length);resp.getOutputStream().write(bytes);}else{write(resp,JSONUtils.toJSON(result));}}catch(Throwablee){resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);//程序内部错误try{LOG.error("getinfoerror,uri:{},params:{}",uri,JSONUtils.toJSON(params),e);}catch(Exceptionex){}}finally{asyncContext.complete();}}).exceptionally(e->{asyncContext.complete();returnull;});}另外还有Java中的协程库Quasar,可以参考《Java的纤程库 - Quasar》,目前应用中没有用到,在测试FiberHttpServlet的时候遇到了很多坑,以后会用到,Quasar会自由使用,形成日记,希望结交更多的朋友一起学习,踩坑作者:孙伟,目前负责京东商品详情页统一服务系统,编写过java,ngx_lua,和风暴等,喜欢学习和研究新事物。【本文来自专栏作者张凯涛微信公众号(凯涛博客)公众号id:kaitao-1234567】点此查看作者更多好文