Python在并行化程序方面有些臭名昭著。撇开线程实现和GIL等技术问题不谈,我认为错误的指令是主要问题。常见的经典Python多线程、多进程教程显得“厚重”。而且它通常只是浅尝辄止,而没有深入研究在您的日常工作中最有用的东西。传统的例子,只要搜索“Python多线程教程”,不难发现,几乎所有的教程都给出了涉及类和队列的例子:importosimportPILfrommultiprocessingimportPoolfromPILimportImageSIZE=(75,75)SAVE_DIRECTORY='thumbs'defget_image_paths(folder):return(os.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(filename):im=Image.open(文件名)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os.path.join(base,SAVE_DIRECTORY,fname)im.save(save_path)if__name__=='__main__':文件夹=os.path.abspath('11_18_2013_R000_IQM_BIG_SUR_SUR_SUR_MON__E10D1958E7B766C3E840')OS.MKDIR(OS.PATH.JOIN(os.join)Join(folder,save_directory)close()pool.join()哈,有点像Java是不是?我并不是说对多线程/多进程任务使用生产者/消费者模型是错误的(事实上,这种模型有其用武之地)。只是我们在处理日常脚本编写任务时可以使用更高效的模型。问题是……首先,你需要一个样板类;\其次,你需要一个队列来传递对象;\并且,你需要在通道的两端构建方法来帮助它工作(如果你想要双向通信或者需要引入另一个队列来保存结果)。工人越多,问题越多。根据这个想法,你现在需要一个工作线程的线程池。这是来自经典IBM教程的示例-通过多线程加速Web检索。#Example2.py'''一个更真实的线程池例子'''importtimeimportthreadingimportQueueimporturllib2classConsumer(threading.Thread):def__init__(self,queue):threading.Thread.__init__(self)self._queue=queuedefrun(self):whileTrue:content=self._queue.get()ifisinstance(content,str)andcontent=='quit':breakresponse='urllib2.urlopenbyes!'defProducer():urls=['http://www.python.org','http://www.yahoo.com''http://www.scala.org','http://www.google.com'#等。]queue=Queue.Queue()worker_threads=build_worker_pool(queue,4)start_time=time.time()#添加要处理的urlforurlinurls:queue.put(url)#为worker_threads中的worker添加毒药:queue.put('quit')一!所用时间:{}'.format(time.time()-start_time)defbuild_worker_pool(queue,size):workers=[]for_inrange(size):worker=Consumer(queue)worker.start()workers。append(worker)returnworkersif__name__=='__main__':Producer()这段代码可以正常工作,但是仔细看看我们需要做什么:构造不同的方法,跟踪一系列线程,并为解决恼人的死锁问题,我们需要进行一系列的join操作,而这仅仅是个开始……至此我们已经回顾了经典的多线程教程,是不是有些空洞?它是样板式的并且容易出错,所以事半功倍的风格显然不太适合日常使用。幸运的是,我们有更好的方法。为什么不试试mapmap,这是一个简洁的小函数,它是轻松并行化Python程序的关键。map源自Lisp等函数式编程语言。它可以通过一个序列实现两个函数之间的映射。urls=['http://www.yahoo.com','http://www.reddit.com']results=map(urllib2.urlopen,urls)上面两行代码将urls这个序列中的每个元素作为参数传递给urlopen方法,所有结果都存储在结果列表中。结果大致等同于:results=[]forurlinurls:results.append(urllib2.urlopen(url))map函数一手处理了序列操作、参数传递、结果保存等一系列操作。为什么这很重要?这是因为地图可以很容易地与正确的库并行化。Python中有两个库包含map函数:multiprocessing及其鲜为人知的子库multiprocessing.dummy。这里多说几句:multiprocessing.dummy?mltiprocessing库的线程克隆?这是虾吗?甚至在multiprocessing库的官方文档中,也只有一句话说到这个子库。而将这段描述翻译成成人语言,基本上就是:“好吧,有这么回事,你知道就行了。”相信我,这个库被严重低估了!dummy是multiprocessing模块的完整克隆,唯一的区别是multiprocessing在进程上工作,而dummy在线程上工作(因此包括Python所有常见的多线程限制)。\因此很容易交替使用这两个库。您可以为IO-bound任务和CPU-bound任务选择不同的库。尝试使用以下两行代码来引用包含并行映射函数的库:frommultiprocessingimportPoolfrommultiprocessing.dummyimportPoolasThreadPool实例化Pool对象:pool=ThreadPool()这个简单的语句替换了example2.py中的buildworkerpool函数只需7行代码即可完成这项工作。它生成并初始化一系列工作线程,将它们存储在变量中以便于访问。Pool对象有一些参数,这里需要重点关注的只是它的第一个参数:processes。该参数用于设置线程池中的线程数。它的默认值是当前机器CPU的核心数。一般来说,在执行CPU密集型任务时,调用的内核越多,速度就越快。但是在处理网络密集型任务时,事情会变得有点不可预测,明智的做法是试验线程池的大小。pool=ThreadPool(4)#设置poolsize为4当线程过多时,切换线程所花费的时间甚至会超过实际工作时间。尝试为不同的作业找到最佳线程池大小是个好主意。创建Pool对象后,并行化程序就可以运行了。我们来看看重写后的example2.pyimporturllib2frommultiprocessing.dummyimportPoolasThreadPoolurls=['http://www.python.org','http://www.python.org/about/','http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html','http://www.python.org/doc/','http://www.python.org/download/','http://www.python.org/getit/','http://www.python.org/community/','https://wiki.python.org/moin/','http://planet.python.org/','https://wiki.python.org/moin/LocalUserGroups','http://www.python.org/psf/','http://docs.python.org/devguide/','http://www.python.org/community/awards/'#etc..]#MakethePoolofworkerspool=ThreadPool(4)#打开自己的urlsthreads#并返回结果results=pool.map(urllib2.urlopen,urls)#closepool等待工作完成pool.close()pool.join()真正起作用的代码只有4行,只有一行其中很关键。map函数轻松替换了上面40多行的示例。为了更有趣,我统计了不同方法和不同线程池大小的耗时。#results=[]#forurlinurls:#result=urllib2.urlopen(url)#结果。append(result)##-------VERSUS-------###-------4Pool-------##pool=ThreadPool(4)#results=池。map(urllib2.urlopen,urls)##------8池-------##pool=ThreadPool(8)#results=pool.map(urllib2.urlopen,urls)##------13池------##pool=ThreadPool(13)#results=pool.map(urllib2.urlopen,urls)Results:#单线程:14.4秒#4池:3.1秒#8池:1.4秒#。这个结果也解释了为什么要通过实验确定线程池的大小。在我的机器上,当线程池大小大于9时,好处非常有限。另一个真实世界的例子生成数千张图像的缩略图\这是CPU密集型任务,非常适合并行化。基本单进程版本importosimportPILfrommultiprocessingimportPoolfromPILimportImageSIZE=(75,75)SAVE_DIRECTORY='thumbs'defget_image_paths(folder):return(os.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(filename):im=Image.open(filename)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os。))images=get_image_paths(folder)forimageinimages:create_thumbnail(Image)上面代码的主要工作是遍历传入文件夹中的图片文件,一张一张生成缩略图,并将这些缩略图保存到特定的文件夹中.在我的机器上,这个程序需要27.9秒来处理6000张图像。如果我们使用map函数而不是for循环:importosimportPILfrommultiprocessingimportPoolfromPILimportImageSIZE=(75,75)SAVE_DIRECTORY='thumbs'defget_image_paths(folder):return(os.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(filename):im=Image.open(filename)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)??save_path=os.path.join(base,SAVE_DIRECTORY,fname)??im.save(save_path)if__name__=='__main__':??folder=os.path.abspath(????'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')??os.mkdir(os.path.join(folder,SAVE_DIRECTORY))images=get_image_paths(folder)pool=Pool()pool.map(creat_thumbnail,images)pool.close()pool.join()5.6秒!虽然我们只改动了几行代码,但是我们已经明显提高了程序的执行速度。在生产环境中,对于CPU密集型任务和IO密集型任务,我们可以选择多进程、多线程的库,进一步提高执行速度——这也是解决死锁问题的好方法。另外,由于map函数不支持手动线程管理,使得相关的调试工作变得异常简单。至此,我们已经(基本上)用一行Python实现了并行化。以上就是本次分享的全部内容。觉得文章还不错的话,请关注公众号:Python编程学习圈,每日干货分享,发送“J”还能领取大量学习资料。或者去编程学习网了解更多编程技术知识。
