一直觉得自己对并发的理解不是很深,尤其是看了《代码整洁之道》之后,觉得有必要学习并发编程,因为性能也是一个衡量代码清洁度的主要标准。而且在《失控》一书中多次提到并发,无论是计算机还是生物,各种事物都是并发处理的。人真的很奇怪。当你关注一件事时,你会发现这件事经常出现在周围的事物中。所以好奇心驱动学习并发性。有这篇文章。1.理解硬件线程和软件线程多核处理器有多个物理内核——一个物理内核是一个真正独立的处理单元,多个物理内核使得多条指令可以同时并行运行。硬件线程也称为逻辑核心,一个物理核心可以使用超线程技术提供多个硬件线程。所以一个硬件线程并不代表一个物理内核;Windows中每一个运行的程序都是一个进程,每个进程都会创建并运行一个或多个线程,这些线程称为软件线程。硬件线程就像泳道,软件线程就是在其中游泳的人。2.并行场合.NetFramework4引入了一个新的任务并行库(TaskParallelLibrary,TPL),它支持数据并行、任务并行和流水线。让开发者应对不同的并行场合。数据并行:需要处理的数据量很大,必须对每条数据进行相同的操作。例如,使用256位的密钥,用AES算法对100个Unicode字符串进行加密。任务并行性:通过任务同时运行不同的操作。示例包括生成文件哈希、加密字符串和创建缩略图。流水线:这是任务并行性和数据并行性的结合。TPL引入了System.Threading.Tasks,主要类是Task,这个类代表了一个异步并发操作,但是我们不一定要使用Task类的实例,我们可以使用Parallel静态类。它提供了三种方法:Parallel.Invoke、Parallel.ForParallel.Forecah。三、Parallel.Invoke尝试使许多方法并行运行的最简单方法是使用Parallel类的Invoke方法。例如,有四种方法:WatchMovieHaveDinnerReadBookWriteBlog您可以通过以下代码使用并行性。System.Threading.Tasks.Parallel.Invoke(WatchMovie,HaveDinner,ReadBook,WriteBlog);此代码为每个方法创建委托。Invoke方法接受一个Action参数集。1publicstaticvoidInvoke(paramsAction[]actions);使用lambda表达式或匿名委托可以达到相同的效果。System.Threading.Tasks.Parallel.Invoke(()=>WatchMovie(),()=>HaveDinner(),()=>ReadBook(),delegate(){WriteBlog();});1.没有特定的执行顺序。Parallel.Invoke方法只会在所有4个方法都完成后返回。至少需要4个硬件线程才能让这4个方法并发运行。但是,不能保证这四种方法可以同时开始运行。如果一个或多个核很忙,底层的调度逻辑可能会延迟某些方法的初始化执行。通过为方法添加延迟,可以看到必须等待最长的方法完成才能返回到main方法。staticvoidMain(string[]args){System.Threading.Tasks.Parallel.Invoke(WatchMovie,HaveDinner,ReadBook,WriteBlog);Console.WriteLine("执行完成");Console.ReadKey();}staticvoidWatchMovie(){Thread.Sleep(5000);Console.WriteLine("看电影");}staticvoidHaveDinner(){Thread.Sleep(1000);Console.WriteLine("吃晚饭");}staticvoidReadBook(){Thread.Sleep(2000);Console.WriteLine("Reading");}staticvoidWriteBlog(){Thread.Sleep(3000);Console.WriteLine("WriteBlog");}这样会导致很多逻辑核长时间空闲。4.Parallel.ForParallel.For为固定次数的独立For循环迭代提供负载均衡的并行执行(即将工作分配给不同的任务,使所有任务大部分时间都可以保持忙碌)。这样就可以尽可能地充分利用所有可用的内核。下面我们来对比一下两种方法,一种是使用For循环,另一种是使用Parallel.For生成key并转换为16进制字符串。privatestaticvoidGenerateAESKeys(){varsw=Stopwatch.StartNew();for(inti=0;i{varaesM=newAesManaged();aesM.GenerateKey();byte[]result=aesM.Key;stringhexStr=ConverToHexString(result);});Console.WriteLine("Parallel_AES:"+sw.Elapsed.ToString());}privatestaticintNUM_AES_KEYS=100000;staticvoidMain(string[]args){Console.WriteLine("Execute"+NUM_AES_KEYS+"times:");生成AES密钥();ParallelGenerateAESKeys();Console.ReadKey();}执行1,000,000次。这里,并行时间是串行时间的一半。5.Parallel.ForEach在Parallel.For中,有时优化现有循环可能是一项非常复杂的任务。Parallel.ForEach为固定数量的独立ForEach循环迭代提供负载均衡的并行执行,并支持自定义分区器,让用户可以完全控制数据分布。其实质就是把所有要处理的数据分成几部分,然后并行运行这些串行循环。修改以上代码:System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1,NUM_AES_KEYS+1),range=>{varaesM=newAesManaged();Console.WriteLine("AESRange({0},{1}循环开始时间:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);for(inti=range.Item1;i(Partitionersource,Actionbody)Parallel.ForEach方法定义了两个参数,source和Body。源是指分区程序。提供分解为分区的数据源。body是要调用的委托。它接受每个定义的分区作为参数。总共有20多个重载。在上面的例子中,分区类型是Tuple,是一个二元组类型。此外,还会返回一个ParallelLoopResult值。Partitioner.Create根据逻辑核心数和其他因素创建分区。publicstaticOrderablePartitioner>Create(intfromInclusive,inttoExclusive){intnum=3;if(toExclusive<=fromInclusive)thrownewArgumentOutOfRangeException("toExclusive");intrangeSize=(toExclusive-fromInclusive)/(PlatformHelper.ProcessorCount*num);如果(范围大小==0)范围大小=1;returnPartitioner.Create>(Partitioner.CreateRanges(fromInclusive,toExclusive,rangeSize),EnumerablePartitionerOptions.NoBuffering);}所以我们可以修改分区数,rangesize大概在250000左右。也就是说我的逻辑核心是4.varrangesize=(int)(NUM_AES_KEYS/Environment.ProcessorCount)+1;System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1,NUM_AES_KEYS+1,rangesize),range=>再次执行:partitions变成4个,时间上相差不大(第一次是串行时间)。我们看到这四个分区几乎是同时执行的,大部分时候在后台使用TPL的负载均衡机制,效率很高,但是对分区的控制方便用户分析自己的工作负载,提高整体性能.parallel.ForEach也可以重构IEnumerable集合.Enumerable.Range序列化产生的个数.但是这样不会有上面的分区效果.privatestaticvoidParallelForEachGenerateMD5HasHes(){varsw=Stopwatch.StartNew();System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1,NUM_AES_KEYS),number=>{varmd5M=MD5.Create();byte[]data=编码.Unicode.GetBytes(Environment.UserName+number);byte[]result=md5M.ComputeHash(data);stringhexString=ConverToHexString(result);});Console.WriteLine("MD5:"+sw.Elapsed.ToString());}#p#第六,退出循环不同于串行操作中的中断,ParallelLoopState提供了两种停止执行Parallel.For和Parallel.ForEach的方法。Break:使循环在执行完当前迭代后尽快停止执行。例如,如果执行到100次,循环将处理所有小于100的迭代。停止:使循环尽快停止执行。如果执行了100次迭代,则无法保证将处理所有少于100次的迭代。修改上面的方法:3秒后退出。privatestaticvoidParallelLoopResult(ParallelLoopResultloopResult){stringtext;if(loopResult.IsCompleted){text="循环完成";}else{if(loopResult.LowestBreakIteration.HasValue){text="中断终止";}else{text="停止终止";}}Console.WriteLine(text);}privatestaticvoidParallelForEachGenerateMD5HasHesBreak(){varsw=Stopwatch.StartNew();varloopresult=System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1,NUM_AES_KEYS),(intnumber,ParallelLoopStateloopState)=>{varmd5M=MD5.Create();byte[]data=Encoding.Unicode.GetBytes(Environment.UserName+number);byte[]result=md5M.ComputeHash(data);stringhexString=ConverToHexString(result);if(sw.Elapsed.Seconds>3){loopState.Stop();}});ParallelLoopResult(loopresult);Console.WriteLine("MD5:"+sw.Elapsed);}7.捕获并行循环中发生的异常。当并行迭代中调用的delegate抛出异常,而delegate没有捕捉到异常时,就会变成一组异常,newSystem.AggregateException负责处理这组异常。privatestaticvoidParallelForEachGenerateMD5HasHesException(){varsw=Stopwatch.StartNew();varloopresult=newParallelLoopResult();try{loopresult=System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1,NUM_AES_KEYS),(number,loopState)=>{varmd5M=MD5.Create();byte[]data=Encoding.Unicode.GetBytes(Environment.UserName+number);byte[]result=md5M.ComputeHash(data);stringhexString=ConverToHexString(result);if(sw.Elapsed.Seconds>3){thrownewTimeoutException("超过三秒");}});}catch(AggregateExceptionex){foreach(varinnerExinex.InnerExceptions){Console.WriteLine(innerEx.ToString());}}ParallelLoopResult(loopresult);Console.WriteLine("MD5:"+sw.Elapsed);}结果:多次出现异常。#p#八、指定并行度。TPL方法将始终尝试利用所有可用的逻辑内核来实现最佳结果,但有时您不想在并行循环中使用所有内核。比如你需要留出一个不参与并行计算的内核来创建一个可以响应用户的应用程序,而这个内核需要帮你运行其他部分的代码。这时候一个好的解决方案是指定最大并行度。这需要创建ParallelOptions的实例并设置MaxDegreeOfParallelism的值。privatestaticvoidParallelMaxDegree(intmaxDegree){varparallelOptions=newParallelOptions();parallelOptions.MaxDegreeOfParallelism=maxDegree;varsw=Stopwatch.StartNew();System.Threading.Tasks.Parallel.For(1,NUM_AES_KEYS+1,parallelOptions,(inties)==newAesManaged();aesM.GenerateKey();byte[]result=aesM.Key;stringhexStr=ConverToHexString(result);});Console.WriteLine("AES:"+sw.Elapsed.ToString());}呼叫:如果在四核微处理器上运行,则将使用3个内核。ParallelMaxDegree(Environment.ProcessorCount-1);时间大概慢了一点(第一个Parallel.For3.18s),但是可以腾出一个core来处理其他事情。总结:这次学习了Parallel相关的方法以及如何退出并行循环和捕获异常,设置并行度,并行知识。园内也有类似的博客。不过作为自己知识的管理,这里就整理一下吧。