当前位置: 首页 > 编程语言 > C#

如何连接多个IObservable序列?分享

时间:2023-04-10 11:54:38 C#

如何连接多个IObservable序列?vara=可观察的。范围(0,10);varb=可观察的。范围(5、10);变种邮编=一个。Zip(b,(x,y)=>x+"-"+y);邮编。订阅(控制台。WriteLine);打印0-51-62-7...相反,我想加入相同的值5-56-67-78-8...这是合并100个有序的异步序列问题的简化示例.加入两个IEnumerables非常容易,但我找不到在Rx中做这样的事情的方法。有任何想法吗?更多关于输入和我想要实现的目标。基本上,整个系统是一个实时管道,具有多个状态机(聚合器、缓冲区、平滑过滤器等),通过分叉连接模式连接。RX适合实现这些东西吗?每个输入都可以表示为publicstructDataPoint{publicdoubleValue;公共DateTimeOffset时间戳;每个输入的数据位在到达时都带有时间戳,因此所有事件自然按其连接键(timestamp)排序。当事件通过管道传播时,它们会分叉和加入。连接需要通过时间戳关联并以预定义的顺序应用。例如,join(a,b,c,d)=>join(join(join(a,b),c,d))。编辑这是我能够快速想出的。希望有一个基于现有Rx操作符的更简单的解决方案。staticvoidTest(){vara=Observable.Range(0,10);varb=Observable.Range(5,10);//varzip=a.Zip(b,(x,y)=>x+"-"+y);//zip.Subscribe(Console.WriteLine);varjoined=MergeJoin(a,b,(x,y)=>x+"-"+y);joined.Subscribe(Console.WriteLine);}staticIObservableMergeJoin(IObservableleft,IObservableright,Funcselector){returnObservable.CreateWithDisposable(o=>{Queuea=newQueue();Queueb=newQueue();objectgate=newobject();左。Subscribe(x=>{lock(gate){if(a.Count==0||a.Peek(){Queuea=newQueue();Queueb=newQueue();objectgate=newobject();boolleftComplete=false;boolrightComplete=false;MutableDisposableleftSubscription=newMutableDisposable();MutableDisposablerightSubscription=newMutableDisposable();ActiontryDequeue=()=>{lock(gate){while(a.Count!=0&&b.Count!=0){if(a.Peek()==b.Peek()){stringvalue=null;try{value=selector(a.Dequeue(),b.Dequeue());}catch(Exceptionex){o.OnError(ex);return;}o.OnNext(value);}elseif(a.Peek(){lock(gate){if(a.Count==0||a.Peek(){leftComplete=true;if(a.Count==0||rightComplete){o.OnCompleted();}});rightSubscription.Disposable=right.Subscribe(x=>{lock(gate){if(b.Count==0||b.Peek(){rightComplete=true;if(b.Count==0||leftComplete){o.OnCompleted();}});返回新的CompositeDisposable(左订阅,右订阅);});GroupBy可能会做你需要的事情似乎你对项目“加入”的时间没有时间限制,你只需要以某种方式将相似的项目放在一起。Observable.Merge(Observable.Range(1,10),Observable.Range(5,15)).GroupBy(k=>k).Subscribe(go=>go.Count().Where(cnt=>cnt>1).Subscribe(cnt=>Console.WriteLine("Key{0}has{1}matches",go.Key,cnt)));关于以上两点需要注意的是,Merge有以下重载,所以你的Areq有数百个连接流不会有问题:Merge(paramsIObservable[]sources);合并(此IEnumerable>来源);合并(这个IObservable>来源);此外,GroupBy返回IObservable>,这意味着您可以对每个组和每个组中的每个新成员做出反应——无需等到所有组都完成。这个答案是从Rx论坛复制过来的,只是为了在此处存档:varxs=Observable.Range(1,10);varys=Observable.Range(5,10);varjoined=fromxinxsfromyinyswherex==yselectx+"-"+y;或者没有查询表达式:varjoined=xs.SelectMany(x=>ys,(x,y)=>new{x,y}).Where(t=>tx==ty).Select(t=>tx+"-"+ty);如何在v.2838中使用新的Join运算符。vara=Observable.Range(1,10);varb=Observable.Range(5,10);varjoinedStream=a.Join(b,_=>Observable.Never(),_=>Observable.Never(),(aOutput,bOutput)=>newTuple(aOutput,bOutput)).Where(tupple=>tupple.Item1==tupple.Item2);joinedStream.Subscribe(output=>Trace.WriteLine(output));这是我第一次看到Join,我不确定像这样使用Never运算符是否明智。当处理大量输入时,由于会产生大量操作,因此会提取更多输入。我认为可以做的是在制作匹配时关闭窗口并使解决方案更有效率。那就是说上面的例子按照你的问题工作。作为记录,我认为斯科特的回答可能是这种情况下的方法。我只是将其视为一个潜在的选择。以上就是C#学习教程:如何连接多个IObservable序列?如果所有分享的内容对你有用,需要进一步了解C#学习教程,希望大家多多关注。本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处: