当前位置: 首页 > 编程语言 > C#

为什么从给定订阅者抛出时从未调用OnError回调?Share

时间:2023-04-10 20:21:37 C#

为什么从给定订阅者抛出时从未调用OnError回调?请遵守以下单元测试:使用系统;使用System.Reactive.Linq;使用系统线程;使用System.Threading.Tasks;使用Microsoft.VisualStudio.TestTools.UnitTesting;namespaceUnitTests{[TestClass]publicclassTestRx{publicconstintUNIT_TEST_TIMEOUT=5000;privatestaticIObservableGetObservable(intcount=100,intmsWait=10){返回Observable。Create(async(obs,cancellationToken)=>{for(inti=0;i{Thread.Sleep(msWait);返回值;}));}});}[TestMethod,TestCategory("CI"),Timeout(UNIT_TEST_TIMEOUT)]publicvoidSubscribe(){vartcs=newTaskCompletionSource();诠释我=0;获取可观察对象()。Subscribe(n=>{Assert.AreEqual(i,n);++i;},e=>Assert.Fail(),()=>{Assert.AreEqual(100,i);tcs.TrySetResult(null);});tcs.Task.Wait();}[TestMethod,TestCategory("CI"),Timeout(UNIT_TEST_TIMEOUT)]publicvoidSubscribeCancel(){vartcs=newTaskCompletionSource();varcts=newCancellationTokenSource();诠释我=0;GetObservable().Subscribe(n=>{Assert.AreEqual(i,n);++i;if(i==5){cts.Cancel();}},e=>{Assert.IsTrue(i{Assert.IsTrue(i<100);tcs.TrySetResult(null);},cts.Token);tcs.Task.Wait();}[TestMethod,TestCategory("CI"),Timeout(UNIT_TEST_TIMEOUT)]公共无效订阅抛出(){vartcs=newTaskCompletionSource();inti=0;GetObservable().Subscribe(n=>{Assert.AreEqual(i,n);++i;if(i==5){thrownewException("xo-xo");}},e=>{Assert.AreEqual("xo-xo",e.Message);tcs.TrySetResult(null);},Assert.Fail);tcs.Task.Wait();}}}单元测试SubscribeCancel和SubscribeThrow超时,因为从未调用OnError回调,所以等待任务永远不会结束怎么了?PS这个问题与如何正确包装SqlDataReader和IObservable有关?编辑同时我创建了一个新的Rx问题-https://rx.codeplex.com/workitem/74也http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/为什么从给定的订户抛出时从未调用过错误回调?Forum=RXEDIT2以下观察器实现产生完全相同的结果,即使它符合Rx设计指南的第6.5段-“订阅实现不应抛出”:privatestaticIObservableGetObservable(intcount=100,intmsWait=10){returnObservable.Create(async(obs,cancellationToken)=>{try{for(inti=0;i{Thread.Sleep(msWait);返回值;}));}obs.OnCompleted();}catch(Exceptionexc){obs.OnError(exc);}});EDIT3我开始相信,当一个异步可观察序列被集成到另一个同步代码中时,应该编写这样的代码(在一个地方或另一个地方这通常是服务器端的情况):vartcs=newTaskCompletionSource();GetObservable().Subscribe(n=>{try{...}catch(Exceptione){DoErrorLogic();tcs.TrySetException(e);}},e=>{DoErrorLogic();tcs.TrySetException(e);},()=>{DoCompletedLogic();tcs.TrySetRe结果(空);});tcs.Task.Wait();真的是这样吗?编辑4我认为它终于开始渗透到我想说的内容我现在将切换到我的另一篇文章-如何使用IObservable正确包装SqlDataReader?此行为是设计使然。如果订阅者抛出异常(顺便说一句,这是不好的做法),Rx框架会正确地说它已经死了,不再有通信。如果订阅被取消也不是错误——它只是要求不发送任何其他类型的事件——Rx尊重这一点。编辑以回应评论我认为文档中没有简单的参考点-您看到的行为是如此内在,这是隐含的。我能得到的最接近的是指向AnonymousSafeObserver和AutoDetatchObserver的源代码。后者有一个可能有帮助的解释方案,但它有点复杂。也许类比会有所帮助。假设一个数据流事件是由报刊亭出版的报纸。订户是家庭。订户抛出异常报社经理愉快地送报,直到有一天,其中一名订户-琼斯先生-留下了他的汽油,他的房子爆炸杀死了琼斯先生并摧毁了房子(抛出未处理的异常)。报刊亭意识到他不能再向琼斯先生发送报纸,也不能发送终止通知,报纸供应没有问题(因此OnError或OnCompleted不合适)并且报刊亭继续减少数量订户。相比之下,报纸印刷商无意中使用了易燃墨水,导致工厂失火。现在,可怜的报刊经销商必须向所有已无限期停止供应的订户发送解释性说明(OnError)。订阅者取消订阅琼斯先生正在接收他的报纸订阅,直到有一天他决定他厌倦了无数令人沮丧的故事并要求取消他的订阅。新闻记者有责任。他没有给琼斯先生发一张纸条,说明报纸已经停止印刷版(没有OnCompleted)——他们没有。他也没有给琼斯先生发一张说明报纸停业的便条(没有OnError)——他只是按照琼斯先生的要求停止送报。对Edit3的回应我同情你的挣扎。我注意到在您的整个代码中,您一直在尝试将TPL(任务)习惯用法与Rx结合起来。这种尝试常常让人觉得笨拙,因为它们真的是完全不同的世界。很难对这样的段落发表评论:我开始相信,当异步可观察序列集成到同步代码中时,应该编写这样的代码(通常在服务器端的某个地方或另一个):强烈同意布兰登精心设计的断言,我无法想象以您尝试的方式在服务器端将异步代码与同步代码集成的实例。这对我来说就像一种设计气味。用外行的话来说,人们会尝试让代码保持原生反应——进行订阅,然后让订阅者处理原生反应工作。我不记得必须以您描述的方式过渡到同步代码。当然,查看您在Edit3中编写的代码,并不清楚您要实现的目标。来源不负责对订阅者的错误做出反应。这是摇尾巴。需要存在以确保订阅者服务连续性的异常处理程序应该在订阅处理代码中,而不是在源可观察的代码中——它应该只关注防止流氓观察者行为。这个逻辑在上面链接的AnonymousSafeObserver中实现,并被Rx提供的大多数操作符使用。Observables可能具有处理源数据连续性的逻辑——但这与您在代码中解决的问题不同。无论您的ToTask是试图通过调用ToTask还是Wait来桥接同步代码,您都可能需要仔细考虑您的设计。我觉得提供一个更具体的问题陈述——也许来自你试图解决的现实世界场景——将有助于为你找到更有用的建议。你说的'SqlDataReader'例子......最后人们可以通过订阅直接使用可观察的[包装SqlDataReader],但他们必须等待它在某个时刻结束(阻塞线程),因为大部分代码仍然是同步的。...突出显示您所处的设计泥潭。在这种情况下,您可以推断这样的消费者显然最好使用IEnumerable接口-或者可能需要IObservable>。但关键是要着眼大局,您试图将SqlDataReader包装在可观察的包装器中这一事实是一种设计味道-因为这是响应特定一次性请求的固定数据供应。这可能是一个异步场景——但不是真正的反应式场景。将其与更常见的反应场景进行对比,例如“一收到价格就将价格发送给股票X”,在这种情况下,您完全按照源需要的方式设置未来的数据流,然后用户做出反应。它没有在指南中明确说明,但Rx语法和IObservables的目的暗示了它。IObservables将信息从源传递给一个或多个观察者。传递的信息是数据(通过OnNext),可以选择后跟OnCompleted或OnError。请务必记住,这些回调是由源触发的。它们不能也不应该由观察者触发。如果调用OnError,那将是因为源可观察链中的某些东西失败了。永远不会因为观察者而失败。在您的SubscribeThrow示例中,Observer(由您为OnNext、OnError、OnCompleted提供的3个lambda构造)失败。观察者中的这些错误不能也不应该导致源可观察本身失败。RX2.0引入了保障措施来确保这个合约。阅读RX2.0发布博文的“修订后的错误处理策略”部分。相关问题:使用ObserveOn时如何处理OnNext中的异常?EDIT3这当然是一种方法,但它非常难看。首先,我将质疑您关于异步服务器端代码通常最终需要与一些同步代码进行同步交互的断言。我发现这仅在单元测试中是正确的。但无论如何,我相信你只是订阅得太早了。我对Rx的经验是,每当我遇到摩擦时,那是因为我订阅太快以至于应该扩展可观察的monad链。在您的示例中,不是订阅数据流并在观察者中处理它,而是将您的处理器逻辑视为传入数据的另一个投影。在这种情况下,您的逻辑只是将一段数据转换为工作结果。这使您可以将逻辑的成功或失败视为流程的一部分,然后您可以边看边看。你最终得到这个:vardata=GetObservable();varresults=data.Select(item=>{DoWork(item);//因为你的工作没有产生任何东西......//它要么成功要么抛出异常//并且你不能使Observable//返回Unit。Default.Unit相当于Rx//void.returnUnit.Default;});//订阅流并同步等待它完成结果。等待();//这将在DoWork第一次失败时抛出异常//或异步等待流完成...就像任务等待结果一样;//或将流转换为在处理完成时完成的任务。vartask=results.ToTask();或者,如果您不想在遇到第一个错误时停止处理,而只想累积错误怎么办?现在你可以把你的工作想象成一个投影......).Where(e=>e!=null).ToList();varerrorList=results.Wait();//或varerrorList=awaitresults;//或Task>errorListTask=re结果.ToTask();这两个示例看起来更简单、更清晰,并且允许以不同的方式思考问题以上是C#学习教程:为什么从给定的订阅者抛出时从未调用OnError回调?如果所有分享的内容对你有用,需要进一步了解C#学习教程,希望大家多多关注。本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处: