当前位置: 首页 > 后端技术 > Java

京东:说说CompletableFuture的实现原理和使用场景?我傻眼了,.

时间:2023-04-01 18:54:41 Java

1。概述CompletableFuture是jdk1.8引入的一个实现类。扩展Future和CompletionStage,是一个在任务完成阶段可以触发一些操作的Future。简单的说,可以实现异步回调。2、为什么要引入CompletableFuture?对于jdk1.5的Future,虽然提供了异步处理任务的能力,但是获取结果的方式并不优雅,仍然需要阻塞(或者轮训)。如何避免阻塞?其实就是注册一个回调。业界结合观察者模式来实现异步回调。即当任务执行完成时,通知观察者。比如Netty的ChannelFuture可以通过注册监听器来实现对异步结果的处理。Netty的ChannelFuturepublicPromiseaddListener(GenericFutureListener>listener){checkNotNull(listener,"listener");同步(这){addListener0(侦听器);}if(isDone()){notifyListeners();}返回这个;}privatebooleansetValue0(ObjectobjResult){if(RESULT_UPDATER.compareAndSet(this,null,objResult)||RESULT_UPDATER.compareAndSet(this,UNCANCELLABLE,objResult)){if(checkNotifyWaiters()){notifyListeners();}返回真;}返回假;通过addListener方法注册监听器。如果任务完成,会调用notifyListeners进行通知。CompletableFuture扩展Future,引入函数式编程,通过回调处理结果。3、功能CompletableFuture的功能主要体现在他的CompletionStage上。可以实现以下功能:转换(thenCompose)、组合(thenCombine)、消费(thenAccept)、运行(thenRun)。消费和有return(thenApply)操作的区别:消费使用的是执行结果。运行只是运行特定的任务。具体的其他功能,大家可以根据自己的需要自行查看。CompletableFuture可以借助CompletionStage方法实现链式调用。并且您可以选择同步或异步两种方式。下面通过一个简单的例子来体验一下他的功能。publicstaticvoidthenApply(){ExecutorServiceexecutorService=Executors.newFixedThreadPool(2);CompletableFuturecf=CompletableFuture.supplyAsync(()->{try{//Thread.sleep(2000);}catch(Exceptione){e.printStackTrace();}System.out.println("supplyAsync"+Thread.currentThread().getName());return"hello";},executorService).thenApplyAsync(s->{System.out.println(s+"world");return"hhh";},executorService);cf.thenRunAsync(()->{System.out.println("dddddd");});cf.thenRun(()->{System.out.println("ddddsd");});cf.thenRun(()->{System.out.println(Thread.currentThread());System.out.println("dddaewdd");});}执行结果supplyAsyncpool-1-thread-1helloworldddddddddsdThread[main,5,main]dddaewdd根据结果可以看出相应的任务会有序的执行。注意:如果cf.thenRun是同步执行的。它的执行线程可能是主线程,也可能是执行源任务的线程。如果执行源任务的线程在调用main之前完成任务的执行。然后cf.thenRun方法将被主线程调用。这里解释一下,如果同一个任务有多个依赖任务:如果这些依赖任务是同步执行的。那么这些任务如果是当前调用线程(main)执行的,则按顺序执行,如果是执行源任务的线程执行的,则倒序执行。因为内部任务数据结构是后进先出。如果这些依赖任务是异步执行的,那么他会通过异步线程池来执行任务。无法保证执行任务的顺序。以上结论是通过阅读源码得到的。让我们深入研究下面的源代码。4.源码跟踪创建CompletableFuture的方式有很多种,甚至可以直接新建一个。下面看一下supplyAsync异步创建的方法。publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor){returnasyncSupplyStage(screenExecutor(executor),supplier);}staticExecutorscreenExecutor(Executore){if(!useCommonPool&&e==ForkJoin.commonPool())returnasyncPool;如果(e==null)抛出新的NullPointerException();返回e;}输入参数Supplier,有返回值的函数。如果是异步方法,传递了一个executor,就会使用传入的executor来执行任务。否则,使用公共ForkJoin并行线程池。如果不支持并行,则创建一个新线程执行。这里需要注意的是,ForkJoin是通过守护线程来执行任务的。所以必须有一个非守护线程。asyncSupplyStage方法staticCompletableFutureasyncSupplyStage(Executore,Supplierf){if(f==null)thrownewNullPointerException();CompletableFutured=newCompletableFuture();e.execute(newAsyncSupply(d,f));返回d;这里将创建一个CompletableFuture用于返回。然后构造一个AsyncSupply,将创建的CompletableFuture作为构造参数传入。那么,任务的执行就完全依赖于AsyncSupply。AsyncSupply#runpublicvoidrun(){CompletableFutured;供应商f;如果((d=dep)!=null&&(f=fn)!=null){dep=null;fn=空;如果(d.result==null){尝试{d.completeValue(f.get());}catch(Throwableex){d.completeThrowable(ex);}}d.postComplete();}}此方法将调用Supplier的get方法。并将结果设置到CompletableFuture中。我们应该清楚,这些操作都是在异步线程中调用的。d.postComplete方法是通知任务执行完成。触发后续依赖任务的执行,这是实现CompletionStage的关键点。在看postComplete方法之前,我们先看一下创建依赖任务的逻辑。thenAcceptAsync方法publicCompletableFuturethenAcceptAsync(Consumeraction){returnuniAcceptStage(asyncPool,action);}privateCompletableFutureuniAcceptStage(Executore,Consumerf){if(f==null)thrownewNullPointerException();CompletableFutured=newCompletableFuture();if(e!=null||!d.uniAccept(this,f,null)){#1UniAcceptc=newUniAccept(e,d,this,f);推(c);C。试试火(同步);}返回d;}上文提到的。thenAcceptAsync用于消费CompletableFuture。此方法调用uniAcceptStage。uniAcceptStage逻辑:构造一个CompletableFuture,主要用于链式调用。如果是异步任务,直接返回。因为源任务结束后,会触发异步线程执行相应的逻辑。如果是同步任务(e==null),就会调用d.uniAccept方法。此方法在这里是合乎逻辑的:如果源任务已完成,则调用f并返回true。否则,输入if代码块(标记1)。如果是异步任务,直接跳if(标记1)。Mark1逻辑:构造一个UniAccept入栈。这里通过CAS实现了乐观锁。打电话给c。tryFire方法。finalCompletableFuturetryFire(intmode){CompletableFutured;CompletableFuture一个;如果((d=dep)==null||!d.uniAccept(a=src,fn,mode>0?null:this))返回null;部门=空;源代码=空;fn=空;返回d.postFire(a,模式);}将调用d.uniAccept方法。其实这个方法就是判断源任务是否完成,如果完成则执行依赖任务,否则返回false。如果依赖任务已经执行,调用d.postFire主要是Fire的后续处理。根据不同的模式,逻辑是不同的。这里简单说一下,其实模式有同步异步,也有迭代。迭代以避免无限递归。这里着重强调d.uniAccept方法的第三个参数。如果是异步调用(mode>0),则传入null。否则通过这个。区别见下面的代码。如果c不为空,将调用c.claim方法。try{if(c!=null&&!c.claim())返回false;@SuppressWarnings("未检查")Ss=(S)r;f.接受;完成空();}catch(Throwableex){completeThrowable(ex);}finalbooleanclaim(){Executore=executor;if(compareAndSetForkJoinTaskTag((short)0,(short)1)){if(e==null)返回真;执行者=空;//禁用e.execute(this);}返回假;声明方法是逻辑:如果是异步线程则为null。表示同步,则直接返回true。最后上层函数会调用f.accept(s)同步执行任务。如果异步线程不为空??,则使用异步线程执行此操作。本次运行任务如下。即在异步线程上同步调用tryFire方法。以达到其被异步线程执行的目的。publicfinalvoidrun(){tryFire(ASYNC);}看了上面的逻辑,我们基本明白了依赖任务的逻辑。其实就是先判断源任务是否完成。如果完成,则直接在对应的线程中执行之前的任务(如果是同步的,则在当前线程中处理,否则在异步线程中处理)。使用postComplete触发和调用依赖任务。postCompletemethodfinalvoidpostComplete(){/**在每个步骤中,变量f保存当前依赖项以弹出*并运行。它一次只沿着一条路径扩展,*推动其他路径以避免无限递归。*/CompletableFuturef=这个;完成时间;while((h=f.stack)!=null||(f!=this&&(h=(f=this).stack)!=null)){CompletableFutured;完成吨;if(f.casStack(h,t=h.next)){if(t!=null){if(f!=this){pushStack(h);继续;}h.next=null;//分离}f=(d=h.tryFire(NESTED))==null?这个:d;}}}将在源任务完成后调用。其实逻辑很简单,就是迭代栈的依赖任务。调用h.tryFire方法。NESTED是为了避免递归无限循环。因为FirePost会调用postComplete。如果嵌套则不调用。栈的内容实际上是在创建依赖任务时添加的。我们上面已经提到了。4.小结上面源码的逻辑已经分析完毕。因为涉及到异步操作,所以需要我们来处理(这里针对所有的异步任务):CompletableFuture创建成功后,会通过一个异步线程来执行相应的任务。如果CompletableFuture仍然有依赖任务(异步),任务将被添加到CompletableFuture的堆栈并保存。用于完成后后续执行的依赖任务。当然,创建依赖任务并不仅仅是将其添加到堆栈中。如果创建依赖任务时源任务已经执行,则当前线程会触发依赖任务的异步线程直接处理依赖任务。并且它会告诉堆栈其他依赖的任务源任务已经完成。主要考虑代码的复用。所以逻辑比较难理解。postComplete方法将在源任务线程执行完源任务后被调用。它也可以在依赖任务线程之后调用。执行依赖任务的方法主要依赖于tryFire方法。因为这个方法可能会被很多不同类型的线程触发,所以逻辑上也有点绕。(其他依赖任务线程、源任务线程、当前依赖任务线程)如果是当前依赖任务线程,则执行依赖任务,并通知其他依赖任务。如果是源任务线程和其他依赖任务线程,则将任务转移到依赖线程执行。无需通知其他依赖任务,避免死递归。不得不说,DougLea的编码真的很艺术。代码的可重用性现在是合乎逻辑的。来源:blog.csdn.net/weixin_39332800/article/details/108185931近期文章推荐:1.1000+Java面试题及答案(2022最新版)2.厉害了!Java协程来了。..3.SpringBoot2.x教程,太全面了!4.不要用爆破爆满画面,试试装饰者模式,这才是优雅的方式!!5.《Java开发手册(嵩山版)》最新发布,赶快下载吧!感觉不错,别忘了点赞+转发!