Python2.6版本增加了多处理模块。它最初由JesseNoller和RichardOudkerk在PEP371中定义。就像您可以使用线程模块生成线程一样,多处理模块允许您生成进程。这里使用的想法是:由于您现在可以生成进程,因此可以避免使用全局解释器锁(GIL)并充分利用机器的多个处理器。multiprocessing包还包含一些根本不在线程模块中的API。例如:有一个灵活的Pool类,允许您在多个输入上并行执行函数。我们将在后面的部分解释Pool类。我们将从多处理模块的Process类开始。开始学习多处理模块Process这个类和threading模块中的Thread类非常相似。让我们创建一系列调用相同函数的进程,看看它是如何工作的。importosfrommultiprocessingimportProcessdefdoubler(number):"""Adoublingfunctionthatcanbeusedyaprocess"""result=number*2proc=os.getpid()print('{0}doubledto{1}byprocessid:{2}'.format(number,result,proc))if__name__=='__main__':numbers=[5,10,15,20,25]procs=[]forindex,numberinenumerate(numbers):proc=Process(target=doubler,args=(number,))procs.append(proc)proc.start()forprocinprocs:proc.join()对于上面的例子,我们导入了Process类并创建了一个名为doubler的函数。在函数中,我们将传入的数字乘以2。我们还使用Python的os模块来获取当前进程的ID(pid)。这个ID会告诉我们哪个进程正在调用doubler函数。然后,在下面的代码块中,我们实例化了一系列Process类并启动它们。***一个循环简单地调用每个进程的join()方法,它告诉Python等待进程直到它结束。如果你需要终止一个进程,你可以调用它的terminate()方法。当您运行上面的代码时,您应该看到类似于以下内容的输出:5doubledto10byprocessid:1046810doubledto20byprocessid:1046915doubledto30byprocessid:1047020doubledto40byprocessid:1047125doubledto50byprocessid:10472有时,您希望为您的进程提供一个人类可读的名称。幸运的是,Process类确实允许您访问相同的进程。让我们看一下以下示例:importosfrommultiprocessingimportProcess,current_processdefdoubler(number):"""Adoublingfunctionthatcanbeusedyaprocess"""result=number*2proc_name=current_process().nameprint('{0}doubledto{1}by:{2}'.format(number,result,proc_name))if__name__=='__main__':numbers=[5,10,15,20,25]procs=[]proc=Process(target=doubler,args=(5,))forindex,numberinenumerate(numbers):proc=Process(target=doubler,args=(number,))procs.append(proc)proc.start()proc=Process(target=doubler,name='Test',args=(2,))proc.start()procs.append(proc)forprocinprocs:proc.join()这一次,我们导入了更多的current_process。current_process基本上类似于threading模块的current_thread。我们使用它来获取调用我们函数的线程的名称。您会注意到我们没有为前五个进程设置名称。然后我们将第六个进程的名称设置为“测试”。让我们看看我们会得到什么样的输出:5doubledto10by:Process-210doubledto20by:Process-315doubledto30by:Process-420doubledto40by:Process-525doubledto50by:Process-62doubledto4by:Test输出说明:默认情况下,多处理模块给每个进程分配一个编号,并且该数字用于构成流程名称的一部分。当然,如果我们给它起一个名字,名字就不会加数字。锁多处理模块以与线程模块相同的方式支持锁。您需要做的就是导入Lock,获取它,做一些事情,然后释放它。frommultiprocessingimportProcess,Lockdefprinter(item,lock):"""Printsouttheitemthatwaspedin"""lock.acquire()try:print(item)finally:lock.release()if__name__=='__main__':lock=Lock()items=['tango','foxtrot',10]foriteminitems:p=Process(target=printer,args=(item,lock))p.start()我们在这里创建了一个简单的打印函数,你输入什么,它就输出什么。为了避免线程之间互相阻塞,我们使用Lock对象。代码循环遍历列表中的三个项目并为每个项目创建一个进程。每个进程都会调用我们的函数,每次遍历的item都会作为参数传递给函数。因为我们现在使用的是锁,所以队列中的下一个进程将阻塞,直到前一个进程释放锁。日志记录为进程创建日志与为线程创建日志略有不同。它们的不同之处在于Python的日志记录包不使用共享锁的进程,因此可能以来自不同进程的信息结束。让我们尝试在前面的示例中添加基本日志记录。代码如下:importloggingimportmultiprocessingfrommultiprocessingimportProcess,Lockdefprinter(item,lock):"""Printsouttheitemthatwaspassedin"""lock.acquire()try:print(item)finally:lock.release()if__name__=='__main__':lock=Lock()items=['tango','foxtrot',10]multiprocessing.log_to_stderr()logger=multiprocessing.get_logger()logger.setLevel(logging.INFO)foriteminitems:p=Process(target=printer,args=(item,lock))向p.start()添加日志记录的最简单方法是将其推送到stderr。我们可以通过调用log_to_stderr()函数来做到这一点。然后我们调用get_logger函数获取一个logger实例,并将其日志级别设置为INFO。之后的代码是一样的。需要提醒的是,我这里并没有调用join()方法。相反:当它退出时,父线程会自动调用join()方法。执行此操作时,您应该得到类似于以下的输出:[INFO/Process-1]childprocesscallingself.run()tango[INFO/Process-1]processshuttingdown[INFO/Process-1]processexitingwithexitcode0[INFO/Process-2]childprocesscallingself。run()[INFO/MainProcess]processshuttingdownfoxtrot[INFO/Process-2]processshuttingdown[INFO/Process-3]childprocesscallingself.run()[INFO/Process-2]processexitingwithexitcode010[INFO/MainProcess]callingjoin()forprocessProcess-3[信息/Process-3]processshuttingdown[INFO/Process-3]processexitingwithexitcode0[INFO/MainProcess]callingjoin()forprocessProcess-2现在如果你想将日志保存到磁盘,那么这个东西有点棘手的。您可以在Python的日志记录手册中阅读有关此主题的一些内容。Pool类Pool类用于表示工作进程池。它具有让您将任务卸载到工作进程的方法。让我们看下面一个非常简单的例子。frommultiprocessingimportPooldefdoubler(number):returnnumber*2if__name__=='__main__':numbers=[5,10,20]pool=Pool(processes=3)print(pool.map(doubler,numbers))基本上执行上面的代码后,一个创建了一个Pool实例,该实例创建了3个工作进程。然后我们使用map方法将一个函数和一个可迭代对象映射到每个进程。***我们打印出这个例子的结果:[10,20,40]。还可以通过apply_async方法获取池中进程的运行结果:frommultiprocessingimportPooldefdoubler(number):returnnumber*2if__name__=='__main__':pool=Pool(processes=3)result=pool.apply_async(doubler,(25,))print(result.get(timeout=1))我们上面所做的其实就是请求过程的运行结果。这就是get函数的用途。它试图得到我们的结果。你可以注意到我们设置了一个超时时间,这是为了防止我们调用的函数异常。毕竟我们不希望它被永久阻止。进程通信在进程间通信方面,多处理模块提供了两种主要方法:队列和管道。队列实现既是线程安全的又是进程安全的。让我们看一个相当简单的基于队列的例子。代码来自我的线程文章。frommultiprocessingimportProcess,Queuesentinel=-1defcreator(data,q):"""Createsdatatobeconsumedandwaitsfortheconsumertofinishprocessing"""print('Creatingdataandputtingitonthequeue')foritemindata:q.put(item)defmy_consumer(q):"""Consumesssomedataandworksonitinthiscase,allit=q.get()print('datafoundtobeprocessed:{}'.format(data))processed=data*2print(processed)ifdataissentinel:breakif__name__=='__main__':q=Queue()data=[5,10,13,-1]process_one=Process(target=creator,args=(data,q))process_two=Process(target=my_consumer,args=(q,))process_one.start()process_two.start()q.close()q.join_thread()process_one.get()方法,我们可以向Queue中添加数据,也可以从Queue中获取数据。最后一段代码只是创建了Queue对象和两个Process对象并运行它们。你可以注意到我们调用了t进程对象上的join()方法,而不是队列本身。总结我们这里有很多材料。您已经学习了如何使用多处理模块来指定不可变函数、使用队列在进程之间进行通信、为进程命名等等。Python文档中也有很多知识点本文没有接触到,所以一定要深入理解文档。同时,您现在知道如何使用Python来利用计算机的所有处理能力了!相关阅读Python多处理模块(multiprocessingmodule)文档PythonModuleWeekly:MultiprocessingConcurrencyinPython–PortingaQueuetomultiprocessing
