今天要说的是“并行计算”。作为一个非科学的人,我何必在课堂上纠结这么专业的问题。这是说说我前几天做的一个作业。当时用python写了一个程序,跑了一天。速度让我担心。我如何优化它并提交作业?于是去各种论坛找药丸,最后发现并行计算可以最大程度地压榨计算机的CPU,提高计算效率,而python中有一个multiprocessing库可以提供并行计算界面,就这样让男孩度过了一天。时间去完善程序,终于在规定的时间内做出了满意的结果,并交上了作业。之后对并行计算充满了兴趣,于是再次访问谷歌,大致了解了GPU、CPU、进程、线程、并行计算、分布式计算等概念,又把玩python的multiprocessing。现在略有体会,特来此立碑,以示后世游人。这篇文章分为四个部分,一个是大数据时代的现状,一个是面对挑战的方法,然后是用python写并行程序,最后是多进程的实战。一、大数据时代的现状我们目前正处于大数据时代。我们每天都会不断地通过手机、电脑等设备将自己的数据传输到互联网上。据统计,YouTube上每分钟新增500多个小时的视频。面对如此海量的数据,如何对其进行高效的存储和处理成为当前最大的挑战。但是在这个对硬件要求越来越高的时代,CPU似乎就没有那么强大了。2013年以来,处理器主频增速逐渐放缓,目前CPU主频主要分布在3~4GHz。这也是可以理解的。毕竟,摩尔定律已经生效了50年。如果还是那么厉害,那我们就等着以后处理器频率提升了,以后再有计算问题都不是问题了。事实上,CPU和频率与能耗密切相关。之前我们可以通过加电压来提高频率,但是当能耗过高时,散热问题无法解决,所以频率逐渐趋于稳定。Intel、AMD等大厂也纷纷将重心转移到多核芯片上,目前常见的台式机也达到了4-8核。2.应对挑战之道我们拥有多核CPU和大量的计算设备,如何利用它们来应对大数据时代的挑战。那么有必要提一下下面的方法。2.1并行计算并行性(parallelism)是指程序运行时的状态。如果有多个“工作单元”同时运行,则运行的程序处于并行状态。图1是一个并行程序的例子。并行程序启动后,程序从主线程中拆分出许多小线程,同步执行。这时,每个线程都运行在一个单独的CPU上。所有线程运行完毕后,会重新Merge到主线程中,运行结果也会合并,交给主线程做进一步处理。图1.多线程并行图2是一个多线程任务(沿线是线程时间),但它不是并行任务。这是因为task1和task2并不总是同时执行。在这种情况下,单核CPU可以同时执行task1和task2。方法是在task1没有执行的时候,立即将CPU资源用于task2,当task2空闲时,将CPU用于task1。这样通过时间窗口调整任务就可以实现多线程程序,但是task1和task2并没有同时执行,所以不能称为并行。我们可以称之为并发(concurrency)程序。这个方案在一定意义上提高了单个CPU的使用率,所以效率比较高。图2.多线程并发并行编程模型:数据并行(DataParallel)模型:同一个操作同时作用于不同的数据,只需要简单指定执行什么并行操作和并行操作对象即可。这种模型体现在图1中,即在主线程中取数据进行并行处理,线程执行相同的操作,计算完成后再合并结果。每个并行线程在执行过程中互不干扰。消息传递(MessagePassing)模型:消息在各个并行执行部分之间传递并相互通信。消息传递模型的并行线程将在执行期间传递数据。可能一个线程跑到一半的时候,它占用的数据或者处理结果会交给另一个线程处理。这样一来,在设计并行程序的时候,就会给我们带来一定的麻烦。该模型一般用于分布式内存并行计算机,但也适用于共享内存并行计算机。何时使用并行计算:多核CPU-计算密集型任务。尝试使用并行计算来提高任务执行的效率。计算密集型任务将继续占用CPU。这时候分担任务的CPU越多,计算速度就会越快。这就是并行程序派上用场的地方。单核CPU-计算密集型任务。这个时候任务已经消耗了100%的CPU资源,所以没有必要使用并行计算,毕竟那里有硬件障碍。单核CPU-I/O密集型任务。I/O密集型任务在任务执行过程中需要频繁调用磁盘、屏幕、键盘等外设。由于调用外设时CPU会空闲,所以CPU利用率不高。这时候使用多线程程序只是为了方便。机交互。计算效率并没有太大提高。多核CPU-I/O密集型任务。与单核CPU相同-I/O密集型任务。2.2切换到GPU处理计算密集型程序。GPU是图形处理单元(GPU)。它是显卡的心脏。显卡上也有显存。GPU和显存类似于CPU和内存。GPU和CPU有不同的设计目标。CPU需要处理所有的计算指令,因此其单元设计相当复杂;而GPU主要是为图形“渲染”而设计的,也就是对数据进行列处理,所以GPU天生就是为了更快地执行复杂的算术和几何运算。与CPU相比,GPU有以下优势:强大的浮点数计算速度。大量的计算核心可以进行大规模的并行计算。普通的GPU也有数千个计算核心。数据吞吐量强,GPU的吞吐量是CPU的几十倍,也就是说GPU适合处理大数据。GPU目前在处理深度学习方面使用较多,英伟达(NVIDIA)目前正花费大量精力开发适合深度学习的GPU。具有数百层的神经网络现在非常普遍。面对如此庞大的计算量,CPU可能需要计算好几天,但GPU可以在几个小时内完成计算。这个差距足够别人比我们多玩几场了。,又发表了几篇论文。3.3分布式计算说到分布式计算,我们先来说说Google的三篇论文。原文可直接点击链接下载:GFS(TheGoogleFileSystem):解决数据存储问题。使用N台以上的廉价电脑,利用冗余来达到读写速度和数据安全并存的结果。MapReduce(SimplifiedDataProcessingonLargeClusters):函数式编程,将所有操作分为两类,map和reduce。Map用于将数据分成多个部分,分别处理。Reduce将处理后的结果进行合并,得到最终的结果。BigTable(BigTable:ADistributedStorageSystemforStructuredData):一种在分布式系统上存储结构化数据的解决方案,解决了庞大Table的管理和负载均衡问题。Google在2003~2006年发表了这三篇论文后,引起了一阵轰动,但是Google并没有开源MapReduce。在这种情况下,Hadoop出现了。DougCutting根据Google的三篇论文的理论开发了Hadoop。从那时起,Hadoop不断成熟。目前,Facebook、IBM、ImageShack等知名公司都在使用Hadoop来运行他们的程序。分布式计算的优点:可以集成多台(上千台)低配置计算机进行高并发存储和计算,从而达到媲美超级计算机的处理能力。3、用python写并行程序在介绍如何用python写并行程序之前,我们需要补充几个概念,即进程、线程和全局解释器锁(GlobalInterpreterLock,GIL)。3.1进程与线程进程(process):在为线程设计的系统中(比如大多数现代操作系统,Linux2.6及更新的版本),进程本身并不是基本的运行单位,而是线程的容器。进程有自己独立的内存空间,自己的线程可以访问进程的空间。程序本身只是对指令、数据及其组织形式的描述,进程才是程序真正运行的实例。例如,VisualStudio开发环境是一个应用程序,它使用一个进程编辑源文件,另一个进程进行编译。线程:线程有自己的一组CPU指令、寄存器和私有数据区,线程数据可以与同一进程的线程共享。现在的操作系统都是面向线程的,即线程是运行的基本单位,CPU是按照线程来分配的。进程和线程之间有两个主要区别。一种是进程中包含线程,线程使用进程的内存空间。当然,线程也有自己的私有空间,但容量较小;干扰,而线程是共享内存空间。图3显示了进程、线程和CPU之间的关系。图3中,进程1和进程2都包含3个线程,CPU会根据线程分配任务。当当前线程执行完毕后,等待的线程就会被唤醒,进入CPU执行。一般来说,一个进程拥有的线程越多,占用CPU的时间就越长。图3.进程、线程和CPU的关系3.2全局解释器锁GIL:GIL是计算机编程语言解释器用来同步线程的一种机制,它在任何时候都只允许一个线程执行。即使在多核处理器上,使用GIL的解释器一次也只允许一个线程执行。Python的Cpython解释器(常用的解释器)使用GIL在一个Python解释器进程中执行多线程程序,但是每执行一个线程都会获得一个全局解释器锁,这样其他线程只能等待,因为GIL会被在GIL快释放的时候被原线程立即获取,而那些等待的线程可能刚刚被唤醒,所以往往会造成线程享受CPU资源不平衡。这时候多线程的效率比单线程还要低。在python的官方文档中,是这样解释GIL的:在CPython中,globalinterpreterlock,即GIL,是一种互斥体,可以防止多个本机线程同时执行Python字节码。这个锁是必要的,主要是因为CPython的内存管理不是线程安全的。(不过,自从有了GIL,其他的特性就依赖于它强制执行的保证。)可以说它的初衷是非常好的,为了保证线程间的数据安全;随着Python的发展,GIL成为了python并行计算的最大障碍,但是此时GIL已经遍布CPython,修改它的工作量实在是太大了,尤其是对于这个开源的呼声。不过好在GIL只是锁线程,我们可以新建一个解释器进程来实现并行,这就是multiprocessing的工作。3.3multiprocessingmultiprocessing是python中的一个multiprocessing包,通过它我们可以在一个python程序中创建多个进程来执行任务,从而进行并行计算。官方文档如下:multiprocessingpackageoffersbothlocalandremoteconcurrency,effectivelyside-steppingtheGlobalInterpreterLockbyusingsubprocessesinsteadofthreads.接下来介绍multiprocessing的各个接口:3.3.1Processprocessmultiprocessing.Process(target=None,args=())target:run()可以调用的函数,简单来说就是运行在的函数processargs:是目标参数的方法process:start():启动进程,进程创建后Executejoin([timeout]):阻塞当前父进程,直到调用join方法的进程执行完毕或超时(timeout),然后继续执行父进程terminate():终止进程,不管进程是否执行过,尽量少用。例1frommultiprocessingimportProcessdeff(name):print'hello',nameif__name__=='__main__':p=Process(target=f,args=('bob',))#p进程执行函数f,参数是'bob',注意后面的","p.start()#进程启动p.join()#阻塞主线程,直到p进程执行结束3.3.2ProcessPoolsProcessPoolsclassmultiprocessing.Pool([processes])processes是进程池中的进程数,默认为机器的CPU数方法:apply(func[,args[,kwds]])进程池中的进程执行func函数操作,过程将被阻塞,直到产生结果。apply_async(func[,args[,kwds[,callback]]])类似于apply,但是不会阻塞进程map(func,iterable[,chunksize])进程池中的进程执行映射操作map_async(func,iterable[,chunksize[,callback]])imap(func,iterable[,chunksize]):返回有序迭代器imap_unordered(func,iterable[,chunsize]):返回无序迭代器close():禁止进程池接收tasksterminate():强制终止进程池,不管是否有任务在执行join():close()或terminate()后,等待进程退出例2frommultiprocessingimportPooldeff(x):returnx*xif__name__=='__main__':p=Pool(5)#创建一个有5个进程的进程池print(p.map(f,[1,2,3]))#将f函数的操作交给processpool3.3.3Pipes&Queuesmultiprocessing.Pipe([duplex])返回两个连接对象(conn1,conn2),两个连接对象分别访问head和tailof用于读写操作的管道。Duplex:True(默认),创建的管道是双向的,即两端都可以读写;如果为False,管道是单向的,只能一端读,另一端写,此时类似于Queue。multiprocessing.Queue([maxsize])qsize():返回队列中的成员数empty():如果队列为空则返回truefull():如果队列中的成员数达到maxsize则返回trueput(obj):放入一个对象到队列中get():从队列中取出一个对象并从队列中移除,先进先出原则close():关闭队列并将缓存的对象写入管道ExamplefrommultiprocessingimportPoolimporttimedeff(x):returnx*xif__name__=='__main__':pool=Pool(processes=4)#启动4个工作进程result=pool.apply_async(f,(10,))#在单个进程中异步评估“f(10)”printresult.get(timeout=1)#打印“100”,除非你的计算机*非常*慢printpool.map(f,range(10))#打印“[0,1,4,...,81]”it=pool.imap(f,range(10))printit.next()#打印“0”打印它。next()#打印“1”printit.next(timeout=1)#打印“4”,除非你的电脑*非常*慢result=pool.apply_async(time.sleep,(10,))printresult.get(timeout=1)#raisesmultiprocessing.TimeoutError3.3.4processlockmultiprocessing.Lockwhen一个进程获取(acquire)锁后,其他进程将被禁止获取锁,可以保护数据并进行同步处理。acquire(block=True,timeout=None):尝试获取锁。如果block为true,则获取到锁后会阻止其他进程获取锁。release():释放锁3.3.5共享内存——Value,Array共享内存通常需要用进程锁来处理,以保证处理顺序相同。multiprocessing.Value(typecode_or_type,*args[,lock])返回一个ctype对象,创建c=Value('d',3.14),调用c.value()multiprocessing.Array(typecode_or_type,size_or_initializer,*,lock=True)返回一个ctype数组,只能是一维的Array('i',[1,2,3,4])3.3.6其他方法multiprocessing.active_children():返回当前进程的所有子进程multiprocessing.cpu_count():返回本机CPU个数multiprocessing.current_process():返回当前进程3.3.7注意事项:尽量避免共享数据。尽可能地腌制所有对象。避免使用terminate强行终止进程造成不可预知的后果。有队列在进程终止之前,需要清除队列中的数据。加入操作应该放在队列中并清空,以便将资源和参数传递给子进程。Windows平台还需要注意:注意跨模块全局变量的使用,可能会被每个进程修改,导致结果不一致主模块需要加上ifname=='main':来提高其安全性.如果有交互界面,需要添加freeze_support()4.Multiprocessing实战过程,lock和valuetry:importmultiprocessingasmpimporttimedefjob(v,num,l):l.acquire()#lockfor_inrange(5):time.sleep(0.1)v.value+=num#获取共享内存print(v.value)l.release()#释放defmulticore():l=mp.Lock()#定义一个进程lock#l=1v=mp.Value('i',0)#定义共享内存p1=mp.Process(target=job,args=(v,1,l))#需要把锁传给p2=mp.Process(target=job,args=(v,3,l))p1.start()p2.start()p1.join()p2.join()if__name__=='__main__':multicore()上面代码叠加共享内存5次,p1进程每次叠加1,p2进程3的每次叠加,为了防止p1和p2在运行过程中抢夺共享数据v,在执行过程中对进程加锁,从而保证执行顺序。我测试了三种情况:直接运行上面的代码输出[1,2,3,4,5,8,11,14,17,20],运行时间为1.037s,锁在注释掉的基础上1(上面注释了三行),在没有锁的情况下,输出为[1,4,5,8,9,12,13,15,14,16],运行时间为0.53s。在2的基础上,将p1.join()调整到p2.start()的前面,输出为[1,2,3,4,5,8,11,14,17,20],运行时间为1.042s。可以发现调整joinwithoutlock可以达到和加锁类似的效果,因为join是阻塞的,直到当前进程结束才会回到主进程。如果p1.join()放在p1.start()之后,主进程会立即被阻塞,这样p2要等到后面才能启动,这和锁的效果是一样的。如果如上代码所示,p1.join()在p2.start()之后,虽然p1join()在先,但此时只是阻塞了主进程,而p2是兄弟进程,已经启动了,而p1会停不下来,所以如果这时候没有锁,p1和p2是并行的,运行时间是一半,但是因为他们争夺共享变量,所以输出就变得不确定了。poolimportmultiprocessingasmp#importpdbdefjob(i):returni*idefmulticore():pool=mp.Pool()#pdb.set_trace()res=pool.map(job,range(10))打印(res)res=pool.apply_async(job,(2,))#通过get获取结果print(res.get())#迭代器,当i=0时应用一次,当i=1时应用一次,等等multi_res=[pool.apply_async(job,(i,))foriinrange(10)]#从迭代器中取出print([res.get()forresinmulti_res])multicore():⑧⑤⑤-④zero⑧-⑧⑨③从零基础到Python各个领域的项目实战教程、开发工具和电子书。与大家分享公司目前对python人才的需求和学好python的高效技能,持续更新最新教程!Pool其实很好用,尤其是map和apply_async。通过池接口,我们只需要指定可以并行化的函数和函数参数列表,它就可以自动帮我们创建一个多进程池进行并行计算,实在是太方便了。Pool特别适用于数据并行模型。如果是消息传递模型,建议通过进程创建进程。总结一下,这次主要是根据自己的理解整理了并行计算,讲解了进程、线程、CPU的关系,进行了pythonmultiprocessing封装。个人觉得这里面的学问还是很多的
