当前位置: 首页 > 科技观察

Python代码方便并行,这个操作秀!

时间:2023-03-13 22:29:33 科技观察

Python在程序并行化方面有些臭名昭著。撇开线程实现和GIL等技术问题不谈,我认为错误的指令是主要问题。常见的经典Python多线程、多进程教程显得“厚重”。而且它通常只是浅尝辄止,而没有深入研究在您的日常工作中最有用的东西。传统例子简单搜索《Python多线程教程》不难发现,几乎所有教程都给出了涉及类和队列的例子:.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(filename):im=Image.open(filename)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os.path.join(base,SAVE_DIRECTORY,fname)im.save(save_path)if__name__=='__main__':folder=os.path.abspath('11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')osm(osm.path.join(folder,SAVE_DIRECTORY))images=get_image_paths(folder)pool=Pool()pool.map(creat_thumbnail,images)pool.close()pool.join()哈,有点像Java吧?我并不是说对多线程/多进程任务使用生产者/消费者模型是错误的(事实上,这种模型有其用武之地)。只是我们在处理日常脚本编写任务时可以使用更高效的模型。问题是……首先,你需要一个样板类;其次,你需要一个队列来传递对象;并且,你需要在通道的两端构建方法来帮助它工作(如果你想要双向通信或者保存结果,还需要引入另一个队列)。工人越多,问题越多。根据这个想法,你现在需要一个工作线程的线程池。这是来自经典IBM教程的示例-通过多线程加速Web检索。#Example2.py'''Amorerealisticthreadpoolexample'''importtimeimportthreadingimportQueueimporturllib2classConsumer(threading.Thread):def__init__(self,queue):threading.Thread.__init__(self)self._queue=queuedefrun(self):whileTrue:content=self._queue。get()ifisinstance(content,str)andcontent=='quit':breakresponse=urllib2.urlopen(content)print'Byebyes!'defProducer():urls=['http://www.python.org','http://www.yahoo.com''http://www.scala.org','http://www.google.com'#etc..]queue=Queue.Queue()worker_threads=build_worker_pool(queue,4)start_time=time.time()#Addtheurlstoprocessforurlinurls:queue.put(url)#Addthepoisonpillvforworkerinworker_threads:queue.put('quit')forworkerinworker_threads:worker.join()print'Done!Timetaken:{}'.format(time.time()-start_time)defbuild_worker_pool(queue,size):workers=[]for_inrange(size):worker=Consumer(queue)worker.start()workers.append(worker)返回工人if__name__=='__main__':Producer()这段代码运行正常,但是仔细看看我们需要做什么:构造不同的方法,跟踪一系列线程,以及为了解决烦人的死锁问题,我们需要进行一系列的join操作,而这仅仅是个开始……至此我们已经回顾了经典的多线程教程,是不是有些空洞?它是样板式的并且容易出错,所以事半功倍的风格显然不太适合日常使用。幸运的是,我们有更好的方法。为什么不试试mapmap,这是一个简洁的小函数,它是轻松并行化Python程序的关键。map源自Lisp等函数式编程语言。它可以通过一个序列实现两个函数之间的映射。urls=['http://www.yahoo.com','http://www.reddit.com']results=map(urllib2.urlopen,urls)上面两行代码将urls这个序列中的每个元素作为参数传递给urlopen方法,所有结果都存储在结果列表中。结果大致等价于:results=[]forurlinurls:results.append(urllib2.urlopen(url))map函数一手处理了序列操作、参数传递、结果保存等一系列操作。为什么这很重要?这是因为地图可以很容易地与正确的库并行化。Python中有两个库包含map函数:multiprocessing及其鲜为人知的子库multiprocessing.dummy。这里多说几句:multiprocessing.dummy?mltiprocessing库的线程克隆?这是虾吗?甚至在multiprocessing库的官方文档中,也只有一句话说到这个子库。而将这段描述翻译成成人语言,基本上就是:“好吧,有这么回事,你知道就行了。”相信我,这个库被严重低估了!dummy是multiprocessing模块的完整克隆,唯一的区别是multiprocessing在进程上工作,而dummy在线程上工作(因此包括Python所有常见的多线程限制)。所以替换和使用这两个库是非常容易的。您可以为IO-bound任务和CPU-bound任务选择不同的库。尝试使用以下两行代码来引用包含并行映射函数的库:frommultiprocessingimportPoolfrommultiprocessing.dummyimportPoolasThreadPool实例化Pool对象:pool=ThreadPool()这个简单的语句替换了example2.py中buildworkerpool函数中的7行代码的工作.它生成并初始化一系列工作线程,将它们存储在变量中以便于访问。Pool对象有一些参数,这里需要重点关注的只是它的第一个参数:processes。该参数用于设置线程池中的线程数。它的默认值是当前机器CPU的核心数。一般来说,在执行CPU密集型任务时,调用的内核越多,速度就越快。但是在处理网络密集型任务时,事情会变得有点不可预测,明智的做法是试验线程池的大小。pool=ThreadPool(4)#设置poolsize为4当线程过多时,切换线程所花费的时间甚至会超过实际工作时间。尝试为不同的作业找到最佳线程池大小是个好主意。创建Pool对象后,并行化程序就可以运行了。看一下改写后的example2.pyimporturllib2frommultiprocessing.dummyimportPoolasThreadPoolurls=['http://www.python.org','http://www.python.org/about/','http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html','http://www.python.org/doc/','http://www.python.org/download/','http://www.python.org/getit/','http://www.python.org/community/','https://wiki.python.org/moin/','http://planet.python.org/','https://wiki.python.org/moin/LocalUserGroups','http://www.python.org/psf/','http://docs.python.org/devguide/','http://www.python.org/community/awards/'#etc..]#MakethePoolofworkerspool=ThreadPool(4)#Opentheurlsintheirownthreads#andreturntheresultsresults=pool.map(urllib2.urlopen,urls)#closethepoolandwaitfortheworktofinishpool。close()pool.join()只有4行代码实际有效,其中只有一行是关键的。map函数可以轻松替换上面的40多行示例。为了更有趣,我统计了不同方法和不同线程池大小的耗时。#results=[]#forurlinurls:#result=urllib2.urlopen(url)#results.append(result)##------VERSUS--------###-------4Pool------##pool=ThreadPool(4)#results=pool.map(urllib2.urlopen,urls)##------8Pool------##pool=ThreadPool(8)#results=pool.map(urllib2.urlopen,urls)##-------13Pool------##pool=ThreadPool(13)#results=pool.map(urllib2.urlopen,urls)结果:#Singlethread:14.4Seconds#4Pool:3.1Seconds#8Pool:1.4Seconds#13Pool:1.3Seconds很棒的结果不是吗?这个结果也解释了为什么要通过实验确定线程池的大小。在我的机器上,当线程池大小大于9时,好处非常有限。另一个真实世界的例子生成数千张图像的缩略图是一项CPU密集型任务,非常适合并行化。基础单进程版本importosimportPILfrommultiprocessingimportPoolfromPILimportImageSIZE=(75,75)SAVE_DIRECTORY='thumbs'defget_image_paths(folder):return(os.path.join(folder,f)forfinos.listdir(folder)if'jpeg'inf)defcreate_thumbnail(filename)im=Image.open(文件名)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os.path.join(base,SAVE_DIRECTORY,fname)im.save(save_path)if__name__=='__main__':folder=os.path.abspath('11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')os.mkdir(os.path.join(folder,SAVE_DIRECTORY))images=get_image_paths(folder)forimagenimageintopside(这段代码主要工作是遍历incoming文件夹中的图像文件,一张一张生成缩略图,并将这些缩略图保存到特定的文件夹中。在我的机器上,这个程序需要27.9秒来处理6000张图像。如果我们使用map功能而不是for循环:'inf)defcreate_thumbnail(文件名):im=Image.open(文件名)im.thumbnail(SIZE,Image.ANTIALIAS)base,fname=os.path.split(filename)save_path=os.path.join(base,SAVE_DIRECTORY,fname)im.save(save_path)if__name__=='__main__':folder=os.path.abspath('11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')os.mkdir(os.path.join(folder,SAVE_DIRECTORY))images=get(folder_path))pool.map(creat_thumbnail,images)pool.close()pool.join()5.6秒!虽然我们只改动了几行代码,但是我们已经明显提高了程序的执行速度。在生产环境中,对于CPU密集型任务和IO密集型任务,我们可以选择多进程、多线程的库,进一步提高执行速度——这也是解决死锁问题的好方法。另外,由于map函数不支持手动线程管理,使得相关的调试工作变得异常简单。至此,我们已经(基本上)用一行Python实现了并行化。