TPL数据流中如何安排流控?我正在尝试控制TPLDataflow中的数据流。我有一个非常快的生产者和一个非常慢的消费者。(我的真实代码更复杂,但是,它是一个很好的模型,它重现了问题。)当我运行它时,代码开始消耗内存,这是应该的——生产者的输出队列已经耗尽。可能会很快填满。我真正希望看到的是生产商停止运行一段时间,直到消费者有机会提出要求。根据我对文档的阅读,这是应该发生的事情:也就是说,我认为生产者等待消费者有空间。显然情况并非如此。如何修复它以方便队列不会出错?使用System.Linq;使用System.Threading.Tasks;使用System.Threading.Tasks.Dataflow;使用系统线程;namespaceMemoryLeakTestCase{classProgram{staticvoidMain(string[]args){varCreateData=newTransformManyBlock(ignore=>{returnEnumerable.Range(0,1000*1000*1000).Select((s,i)=>"你好,世界"+i);});varParseFile=newTransformManyBlock(fileContent=>{Thread.Sleep(1000);returnEnumerable.Range(0,100).Select((sst,iii)=>"Hello,"+iii);},newExecutionDataflowBlockOptions(){有界容量=1000});varEndOfTheLine=newActionBlock(f=>{});varlinkOptions=newDataflowLinkOptions{PropagateCompletion=true,};CreateData.LinkTo(ParseFile,linkOptions);ParseFile.LinkTo(EndOfTheLine,linkOptions);Taskt=newTask(()=>{while(true){Console.WriteLine("CreateData:"+Report(CreateData));Console.WriteLine("ParseData:"+Report(ParseFile));Console.WriteLine("NullTarget:"+EndOfTheLine.InputCount);线程.睡眠(1000);}});t.开始();CreateData.SendAsync(0);CreateData.Complete();EndOfTheLine.Completion.Wait();}publicstaticstringReport(TransformManyBlockblock){returnString.Format("INPUT:{0}OUTPUT:{1}",block.InputCount.ToString().PadLeft(10,''),block.OutputCount.ToString().PadLeft(10,''));通常在这种情况下你会做的是设置CreateData块的BoundedCapacity,但这在这里不起作用,因为当从单个IEnumerable填充输出队列时,TransformManyBlock似乎没有考虑BoundedCapacity。您可以做的是创建一个迭代集合的函数,并使用SendAsync()仅在目标可以接受的情况下发送更多数据://////如果迭代数据引发异常,则目标块出错///并且返回的任务成功完成。//////根据使用情况,这可能是也可能不是您想要的。///publicstaticasyncTaskSendAllAsync(thisITargetBlocktarget,IEnumerabledata){try{foreach(varitemindata){awaittarget.SendAsync(item);}}}catch(Exceptione){target.Fault(e);}}用法:vardata=Enumerable.Range(0,1000*1000*1000).Select((s,i)=>"Hello,World"+i);等待ParseFile.SendAllAsync(数据);ParseFile.Complete();如果您仍然希望CreateData块的行为像原始代码一样,您可以有两个有界的BufferBlocks,SendAllAsync(),然后使用Encapsulate()使它们看起来像一个块:以上是C#学习教程:如何安排流程控制TPL数据流?如果分享的所有内容对你有用,需要了解更多C#学习教程,希望你多多关注——//////boundedCapacity表示输入队列的容量///和输出单独排队,而不是他们的总数。///publicstaticIPropagatorBlockCreateBoundedTransformManyBlock(Func>transform,intboundedCapacity){varinput=newBufferBlock(newDataflowBlockOptions{BoundedCapacity=boundedCapacity});varoutput=newBufferBlock(newDataflowBlockOptions{BoundedCapacity=boundedCapacity});{Task.Run(async()=>try{while(awaitinput.OutputAvailableAsync()){vardata=transform(awaitinput.ReceiveAsync());awaitoutput.SendAllAsync(data);}output.Complete();}catch(Exceptione){((IDataflowBlock)input).Fault(e);((IDataflowBlock)output).Fault(e);}});返回数据流块。封装(输入,输出);}本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处:
