当前位置: 首页 > 科技观察

Python一行代码完成并行任务

时间:2023-03-14 15:50:07 科技观察

众所周知,Python的并行处理能力远非理想。我认为如果不考虑threading和GIL的标准参数(大多是合法的),原因不是技术不到位,而是我们没有用好。大多数关于Python线程和多处理的教科书都很棒,但乏味且冗长。他们确实在一开始就列出了很多有用的信息,但通常他们没有触及真正改善您日常工作的部分。以“Python线程教程(Python线程教程)”为关键词的经典示例DDG上的热门搜索结果显示:几乎文中给出的每个示例都是同一个类+队列。实际上,它们就是下面这一段使用producer/Consumer来处理线程/多进程的代码示例:#Example.py'''StandardProducer/ConsumerThreadingPattern'''importtimeimportthreadingimportQueueclassConsumer(threading.Threading__init):selfdef__.Thread.__init__(self)self._queue=queuedefrun(self):whileTrue:#queue.get()blocksthecurrentthreaduntil#anitemisretrieved.msg=self._queue.get()#Checksifthecurrentmessageis#the"PoisonPill"ifisinstance(msg,str)andmsg=='quit':#ifso,existstheloopbreak#"Processes"(orinourcase,prints)thequeueitemprint"I'mathread,andIreceived%s!!"%msg#Alwaysbefriendly!print'Byebyes!'defProducer():#Queueisusedtoshareitemsbetween#thethreads信息.queue=Queue.Queue()#Createaninstanceoftheworkerworker=Consumer(queue)#startcallstheinternalrun()方法#kickoffthethreadworker.start()#variabletokeeptrackofwhenwestartedstart_time=time.time()#Whileunder5seconds..whiletime.time()-start_time<5:#"制作“一件作品并贴在#thequeuefortheConsumertoprocessqueue.put('somethingat%s'%time.time())#Sleepabitjusttoavoidanabsurdnumberofmessagestime.sleep(1)#Thisthe"poisonpill"methodofkillingthread.queue.put('quit')#waitforthethreadtoclosedownworker.join()if__name__=='__main__'复制代码:Producer()嗯....感觉有点像Java。我不想说用Producer/Consume来解决threading/multiprocessing是错误的——因为它绝对是对的,而且在很多情况下是最好的方式。但我不认为这通常是编写代码的最佳选择。它的问题(个人意见)首先,您需要创建一个样板床上用品类。然后,您创建一个队列,通过它传递对象并监督队列的两端完成任务。(如果要实现数据的交换或存储,通常会涉及到另一个队列的参与)。工人越多,问题越多。接下来,您应该创建一个工作类池以提高Python的速度。下面是IBM教程给出的比较好的方法。这也是程序员在使用多线程检索网页时常用的方法。#Example2.py"""Amorerealisticthreadpoolexample"""importtimeimportthreadingimportQueueimporturllib2classConsumer(threading.Thread):def__init__(self,queue):threading.Thread.__init__(self)self._queue=queuedefrun(self):whileTrue:content=self._queue。get()ifisinstance(content,str)andcontent=="quit":breakresponse=urllib2.urlopen(content)print"再见!"defProducer():urls=["http://www.python.org','http://www.yahoo.com","http://www.scala.org','http://www.google.com",#etc..]queue=Queue.Queue()worker_threads=build_worker_pool(queue,4)start_time=time.time()#Addtheurlstoprocessforurlinurls:queue.put(url)#Addthepoisonpillvforworkerinworker_threads:queue.put("quit")forworkerinworker_threads:worker.join()print"Done!Timetaken:{}".format(time.time()-start_time)defbuild_worker_pool(queue,size):workers=[]for_inrange(size):worker=Consumer(队列)worker.start()workers.append(worker)returnworkersif__name__=='__main__':Producer()确实可以,但是这段代码多复杂呀!它包括初始化方法、线程跟踪列表,以及像我这样容易死锁的人的噩梦——大量的连接语句,而这仅仅是个开始!到目前为止我们取得了什么成就?基本上没什么。上面的代码几乎总是在做传递。这是一个很基础的方法,非常容易出错(妈的,刚才忘记调用队列对象的task_done()方法了(不过懒得修改了)),性价比很低.幸运的是,我们有更好的方法。简介:MapMap是一个很棒的小特性,也是快速运行Python并行代码的关键。对于那些不熟悉它的人来说,map来自函数式语言Lisp。映射函数可以依次映射出另一个函数。比如urls=['http://www.yahoo.com','http://www.reddit.com']results=map(urllib2.urlopen,urls)这里调用urlopen方法返回所有的调用结果排序并将其存储在列表中。像:results=[]forurlinurls:results.append(urllib2.urlopen(url))Map按顺序处理这些迭代。调用此函数,它将返回一个简单的列表,其中按顺序存储结果。为什么这么厉害?因为只要你有合适的库,map就可以让并行运行的非常流畅!有两个库可以通过map函数支持并行:一个是multiprocessing,另一个鲜为人知但功能强大。子文件:multiprocessing.dummy。题外话:这是什么?您从未听说过虚拟多处理库?我也是最近才。它仅在多进程文档中提到。而那句话就是让你知道,有这么一个东西。敢说,这种近乎抛售的做法,后果不堪设想!dummy是多进程模块的克隆文件。唯一的区别是multiprocessing模块使用进程,而dummy使用线程(当然有所有常见的Python限制)。也就是说,数据从一个传递到另一个。这使得数据可以轻松地在两者之间来回跳转,对于探索性程序特别有用,因为你不必确定帧调用是IO模式还是CPU模式。要准备好通过map函数实现并行,首先要导入模块:frommultiprocessingimportPoolfrommultiprocessing.dummyimportPoolasThreadPool然后初始化:pool=ThreadPool()这一句简单的语句就可以代替我们example2.py中的build_worker_pool函数的所有工作。换句话说,它创建了一些有效的工人,启动他们为下一份工作做准备,并将他们存放在不同的位置以方便使用。Pool对象有几个参数,但最重要的一个:进程。它决定了池中的工人数量。如果你不填,它会默认你电脑的核心值。如果您在CPU模式下使用多进程池,通常内核数量越多速度越快(还有许多其他因素)。但是,在进行线程处理或处理网络绑定工作时,情况会更加复杂,应该使用池的确切大小。pool=ThreadPool(4)#Setsthepoolsizeto4如果运行的线程太多,在多个线程之间切换会浪费很多时间,所以最好耐心调试最合适的任务数。我们现在已经创建了pool对象,我们很快就会有简单的并行程序,所以让我们重写example2.py中的urlopener!importurllib2frommultiprocessing.dummyimportPoolasThreadPoolurls=['http://www.python.org','http://www.python.org/about/','http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html','http://www.python.org/doc/','http://www.python.org/download/','http://www.python.org/getit/','http://www.python.org/community/','https://wiki.python.org/moin/','http://planet.python.org/','https//wiki.python.org/moin/LocalUserGroups','http://www.python.org/psf/','http://docs.python.org/devguide/','http://www.python.org/community/awards/'#etc..]#MakethePoolofworkerspool=ThreadPool(4)#Opentheurlsintheirownthreads#andreturntheresultsresults=pool.map(urllib2.urlopen,urls)#closethepoolandwaitfortheworktofinishpool.close()pool.join()看!这次代码只用了4行就完成了所有的工作。其中有3句还是简单固定的。调用map完成了我们前面示例的40行!为了更形象地展示这两种方法的区别,我还分别对它们的执行时间进行了计时。结果:还不错!并且还说明了为什么要仔细调整池的大小。这里只要大于9就可以让它跑得更快。示例2:生成数千张缩略图让我们在CPU模式下完成吧!我在工作中经常需要处理大量的图像文件夹。它的任务之一是创建缩略图。这在并行任务中已经是非常成熟的方法了。基本单线程创建importosimportPILfrommultiprocessingimportPoolfromPILimportImageSIZE=(75,75)SAVE_DIRECTORY='thumbs'defget_image_paths(folder):return(os.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(:im=Image.open(文件名)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os.path.join(base,SAVE_DIRECTORY,fname)im.save(save_path)if__name__=='__main__':folder=os.path.abspath('11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')os.mkdir(os.path.join(folder,SAVE_DIRECTORY))举个例子images=get_image_paths(folder_DIRECTORY)举个例子:我,但本质上它是将一个文件夹传递给程序,抓取其中的所有图片,并最终在各自的目录中创建和存储缩略图。我的计算机用了27.9秒来处理大约6000张图像。如果我们并行调用map而不是for循环:importosimportPILfrommultiprocessingimportPoolfromPILimportImageSIZE=(75,75)SAVE_DIRECTORY='thumbs'defget_image_paths(folder):return(os.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(文件名):im=Image.open(文件名)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os.path.join(base,SAVE_DIRECTORY,fname)im.save(save_path)if__name__=='__main__':folder=os.path.abspath('11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')os.mkdir(os.path.join(folder,SAVE_DIRECTORY))images=get_polder_Pool()pool.map(create_thumbnail,images)pool.close()pool.join()5.6秒!对于仅更改几行代码,这是一个巨大的加速。这种方法也可以更快,只要你分别在它们的进程和线程中运行cpu和io任务——但它经常会导致死锁。总之,考虑到map的实用功能以及没有人为的线程管理,我认为这是一种美观、可靠且易于调试的方法。好了,文章到此结束。在一行中完成并行任务。

猜你喜欢