使用IObservable进行批处理我的服务器给我发送了批处理消息。消息的批次和频率是任意的。有时我每隔1分钟收到一次消息,有时我一个小时都没有收到消息。有时是1条消息,有时是10条消息。我当前的实现使用Observable.Buffer(TimeSpan.FromSeconds(5))对消息进行分组并将它们发送给用户。如果两条消息之间有几秒钟的延迟,是否可以配置Observable向用户发送缓冲消息。我能够避免每5秒不必要的计时器滴答声。对优化批处理的其他建议持开放态度。使用bufferClosingSelector工厂方法decPL建议使用接受bufferClosingSelector的Buffer重载——一个在打开新缓冲区时调用的工厂函数。它产生一个流,其第一个OnNext()或OnCompleted()信号刷新当前缓冲区。decPL代码如下所示:observable.Buffer(()=>observable.Throttle(TimeSpan.FromSeconds(5)))这是解决方案的一个很好的进展,但它有几个问题:防止无限节流对于我们需要的流使用额外的机制来限制缓冲区长度并防止无限节流。Buffer有一个允许您指定最大长度的重载,但遗憾的是您不能将它与结束选择器结合使用。让我们将所需的缓冲区长度限制称为n。回想一下,关闭选择器的第一个OnNext就足以关闭缓冲区,因此我们需要做的就是将一个计数流合并到一个计数流中,该计数流在来自源的n个事件后发送OnNext。我们可以使用.Take(n).LastAsync();接受前n个事件,但忽略除最后一个事件之外的所有事件。这是Rx中非常有用的模式。使源“热”要解决bufferClosingSelector工厂重新订阅源的问题,我们需要在源上使用.Publish().RefCount()的通用模式来为我们提供一个只向订阅者发送最新事件的流.这也是一个非常有用的模式。解决方案这是重新编写的代码,其中节流持续时间与计数器合并:varthrottleDuration=TimeSpan.FromSeconds(5);可变缓冲区大小=3;//对源的单一订阅varsourcePub=source.Publish().RefCount();varoutput=sourcePub.Buffer(()=>sourcePub.Throttle(throttleDuration).Merge(sourcePub.Take(bufferSize).LastAsync()));生产就绪代码和测试这是一个带有测试的生产就绪实现(使用nuget包rx-testing和nunit)。请注意调度程序的参数化以支持测试。publicstaticpartialclassObservableExtensions{publicstaticIObservable>BufferNearEvents(thisIObservablesource,TimeSpanmaxInterval,intmaxBufferSize,ISchedulerscheduler){if(scheduler==null)scheduler=ThreadPoolScheduler.Instance;}if(maxBufferSizepublishedSource.Throttle(maxInterval,scheduler).Merge(publishedSource.Take(maxBufferSize).LastAsync()));}}publicclassBufferNearEventsTests:ReactiveTest{[Test]publicvoidCloseEventsAreBuffered(){TimeSpanmaxInterval=TimeSpan.FromTicks(200);constintmaxBufferSize=1000;varscheduler=newTestScheduler();varsource=scheduler.CreateColdObservable(OnNext(100,1),OnNext(200,2),OnNext(300,3));IListexpectedBuffer=new[]{1,2,3};varexpectedTime=maxInterval.Ticks+300;varresults=scheduler.CreateObserver>();source.BufferNearEvents(maxInterval,maxBufferSize,scheduler).Subscribe(results);scheduler.AdvanceTo(1000);结果.Messages.AssertEqual(OnNext>(expectedTime,buffer=>CheckBuffer(expectedBuffer,buffer)));}[测试]publicvoidFarEventsAreUnbuffered(){TimeSpanmaxInterval=TimeSpan.FromTicks(200);constintmaxBufferSize=1000;varscheduler=newTestScheduler();varsource=scheduler.CreateColdObservable(OnNext(1000,1),OnNext(2000,2),OnNext(3000,3));IList[]expectedBuffers={new[]{1},new[]{2},new[]{3}};varexpectedTimes=new[]{maxInterval.Ticks+1000,maxInterval.Ticks+2000,maxInterval.Ticks+3000};varresults=scheduler.CreateObserver>();source.BufferNearEvents(maxInterval,maxBufferSize,scheduler).Subscribe(results);scheduler.AdvanceTo(10000);结果.Messages.AssertEqual(OnNext>(expectedTimes[0],buffer=>CheckBuffer(expectedBuffers[0],buffer)),OnNext>(expectedTimes[1],buffer=>CheckBuffer(expectedBuffers[1],buffer)),OnNext>(expectedTimes[2],buffer=>CheckBuffer(expectedBuffers[2],buffer)));}[测试]publicvoidUpToMaxEventsAreBuffered(){TimeSpanmaxInterval=TimeSpan.FromTicks(200);constintmaxBufferSize=2;varscheduler=newTestScheduler();varsource=scheduler.CreateColdObservable(OnNext(100,1),OnNext(200,2),OnNext(300,3));IList[]expectedBuffers={new[]{1,2},new[]{3}};varexpectedTimes=new[]{200,/*达到缓冲区上限*/maxInterval.Ticks+300};varresults=scheduler.CreateObserver>();source.BufferNearEvents(maxInterval,maxBufferSize,scheduler).Subscribe(results);scheduler.AdvanceTo(10000);结果.Messages.AssertEqual(OnNext>(expectedTimes[0],buffer=>CheckBuffer(expectedBuffers[0],buffer)),OnNext>(expectedTimes[1],buffer=>CheckBuffer(expectedBuffers[1],buffer)));}privatestaticboolCheckBuffer(IEnumerableexpected,IEnumerableactual){CollectionAssert.AreEquivalent(expected,actual);返回真;如果我正确理解你的描述,Observable.Buffer仍然是你的朋友,只需使用导致可观察事件的重载来指示何时应发送缓冲项如下:observable.Buffer(()=>observable.Throttle(TimeSpan.FromSeconds(5)))这是个老问题,不过好像和我最近的问题有关。Enigmativity找到了一个很好的方法来完成我认为你正在努力实现的目标,所以我想我会分享。我用扩展方法包装了解决方案:publicstaticclassObservableExtensions{publicstaticIObservableBatch(thisIObservableobservable,TimeSpantimespan){returnobservable.GroupByUntil(x=>1,g=>Observable.Timer(timespan)).Select(x=>x.ToArray()).Switch();可以这样使用:以上就是《C#学习教程:使用IObservable进行批处理》的全部内容分享,如果对大家有用,需要进一步了解C#学习教程,希望大家多多关注对它—observableSource.Batch(TimeSpan.FromSeconds(5));本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处:
