如何大量依赖I/O以适当地并行化工作我正在构建一个必须处理大量数据的控制台应用程序。基本上,应用程序从数据库中获取引用。对于每个引用,都会解析文件的内容并进行一些更改。这些文件是HTML文件,该过程使用RegEx替换来完成繁重的工作(查找参考并将其转换为链接)。然后将结果存储在文件系统中并发送到外部系统。如果我按顺序恢复该过程:varrefs=GetReferencesFromDB();//~5000数据行返回foreach(varrefinrefs){varfilePath=GetFilePath(ref);//此方法在先前加载的文件列表中查找varhtml=File.ReadAllText(filePath);//在本地读取html,或从网络驱动器读取varconvertedHtml=ParseHtml(html);File.WriteAllText(目标文件路径);//将结果复制到本地或网络驱动器SendToWs(ref,convertedHtml);我的程序运行良好,但速度很慢。这就是为什么我想并行化这个过程。到目前为止,我做了一个简单的并行化添加AsParallel:varrefs=GetReferencesFromDB().AsParallel();refs.ForAll(ref=>{varfilePath=GetFilePath(ref);varhtml=File.ReadAllText(filePath);varconvertedHtml=ParseHtml(html);File.WriteAllText(destinationFilePath);SendToWs(ref,convertedHtml);});这个简单的改变减少了过程的持续时间(减少25%的时间)。然而,我对并行化的理解是,如果依赖于I/O的资源被并行化,那么不会有太多好处(或者,更糟的是,好处更少),因为I/O不会神奇地翻倍。这就是为什么我认为我应该改变我的方法而不是并行化整个过程,而是创建排队任务的依赖链。IE,我应该创建一个进程:队列来读取文件。完成后,队列ParseHtml。完成后会将Queue发送到WS写入本地。完成后,记录结果。但是,我不知道如何实现这样的想法。我有一种感觉,它将以一组消费者/生产者队列结束,但我找不到合适的样本。另外,我不确定会有任何好处。感谢您的建议[编辑]事实上,我是使用c#4.5的完美人选...如果它只是rtm?[编辑2]另一件让我认为它没有正确并行化的事情是,在资源监视器中,我看到CPU、网络I/O和磁盘I/O的不稳定图形。当一个高时,其他低到中您没有在任何代码中使用任何异步I/OAPI。您所做的一切都受CPU限制,所有I/O操作都会浪费CPU资源阻塞。由于Parallel用于计算绑定任务,如果您想利用异步I/O,您需要在BeginXXX/EndXXX方法中执行,并在可用时利用这些方法。为初学者阅读:TPLTaskFactory.FromAsyncvsTaskswithblockingmethods接下来,无论如何在这种情况下您都不想使用AsParallel。AsParallel启用流式传输,这将导致每个项目立即安排一个新任务,但您在这里不需要/不想要它。最好使用Parallel::ForEach来划分工作。让我们看看如何使用这些知识在特定情况下实际最大并发性:varrefs=GetReferencesFromDB();//在这里使用Parallel::ForEach将在单独的工作线程上分区和处理您的数据Parallel.ForEach(refs,ref=>{stringfilePath=GetFilePath(ref);byte[]fileDataBuffer=newbyte[1048576];//需要直接使用FileStreamAPI,这样我们就可以启用异步I/OFileStreamsourceFileStream=newFileStream(filePath,FileMode.Open,FileAccess.Read,FileShare.Read,8192,true);//使用FromAsync从文件任务中读取数据readSourceFileStreamTask=Task.Factory.FromAsync(sourceFileStream.BeginReadsourceFileStream.EndReadfileDataBuffer,fileDataBuffer.Length,null);//添加一个将在异步读取完成时触发的延续readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent=>{intsoureFileStreamBytesRead;try{//准确确定读取了多少字节//注意:这将传播可能发生的任何潜在异常在EndReadsourceFileStreamBytesRead=readSourceFileStreamAntecedent.Result中;}finally{//始终清理源流sourceFileStream.Close();源文件流=空;}//这是为了确保您最终不会尝试读取大于此示例代码可以处理的文件if(sourceFileStreamBytesRead==fileDataBuffer.Length){thrownewNotSupportedException("您需要实现读取大于1MB的文件.:P");}//将文件数据转换为字符串stringhtml=Encoding.UTF8.GetString(fileDataBuffer,0,sourceFileStreamBytesRead);//解析HTML字符串convertedHtml=ParseHtml(html);//这是为了确保您最终不会尝试写入大于此示例代码可以处理的文件超过1MB。:P");}//将文件数据转换回字节以写入Encoding.UTF8.GetBytes(convertedHtml,0,convertedHtml.Length,fileDataBuffer,0);//需要直接使用FileStreamAPI才能启用异步I/OFileStreamdestinationFileStream=newFileStream(destinationFilePath,FileMode.OpenOrCreate,FileAccess.Write,FileShare.None,8192,true);//使用FromAsync从文件中读取数据//添加将在异步写入完成时触发的延续destinationFileStreamWriteAntecedent.ContinueWith(destinationFileStreamWriteAntecedent=>{try{//注意:我们在这里调用wait以观察EndWrite中可能发生的任何潜在异常destinationFileStreamWriteAntecedent.Wait();}finally{//始终关闭目标文件流destinationFileStream.Close();destinationFileStream=null;}},TaskContinuationOptions.AttachedToParent);//在SendToWs(ref,convertedHtml)之上发送到外部系统**并发**写入目标文件系统;},TaskContinuationOptions.AttachedToParent);});现在,这里有一些注意事项:这是示例代码,所以我使用1MBBuffer来读/写文件对于HTML文件来说是大材小用并且浪费系统资源。您可以降低它以满足您的最大需求,或者将链式读/写实现到StringBuilder中,这是我留给您的练习,因为我将编写~500多行代码来执行异步链式读/写。:P您会注意到,在读/写任务的延续中,我有TaskContinuationOptions.AttachedToParent。这非常重要,因为它将阻止Parallel::ForEach启动的工作线程在所有底层异步调用完成之前完成工作。如果没有它,您将同时启动所有5000个项目的作业,用数千个计划任务污染TPL子系统并且根本无法正确扩展。我调用SendToWs并将文件写入文件共享。我不知道SendToWs实现的底层基础是什么,但它听起来像是制作异步的一个很好的候选者。现在它假设它是纯粹的计算工作,因此,它会在执行时消耗CPU线程。我把它留作练习,让您弄清楚如何最好地使用我已经展示的方法来提高那里的吞吐量。这是各种自由形式,我的大脑是这里唯一的编译器,SO的语法突出显示是我用来确保语法良好的工具。所以请原谅任何语法错误,如果我搞砸了以至于你无法做出正面或反面,请告诉我,我会跟进。好消息是您的逻辑可以很容易地分成生产者-消费者管道中的步骤。如果您使用的是.NET4.0,则可以在每个步骤中使用BlockingCollection数据结构作为生产者-消费者队列的主干。主线程将每个工作项排入步骤1的队列,在那里将被拾取和处理,然后转发到步骤2的队列,依此类推。如果您愿意继续使用异步CTP,您还可以利用新的TPL数据流结构。除此之外,还有BufferBlock数据结构,其行为方式与BlockingCollection类似,并与新的async和await关键字很好地集成。由于您的算法是IO绑定的,生产者-消费者策略可能无法为您提供所需的性能提升,但至少您将拥有一个非常优雅的解决方案,如果您可以很好地提高IO吞吐量场景规模。我担心第1步和第3步会成为瓶颈,管道不会很好地平衡,但值得一试。只是一个建议,但是您是否看过消费者/生产者模式?一定数量的线程读取磁盘上的文件并将内容提供给队列。然后另一组线程(称为消费者)将在队列填满时“消费”队列。http://zone.ni.com/devzone/cda/tut/p/id/3023在这种情况下,您最好的选择绝对是生产者-消费者模型。一个拉取数据的线程和一堆处理它的工作人员。没有绕过I/O的简单方法,因此您可能只关心优化计算本身。我现在将尝试绘制一个模型://producerthreadvarrefs=GetReferencesFromDB();//~5000数据行返回foreach(varrefinrefs){lock(queue){queue.Enqueue(ref);事件.Set();}//如果队列有限,测试队列是否已满并等待。}//消费者线程while(true){value=null;锁(队列){如果(queue.Count>0){value=queue.Dequeue();}}if(value!=null)//处理值elseevent.WaitOne();//发出信号表明某项已放入队列的事件。您可以在C#中的线程的第一个线程上执行此操作在第4部分中查找有关生产者/消费者的更多详细信息:http://www.albahari.com/threading/part4.aspx我认为您拆分文件列表的方式并且批量处理每个文件是可以的。我的感觉是,如果你玩弄并行性,你可能会获得更多的性能提升。请参见:varrefs=GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16);这将同时开始处理16个文件。目前,您可能正在处理2个或4个文件,具体取决于您拥有的内核数量。这仅在您在没有IO的情况下进行计算时才有效。对于IO密集型任务,调整可以显着提高性能,减少处理时间。如果您打算使用生产者-消费者拆分和合并任务,请查看此示例:MergetwosequencesusingParallelLinqExtensions,howcanWhichproducesthefastestresultsfirst?以上就是《C#学习教程:如何正确依赖I/O大量并行工作》的全部内容。如果对你有用,需要进一步了解C#学习教程,希望大家多多关注。本文收集自网络,不代表立场,如涉及侵权,请点击右边联系管理员删除。如需转载请注明出处:
