当前位置: 首页 > 科技观察

多线程不得不说的Future类

时间:2023-03-17 15:08:32 科技观察

转载本文请联系三太子敖丙公众号。在高性能编程中,并发编程已经成为极其重要的一环。当单核CPU的性能已经达到了极限,我们只能通过多核来进一步提升系统的性能,从而催生并发编程。由于并发编程比串行编程更难,更容易出错,所以我们需要借鉴前人一些优秀成熟的设计模式,使我们的设计更加健壮和完善。Future模式是使用最广泛、也极其重要的设计模式。今天就和阿炳一起了解未来模式吧!生活中的Future模式为了更快的理解Future模式,我们先来看一个生活中的例子。场景一:午饭时间到了,同学们要吃饭了。小王下楼,走了20分钟,来到了肯德基。点餐、排队、吃饭用了20分钟,走回公司继续上班用了20分钟,一共1个小时。场景二午饭时间到了,同学们要吃饭了。小王点了一份肯德基外卖,很快,就接到了订单(虽然订单不能当正餐吃,但是有了订单,我怕是吃不下了)。然后小王就可以继续工作了,过了30分钟,外卖到了,然后小王吃了10分钟,就可以继续工作了,还顺利和隔壁的小王扯上了关系。显然,在这两种场景下,小王的工作时间都更加紧凑,尤其是排队的时间可以由外卖员完成,可以更加专注于自己的工作。聪明的你应该已经意识到,场景一是典型的函数同步调用,而场景二是典型的异步调用。场景2的异步调用还有一个特点就是它有一个返回值,也就是我们的订单。这个顺序非常重要。有了这个顺序,我们就可以得到当前调用对应的结果。这里的顺序就像FutureinFuture模式,是契约,也是承诺。虽然订单不能吃,但是订单在手,不怕吃不下。虽然Future不是我们想要的结果,但是持有Future可以在未来得到我们想要的结果。所以Future模式很好的解决了那些需要返回值的异步调用。Future模式中的主要角色一个典型的Future模式由以下几个部分组成:Main:系统启动,调用Client发送请求Client:返回Data对象,立即返回FutureData,开启ClientThread线程组装RealDataData:返回数据接口FutureData:Futuredata,构建速度快,但属于虚拟数据,需要和RealData拼装,比如订单。RealData:真实数据,构建速度比较慢,比如上例中的肯德基午餐。它们之间的关系如下图所示:其中,值得注意的是Data、RealData和FutureData。这是一组典型的代理模式。Data接口代表外部数据,RealData代表真实数据,就像午餐一样。获得它的成本很高,需要很多时间;relativeFutureData,作为RealData的代理,类似于订单/合约,通过FutureData,可以获取未来的RealData。因此,Future模式本质上是Proxy模式的实际应用。实现一个简单的Future模式根据上面的设计,让我们来实现一个简单的代理模式!首先是Data接口,表示数据:publicinterfaceData{publicStringgetResult();}然后是FutureData,也是整个Future模式的核心:publicclassFutureDataimplementsData{//RealData需要在内部维护protectedRealDatarealdata=null;protectedbooleanisReady=false;publicsynchronizedvoidsetRealData(RealDatarealdata){if(isReady){return;}this.realdata=realdata;isReady=true;//RealData已经注入,notifygetResult()notifyAll();}//会等待RealData构造完成publicsynchronizedStringgetResult(){while(!isReady){try{//等到RealData被注入wait();}catch(InterruptedExceptione){}}//真正需要的数据从RealData中获取returnrealdata.result;}}下面是RealData:publicclassRealDataimplementsData{protectedfinalStringresult;publicRealData(Stringpara){StringBuffersb=newStringBuffer();//假设这里很慢,构造RealData不是一件容易的事result=sb.toString();}publicStringgetResult(){returnresult;}}ThengetDatafromClient:publicclassClient{//这是一个异步方法,返回的Data接口是一个FuturepublicDatarequest(finalStringqueryStr){finalFutureDatafuture=newFutureData();newThread(){publicvoidrun(){//RealData的构造很慢,所以在一个单独的线程中,RealDatarealdata=newRealData(queryStr);//setRealData()会notify()等待这个future上的对象future.setRealData(realdata);}}。start();//FutureData会立即返回,不会等待RealData构造完成returnfuture;}}最后一个Main函数串起来:publicstaticvoidmain(String[]args){Clientclient=newClient();//这将立即返回,因为获取的是FutureData而不是RealDataData=client.request("name");System.out.println("请求完成");try{//这里可以使用一个sleep代替其他业务逻辑处理//在处理这些业务逻辑的过程中,创建RealData,充分利用等待时间Thread.sleep(2000);}catch(InterruptedExceptione){}//使用真实的数据,如果这里的数据还没有准备好,getResult()会等待数据准备好,然后返回System.out.println("data="+data.getResult());}这是Future模式最简单的实现,虽然简单,但是已经包含了Future模式最本质的部分,对于大家理解JDK内部的Future对象非常重要。Java中的Future模式Future模式非常常用,以至于它已在JDK中得到完全实现和支持。接下来我们看一下JDK内部的Future实现:首先,JDK内部有一个Future接口,类似于上面说的顺序。当然,作为一个完整的商业产品,这里的Future还有更多的功能,除了获取真实数据的get()方法外,它还提供了一套辅助方法,比如:cancel():如果等待时间过长,可以直接取消任务isCancelled():任务已经取消了吗?isDone():任务完成了吗get():有2个get()方法,不带参数的是无限等待,或者你可以等待给定的时间下面的代码演示了如何使用这个Future://异步操作可以使用一个线程池ExecutorServiceexecutor=Executors.newFixedThreadPool(1);//执行FutureTask,相当于上例中的client.request("name")发送请求//这里启动一个线程执行RealData调用()执行Futurefuture=executor.submit(newRealData("name"));System.out.println("请求完成,正在准备数据");try{//可以这里还是做额外的数据操作,这里使用sleep代替其他业务逻辑ProcessingThread.sleep(2000);}catch(InterruptedExceptione){}//如果此时call()方法没有执行,还是会等待System.out.println("data="+future.get());整个使用过程很简单,我们分析一下executor.submit()中发生了什么:publicFuturesubmit(Callabletask){if(task==null)thrownewNullPointerException();//根据Callable对象,创建一个RunnableFuture,这里其实就是FutureTaskRunnableFutureftask=newTaskFor(任务);//将ftask推送到线程池中//在新的线程中,是run()方法,下面代码给出execute(ftask);//返回这个Future,以后可以通过this执行Futurereturnftask;}protectedRunnableFuturenewTaskFor(Callablecallable){returnnewFutureTask(callable);}最关键的部分在下面,当FutureTask单独作为线程执行时,结果将被保存到结果并设置任务的状态。下面是FutureTask的run()方法:从FutureTask获取结果的实现如下:publicVget()throwsInterruptedException,ExecutionException{ints=state;//如果没有完成,就等待返回使用park()方法阻塞线程//同时,所有等待的线程都会在FutureTask的waiters字段中排队if(s<=COMPLETING)s=awaitDone(false,0L);returnreport(s);}privateVreport(ints)throwsExecutionException{//Outcome存放在最后的计算结果中Objectx=outcome;if(s==NORMAL)//正常完成,返回outcomereturn(V)x;//如果没有正常完成,比如被取消用户,或者如果有异常,则抛出异常,但是也有一个问题,就是任务提交给线程后,调用线程doe不知道什么时候完成任务。如果调用get()方法或isDone()方法,可能会造成不必要的等待,系统的吞吐量难以提升。为了解决这个问题,JDK对Future模式进行了强化,创建了CompletableFuture,可以理解为Future模式的升级版。它最大的作用是提供一个回调机制,可以在任务完成后自动回调一些后续任务。这样,整个程序就可以彻底去掉“结果等待”。我们看一个简单的例子:在这个例子中,首先创建一个基于getPrice()的异步调用,然后使用thenAccept()方法设置后续操作,即在执行getPrice()时进行后续处理.不难看出CompletableFuture比普通Future更实用,因为它可以在Future执行成功后自动回调下一步,所以整个程序不会有任何阻塞的地方(就是你没有等待Future的执行,但Future执行成功后会自动告诉你)。以上述代码为例,CompletableFuture的所有神奇功能完全归功于AsyncSupply类(由上述代码中的supplyAsync()方法创建)。当执行AsyncSupply时,它看起来像这样:publicvoidrun(){CompletableFutured;Supplierf;if((d=dep)!=null&&(f=fn)!=null){dep=null;fn=null;if(d.result==null){try{//这里是你要执行的异步方法//结果会被保存并放到d.result字段d.completeValue(f.get());}catch(Throwableex){d.completeThrowable(ex);}}//执行成功,进行后续处理。在这个后续的处理中,会调用thenAccept()中的consumer//这个相当于Future完成最后的通知d.postComplete();}}继续看d.postComplete(),这里会调用一系列后续operationsfinalvoidpostComplete(){//省略部分代码,关注tryFire()//在tryFire()中,真正触发了后续的调用,也就是f=(d=h.tryFire(NESTED))==的部分null?this:d;}}}在thenAccept()中。今天主要介绍Future模式。我们从一个最简单的Future模式开始,逐步深入。介绍了JDK内部Future模式的实现,并简单介绍了Future模式的进化版本CompletableFuture。对于多线程开发,Future模式的应用极为广泛。可以说,这种模式已经成为异步开发的基础设施。