Parallel.ForEachstalledwhenintegrationwithBlockingCollection私有只读动作_action;私有只读TaskFactory_factory=newTaskFactory();私有CancellationTokenSource_tokenSource;privatereadonlyBlockingCollection_entries=newBlockingCollection();私人任务_task;publicParallelConsumer(intmaxParallel,Actionaction){_maxParallel=maxParallel;_action=动作;}publicvoidStart(){try{_tokenSource=newCancellationTokenSource();_task=_factory.StartNew(()=>{Parallel.ForEach(_entries.GetConsumingEnumerable(),newParallelOptions{MaxDegreeOfParallelism=,CancellationToken=_tokenSource.Token},(item,loopState)=>{Log("Taking"+item);if(!_tokenSource.IsCancellationRequested){_action(item);Log("Finished"+item);}else{Log("NotTaking"+项目);_entries.CompleteAdding();loopState.Stop();}});},_tokenSource.Token);}catch(OperationCanceledExceptionoce){System.Diagnostics.Debug.WriteLine(oce);}}privatevoidLog(stringmessage){Console.WriteLine(message);}publicvoidStop(){Dispose();}publicvoidEnqueue(Tentry){Log("Enqueuing"+entry);_entries.Add(条目);}publicvoidDispose(){if(_task==null){返回;}_tokenSource.Cancel();while(!_task.IsCanceled){}_task.Dispose();_tokenSource.Dispose();_任务=空;}}这是一个测试代码classProgram{staticvoidMain(string[]args){TestRepeatedEnqueue(100,1);}privatestaticvoidTestRepeatedEnqueue(intitemCount,intparallelCount){bool[]flags=newbool[itemCount];varconsumer=newParallelConsumer(parallelCount,(i)=>{flags[i]=true;});消费者。开始();对于(inti=0;ib==true));测试总是失败——它总是停留在测试的100个项目中的第93个知道我的代码的哪一部分导致了这个问题,以及如何解决它?正如您所发现的,您不能将Parallel.Foreach()与BlockingCollection.GetConsumingEnumerable()一起使用。请参阅此博客文章以获取解释:http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx该博客还提供了一个名为GetConsumingPartitioner()的方法的源代码,您可以使用该方法可以用来解决问题。摘自博客:BlockingCollection的GetConsumingEnumerable实现是使用BlockingCollection的内部同步,支持同时多个消费者,但是ForEach并不知道这一点,其可枚举分区逻辑在访问可枚举时也需要加锁。所以这里的同步比实际需要的多,导致潜在的不可忽略的性能损失。[另外]Parallel.ForEach和PLINQ使用的分区算法默认使用分块来最小化同步成本:它不是锁定每个元素一次,而是获取锁,获取一组元素(一个块),然后释放锁。虽然这种设计有助于提高整体吞吐量,但对于低延迟更为重要的场景,这种分块可能会让人望而却步。这失败了,因为Parallel.ForEach和PLINQ默认使用的分区算法如下所述使用分块来最小化同步成本:它不是每个元素锁定一次,而是获取锁,获取一组元素(一个块),然后释放锁。为了让它工作,你可以在ParallelConsumer类上添加一个方法来指示添加完成,如下所示publicvoidStopAdding(){_entries.CompleteAdding();现在在for循环之后调用此方法,如下所示consumer.Start();for(inti=0;i否则,Parallel.ForEach()将等待达到阈值以获取块并开始处理。以上是C#学习教程:与BlockingCollection集成时,Parallel.ForEach停滞所有共享的内容。如果对大家有用,需要了解更多C#学习教程,希望大家多多关注---本文来自网络收集,不代表立场,如涉及侵权,请点击有权联系管理员删除。如需转载请注明出处:
