使用ObserveOn时如何处理OnNext异常?当我使用ObserveOn(Scheduler.ThreadPool)时,当观察者在OnNext上抛出错误时我的应用程序终止。我发现处理这个问题的唯一方法是使用下面的自定义扩展方法(除了确保OnNext永远不会抛出异常)。然后确保每个ObserveOn后跟一个ExceptionToError。publicstaticIObservableExceptionToError(这个IObservable源){varsub=newSubject();source.Subscribe(i=>{try{sub.OnNext(i);}catch(Exceptionerr){sub.OnError(err);}},e=>sub.OnError(e),()=>sub.OnCompleted());返回子;然而,这感觉不对。有没有更好的方法来处理这个问题?示例该程序由于未捕获的异常而崩溃。类程序{staticvoidMain(string[]args){try{varxs=newSubject();xs。ObserveOn(调度程序。线程池)。Subscribe(x=>{Console.WriteLine(x);if(x%5==0){thrownewSystem.Exception("Bang!");}},ex=>Console.WriteLine("Caught:"+ex.Message));//<-未到达xs.OnNext(1);xs.OnNext(2);xs.OnNext(3);xs.OnNext(4);xs.OnNext(5);}catch(Exceptione){Console.WriteLine("Caught:"+e.Message);//<-也没有达到}finally{Console.ReadKey();我们正在从Rxv2.0开始的RC版本中解决这个问题。您可以在我们的博客http://blogs.msdn.com/rxteam上阅读所有相关内容。它基本上归结为对管道本身进行更严格的错误处理,结合SubscribeSafe扩展方法(在订阅期间将错误重定向到OnError通道)和IScheduler上的Catch扩展方法(用于包含调度程序操作的调度程序)。关于这里提出的ExceptionToError方法,它有一个缺陷。IDisposable订阅对象在回调运行时仍然可以为null;有一个基本的竞争条件。要解决此问题,您必须使用SingleAssignmentDisposable。订阅错误和可观察错误之间存在差异。快速测试:varxs=newSubject();xs.Subscribe(x=>{Console.WriteLine(x);if(x%3==0)thrownewSystem.Exception("订阅错误");},ex=>Console.WriteLine("源错误:"+ex.Message));运行这个,你会在源代码中得到一个很好的句柄错误:xs.OnNext(1);xs.OnNext(2);xs.OnError(newException("来自源"));使用此方法运行时,您将在订阅中收到未处理的错误:xs.OnNext(1);xs.OnNext(2);xs.OnNext(3);您的解决方案所做的是在订阅中记录错误并在源代码中将其排除。而且你是在原始流上完成的,而不是在每个订阅的基础上。你可能会也可能不会这样做,但它几乎肯定是错误的。“正确”的方法是将您需要的错误处理直接添加到订阅操作中,这是它所属的位置。如果不想直接修改订阅函数,可以使用一个小帮手:publicstaticActionActionAndCatch(Actionaction,ActioncatchAction){returnitem=>{try{action(item);}catch(System.Exceptione){catchAction(e);}};现在使用它,再次显示不同错误之间的区别:xs.Subscribe(ActionAndCatch(x=>{Console.WriteLine(x);if(x%3==0)thrownewSystem.Exception("Errorinsubscription");},ex=>Console.WriteLine("Caughterrorinsubscription:"+ex.Message)),ex=>Console.WriteLine("Errorinsource:"+ex.Message));现在我们可以(分别)处理源中的错误和订阅中的错误。当然,这些动作中的任何一个都可以在方法中定义,使上面的代码(可能)像这样简单:xs.Subscribe(ActionAndCatch(Handler,ExceptionHandler),SourceExceptionHandler);本身,并且您不希望该流上有其他订阅者。这是一个完全不同类型的问题。我倾向于编写一个可观察的验证扩展来处理这种情况:)o.OnNext(x);elseo.OnError(newException("Couldnotvalidate:"+x));},e=>o.OnError(e),()=>o.OnCompleted());});}然后简单好用不混淆比喻(只有源码有bug):xs.Validate(x=>x!=3).Subscribe(x=>Console.WriteLine(x),ex=>Console.WriteLine("来源错误:"+ex.Message));如果您仍想在订阅时抑制异常,您应该使用其他讨论的方法之一。您当前的解决方案并不理想。正如其中一位Rx人员所说:Rx运算符不会捕获调用OnNext、OnError或OnCompleted时发生的异常。这是因为我们期望(1)观察者实现者最了解如何处理这些异常,而我们无法对它们做任何合理的事情。(2)如果发生异常,那么我们希望它冒泡,不被Rx处理。您当前让IObservable处理IObserver抛出的错误的解决方案在语义上没有意义,因为IObservable不应该知道任何关于观察它的事情。考虑以下示例:varerrorFreeSource=newSubject();varsourceWithExceptionToError=errorFreeSource.ExceptionToError();varobserverThatThrows=Observer.Create(x=>{if(x%5==0)thrownewException();},ex=>Console.WriteLine("There'sanargumentthatthisshouldbecalled"),()=>Console.WriteLine("OnCompleted"));varobserverThatWorks=Observer.Create(x=>Console.WriteLine("一切正常"),ex=>Console.WriteLine("但绝对不是这个"),()=>Console.WriteLine("OnCompleted"));sourceWithExceptionToError.Subscribe(observerThatThrows);sourceWithExceptionToError.Subscribe(observerThatWorks);errorFreeSource.OnNext(1);errorFreeSource.OnNext(2);errorFreeSource.OnNext(3);errorFreeSource.OnNext(4);errorFreeSource.OnNext(5);控制台.ReadLine();这里没有源或观察者问题,但由于与其他不相关错误的观察者不同,其OnError将被调用。为了防止不同线程中的异常结束进程,您必须在该线程中捕获它们,因此在观察者中放置一个try/catch块。你是对的-它应该感觉很糟糕。使用和返回这样的主题不是一个好方法。至少你应该像这样实现这个方法:.OnNext(x);}catch(Exceptionex){o.OnError(ex);subscription.Dispose();}},e=>o.OnError(e),()=>o.OnCompleted());返回订阅;});请注意,主题未被使用,如果我发现错误,我会处理订阅以防止序列继续失败。但为什么不向订阅添加一个OnError处理程序。像这样:varxs=newSubject();xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x=>{Console.WriteLine(x);if(x%5==0){thrownewSystem.Exception("Bang!");}},前=>Console.WriteLine(ex.Message));xs.OnNext(1);xs.OnNext(2);xs.OnNext(3);xs.OnNext(4);xs.OnNext(5);此代码正确捕获了订阅中的错误。另一种方法是使用Materialize扩展方法,但除非上述解决方案不起作用,否则这可能有点矫枉过正。以上是C#学习教程:使用ObserveOn时如何处理OnNext中的异常?如果所有分享的内容对你有用,需要进一步了解C#学习教程,希望大家多多关注。本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处:
