本文转载自微信公众号“老王加”,老王加作者老王。转载本文请联系老王Plus公众号。今天我将通过一个简单的例子来谈谈异步多路径终止。我会尽量使其易于理解,但今天的内容需要一些编程技巧。今天的话题来自最近对gRPC的一些技术研究。该主题本身与gRPC关系不大。在应用中,我使用了全双工数据管道这样一个相对复杂的概念。我们知道全双工连接是两个节点之间的连接,但它并不是简单的“请求-响应”连接。任何节点都可以随时发送消息。从概念上讲,客户端和服务器还是有区别的,但这只是概念上的,只是为了区分谁在监听连接尝试,谁在建立连接。事实上,制作双工API比制作“请求-响应”API复杂得多。由此延伸出另一个思路:做一个类库,在类库内部构建双工流水线,只暴露简单的内容和熟悉的方法提供给消费者。1.首先假设我们有这样一个API:客户端建立连接,客户端向服务端发送SendAsync消息,同时有一个TryReceiveAsync消息,试图等待服务端的消息(服务端有消息发送到True,返回False)服务器控制数据流终止,如果服务器发送完最后一条消息,客户端不再发送消息。接口代码可以这样写:interfaceITransport:IAsyncDisposable{ValueTaskSendAsync(TRequestrequest,CancellationTokencancellationToken);ValueTask<(boolSuccess,TResponseMessage)>TryReceiveAsync(CancellationTokencancellationToken);}忽略连接部分,代码看起来并不复杂.下面,我们创建两个循环并通过枚举器公开数据:ITransporttransport;publicasyncIAsyncEnumerableReceiveAsync([EnumeratorCancellation]CancellationTokencancellationToken){while(true){var(success,message)=awaittransport。TryReceiveAsync(cancellationToken);if(!success)break;yieldreturnmessage;}}publicasyncValueTaskSendAsync(IAsyncEnumerabledata,CancellationTokencancellationToken){awaitforeach(varmessageindata.WithCancellation(cancellationToken)){awaitcancport.Sendage}this}使用了异步迭代器相关的概念.不明白的可以看我的另一篇专门讲异步迭代器的文章【传送门】。2.解决终止标志貌似搞定了。我们使用循环来接收和发送,并将外部终止标志传递给这两个方法。真的完成了吗?还没有。问题出在终止标志上。我们没有考虑到这两个流是相互依赖的,特别是我们不希望生产者(使用SendAsync的代码)在任何连接失败的情况下仍然运行。事实上,会有比我们想象的更多的终止路径:我们可能已经为这两个方法提供了一个外部终止令牌,而这个令牌可能已经由触发ReceiveAsync的消费者通过WithCancellation提供了一个终止令牌给了GetAsyncEnumerator,并且此令牌可能已被触发我们的发送/接收代码可能是错误的。ReceiveAsync消费者正在获取数据,它即将终止获取——一个简单的原因是在处理接收到的数据时出了点问题。SendAsync中的生产者出了点问题。这些只是几个可能的例子,但可能还有更多。本质上,这些都是连接的信号终止,因此我们需要以某种方式涵盖所有这些场景,允许在发送和接收路径之间传达问题。换句话说,我们需要自己的CancellationTokenSource。显然,用一个库来解决这种需求更完美。我们可以将这种复杂性放在消费者可以访问的单个API中:publicIAsyncEnumerableDuplex(IAsyncEnumerablerequest,CancellationTokencellationToken=default);这个方法:允许它传入它传入的生产者调用当外部终止令牌有异步响应返回时,我们可以这样做:awaitforeach(MyResponseiteminclient.Duplex(ProducerAsync())){//...todo}asyncIAsyncEnumerableProducerAsync([EnumeratorCancellation]CancellationTokencancellationToken=default){for(inti=0;i<100;i++){yieldreturnnewMyRequest(i);awaitTask.Delay(100,cancellationToken);}}在上面的代码中,我们的ProducerAsync没有实现太多内容,目前只是传递一个占位符。后面我们可以枚举它,枚举行为实际上调用了代码。回到复式。在该方法中,至少需要考虑两种不同的终止方法:在使用通过cancellationToken传入的外部令牌期间可能传递给GetAsyncEnumerator()的潜在令牌这里,为什么不在前面列出更多终止方法呢?这里就要考虑编译器的组合了。我们需要的不是CancellationToken,而是CancellationTokenSource。publicIAsyncEnumerable双工(IAsyncEnumerable请求,CancellationTokencellationToken=默认)=>DuplexImpl(传输,请求,取消令牌);privateasyncstaticIAsyncEnumerableDuplexImpl(ITransport传输,IAsyncEnumerable请求,CancellationToken,[EnumeratorCancellation]CancellationTokenenumeratorToken=default){usingvarallDone=CancellationTokenSource.CreateLinkedTokenSource(externalToken,enumeratorToken);//...todo}这里,DuplexImpl方法允许枚举终止,但与外部终止令牌保持分离。这样,它就不会在编译器级别合并。在内部,CreateLinkedTokenSource的行为类似于编译器。我们现在有一个CancellationTokenSource,如果需要,我们可以通过它终止循环。usingvarallDone=CancellationTokenSource.CreateLinkedTokenSource(externalToken,enumeratorToken);try{//...todo}finally{allDone.Cancel();}这样,我们可以处理消费者没有获得所有数据而我们想要触发allDone,但我们退出DuplexImpl。这时候迭代器就非常有用了,它让程序变得更简单,因为使用using,最终里面的任何内容都会定位到Dispose/DisposeAsync。下一个是生产者,它是SendAsync。它也是双工的并且对传入消息没有影响,因此您可以使用Task.Run作为单独的代码路径来开始运行并在生产者发生错误时终止发送。对于上面的todo部分,你可以添加:{allDone.Cancel();throw;}},allDone.Token);//...todo:receiveawaitsend;这里启动了生产者的并行操作SendAsync。请注意,这里我们使用令牌allDone.Token将组合的终止令牌传递给生产者。延迟await以允许在ProducerAsync方法中使用终止标记来满足复合双工操作的生命周期要求。这样接收代码就变成了:while(true){var(success,message)=awaittransport.TryReceiveAsync(allDone.Token);if(!success)break;yieldreturnmessage;}allDone.Cancel();最后,把这个部分代码合在一起看:privateasyncstaticIAsyncEnumerableDuplexImpl(ITransporttransport,IAsyncEnumerablerequest,CancellationTokenexternalToken,[EnumeratorCancellation]CancellationTokenenumeratorToken=default){usingvarallDone.CancellationTokenSource,(CreateexternalTokenTokenSource,(enumeratorToken);try{varsend=Task.Run(async()=>{try{awaitforeach(varmessageinrequest.WithCancellation(allDone.Token)){awaittransport.SendAsync(message,allDone.Token);}}catch{allDone.Cancel();抛出;}},allDone.Token);while(true){var(success,message)=awaittransport.TryReceiveAsync(allDone.Token);if(!success)break;yieldreturnmessage;}allDone.Cancel();awaitsend;}finally{allDone.Cancel();}}第三,summary相关的处理非常多。这里实现的要点是:外部令牌和枚举器令牌都贡献给allDone传输中的发送和接收代码使用allDone.Token生产者枚举使用allDone.Token无论如何退出枚举器,allDone都会被传递如果传输收到错误则终止allDone如果消费者提前终止allDone在我们从服务器收到最后一条消息时终止:默认情况下,await包含对SynchronizationContext.Current的检查。除了表示额外的上下文切换之外,对于UI应用程序,它还意味着在UI线程上运行不需要在UI线程上运行的代码。库代码通常不需要这样做。因此,在库代码中,您通常应该在所有使用await来绕过此检查的地方使用.configureawait(false)。在一般应用程序代码中,默认情况下您应该只使用await而不是ConfigureAwait,除非您知道自己在做什么。