当前位置: 首页 > 编程语言 > C#

有没有Rx方法在没有传入值的情况下,周期性的重复最后一个值?分享

时间:2023-04-10 20:51:27 C#

有没有Rx方法在没有传入值的情况下,周期性的重复之前的值?我遇到过的一个用例是:IObservableObservable.RepeatLastValueDuringSilence(thisIObservableinner,TimeSpanmaxQuietPeriod);它将返回内部observable中的所有未来项目,但是,如果内部observable在OnNext(maxQuietPeriod)中有一段时间没有被调用,它只是重复最后一个值(当然内部调用OnCompleted或OnError)。理由是该服务会定期ping出定期状态更新。例如:varmyStatus=Observable.FromEvent(h=>this.StatusUpdate+=h,h=>this.StatusUpdate-=h);varmessageBusStatusPinger=myStatus.RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1)).Subscribe(update=>_messageBus.Send(update));这样的事情存在吗?还是我高估了它的用处?谢谢,AlexPS:对于任何不正确的术语/语法,我深表歉意,因为我只是第一次探索Rx。类似于Matthew的解决方案,但这里的计时器在源中接收到每个元素后启动,我认为这更正确(但区别不太重要):选择(x=>Observable.Interval(maxQuietPeriod).Select(_=>x).StartWith(x)).Switch();}并测试:varsource=Observable.Interval(TimeSpan.FromMilliseconds(100))。Take(5).Select(_=>"1").Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_=>"2")).Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_=>"3"));source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);您应该看到1次打印10次(5次来自源,5次在静音期间重复),然后很多2次来自源,另外4次来自每次之间的静音,然后无限3次。这个相当简单的查询完成了这项工作:varquery=source.Select(s=>Observable.Interval(TimeSpan.FromSeconds(1.0)).StartWith(s).Select(x=>s)).Switch();永远不要低估.Switch()的力量。我想做你想做的,如果你的观察不热,你需要发布和重新计算它:varrepeating=throttled.SelectMany(i=>Observable.Interval(maxQuietPeriod).Select(_=>i).TakeUntil(inner));返回Observable.Merge(内部,节流,重复);Rx库中没有方法,但是我也需要这样的方法。在我的用例中,即使源不输出任何值,我也需要源输出一个值。如果您不希望在第一个源值出现之前输出任何值,则可以在订阅调用之前删除defaultValue参数和对createTimer()的调用。需要调度程序来运行计时器。一个明显的过载是不需要调度程序并选择默认调度程序(我使用ThreadPool调度程序)。以上是C#学习教程:有没有Rx方法在没有传入值的情况下,周期性的重复之前的值?分享的所有内容,如果对你有用,需要了解更多C#学习教程,希望大家多多关注—ImportsSystem.ReactiveImportsSystem.Reactive.ConcurrencyImportsSystem.Reactive.DisposablesImportsSystem.Reactive...schedulerIsNothingThenThrowNewArgumentNullException("scheduler")返回Observable.Create(Function(observerAsIObserver(OfT))DimidAsULongDimgateAsNewObject()DimtimerAsNewSerialDisposable()DimlastValueAsT=defaultValueDimcreateTimerAsAction=Sub()DimstartIdAsULong=idtimer.Disposable=scheduler.Schedule(timeout,Sub(selfAsAction(OfTimeSpan))DimnoChangeAsBooleanSyncLockgatenoChange=(id=startId)如果没有变化然后observer.OnNext(lastValue)EndIfEndSyncLock'才重启t如果没有改变,否则'改变重新开始超时IfnoChangeThenself(timeout)EndSub)EndSub'开始第一次超时createTimer()'订阅源可观察的Dimsubscription=source.Subscribe(Sub(v)SyncLockgateid+=1ULlastValue=vEndSyncLockobserver.OnNext(v)createTimer()'重置超时EndSub,Sub(ex)SyncLockgateid+=1ULEndSyncLockobserver.OnError(ex)'不重置超时,因为序列已经结束EndSub,Sub()SyncLockgateid+=1ULEndSyncLockobserver.OnCompleted()'不重置超时,因为序列已经结束EndSub)ReturnNewCompositeDisposable(timer,subscription)EndFunction)EndFunction本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如有转载请注明出处: