1.从一个同步的Http调用到一个很简单的业务逻辑,其他后端服务都提供了接口,我们需要通过接口调用获取响应数据。逆向地理接口:获取经纬度所在省、市、市、县及对应编码:curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"{"adcode":"510722"}服务器执行,最简单的同步调用方式:在服务器响应之前,IO会阻塞在:java.net.SocketInputStream#socketRead0的native方法上:通过jstack日志可以发现,Thread此时会一直处于可运行状态:"main"#1prio=5os_prio=31tid=0x00007fed0c810000nid=0x1003runnable[0x000070000ce14000]java.lang.Thread.State:RUNNABLEatjava.net.SocketInputStream.socketRead0(NativeMethod)atjava.net.SocketInputStream.socketRead(SocketInputStream.java:116)atjava.net.SocketInputStream.read(SocketInputStream.java:171)atjava.net.SocketInputStream.read(SocketInputStream.java:141)atorg.apache.http.impl.conn.LoggingInputStream。读取(LoggingInputStream.java:84)atorg.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)atorg.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)atorg.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)atorg.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)atorg.apache.http.impl.conn.DefaultHttpResponseParser。parseHead(DefaultHttpResponseParser.java:56)atorg.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)atorg.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)atorg.apache。http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)atorg.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)atorg.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor:Executor)125)atorg.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)atorg.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)atorg.apache.http.impl。execchain.RetryExec.execute(RetryExec.java:89)在org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)atorg.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)atorg.apache.http.impl.client。CloseableHttpClient.execute(CloseableHttpClient.java:83)atorg.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)atcom.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207).......线程模型示例:同步最大的问题是IO等待过程中线程资源没有得到充分利用,对大量IO场景的业务吞吐量会有一定的限制。2.JDKNIO&Future在JDK1.5中,JUC提供了Future抽象:当然并不是所有的Futures都是这样实现的,比如io.netty.util.concurrent.AbstractFuture就是线程轮询的。这样做的好处是主线程不需要等待IO响应,可以做别的事情,比如再发送一个IO请求,可以一直等到它一起返回:"main"#1prio=5os_prio=31tid=0x00007fd7a500b000nid=0xe03waitingoncondition[0x000070000a95d000]java.lang.Thread.State:WAITING(parking)atsun.misc.Unsafe.park(NativeMethod)-parkingtowaitfor<0x000000076ee2d768>(ajava.util.concurrent.CountatdownjaLatch$Sync)locks.LockSupport.park(LockSupport.java:175)在java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)atjava.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java7.conutil.99)atAbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)atjava.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)atorg.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)atcom.amap.aoAsyncIO.futureBlockingGet(AsyncIO.java:201)....."AsyncHttpClient-2-1"#11prio=5os_prio=31tid=0x00007fd7a7247800nid=0x340brunnable[0x000070000ba94000]java.lang.Thread.State:RUNNABLEatsun.nio.ch.KQueueArrayWrapper.keventArue0(Native.method.nepolWrayKatsun.nio)(KQueueArrayWrapper.java:198)atsun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)atsun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)-锁定<0x000000076eb00ef0>(aio.netty.channel.nio.SelectedSelectionKeySet)-locked<0x000000076eb00f10>(ajava.util.Collections$UnmodifiableSet)-locked<0x000000076eb00ea0>(asun.nio.ch.KQueueSelectorImpl)atsun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)atio.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)atio.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)atio.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)atio.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)atjava.lang.Thread.run(Thread.java:748)主线程在等待结果返回时仍然需要等待,这个问题并没有得到根本解决。线程等待并得到结果,那么主线程发送完请求后是否可以不关心这个逻辑就去执行其他逻辑呢?那么就可以使用Callback机制了。这样主线程发起IO后就不再需要关心业务逻辑了。发送请求后,完全可以做其他事情,或者返回线程池进行调度。如果是HttpServer,需要结合Servlet3.1的异步Servlet。异步Servlet参考https://www.cnblogs.com/davenkin/p/async-servlet.html使用Callback方法,从线程模型上发现线程资源已经被充分利用,线程中没有线程整个过程块。四Callbackhell回调地狱,当Callback线程还需要执行下一次IO调用时,此时进入回调地狱模式。一个典型的应用场景是通过经纬度获取行政区域的adcode(反向地理界面),然后根据获取的adcode获取当地的天气信息(天气界面)。在同步编程模型中,很少涉及此类问题。Callback方法的核心缺陷五JDK1.8CompletableFuture那么有什么办法可以解决CallbackHell的问题呢?当然,CompletableFuture是在JDK1.8中提供的。我们先看看它是如何解决这个问题的。将逆向地理的Callback逻辑封装成一个独立的CompletableFuture。当异步线程回调时,调用future.complete(T)封装结果。天气执行的Call逻辑也被封装到一个独立的CompletableFuture中。完成后,逻辑同上。Compose连接,完成时输出:每个IO操作都可以封装成一个独立的CompletableFuture,从而避免回调地狱。CompletableFuture只有两个属性:result:Future的执行结果(EithertheresultorboxedAltResult)。stack:操作栈,用来定义这个Future的下一个操作的行为(TopofTreiberstackofdependentactions)。如何调用weatherFuture方法?通过栈可以发现是在reverseCodeFuture.complete(result)的时候,获取到的adcode也作为参数执行接下来的逻辑。这样就完美解决了回调地狱的问题。在主逻辑中,看起来是在同步编码。六、Vert.xFutureInfo-Service,广泛使用的Vert.xFuture也是类似的方案,只是在设计上使用了Handler的概念。core实现的逻辑类似:这当然不是Vertx的全部,当然这是题外话。七ReactiveStreams异步编程对吞吐量和资源都有好处,但是有没有一个统一的抽象来解决这样的问题,答案是ReactiveStreams。核心抽象:PublisherSubscriberProcessorSubscription,在整个包中,只有这四个接口,并没有实现类。在JDK9中,已经作为规范封装到java.util.concurrent.Flow中:参考https://www.baeldung.com/java-9-reactive-streamshttp://ypk1226.com/2019/07/01/reactive/reactive-streams/https://www.reactivemanifesto.org/https://projectreactor.io/learn一个简单的例子:EightReactor&Spring5&SpringWebFluxFlux&MonoReferenceshttps://projectreactor.io/docs/core/3.1.0.M3/reference/index.htmlhttps://speakerdeck.com/simonbasle/projectreactor-dot-io-reactor3-intro
