当前位置: 首页 > 科技观察

Python并发编程线程池-进程池

时间:2023-03-14 13:39:41 科技观察

介绍Python标准库为我们提供了threading和multiprocessing模块来编写相应的多线程/多进程代码,但是当项目达到一定规模时,频繁创建/销毁进程或者线程是很耗资源的。这时候我们就得自己写线程池/进程池来以空间换时间。但是从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,直接支持编写线程池/进程池。Executor和Futureconcurrent.futures模块是基于Executor的,Executor是一个抽象类,不能直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor就非常有用了。顾名思义,它们分别用于创建线程池和进程池的代码。我们可以将相应的任务直接放入线程池/进程池中。无需维护Queue,不用担心死锁。线程池/进程池会自动为我们调度。Future的概念相信有java和nodejs下编程经验的朋友都不陌生。你可以理解为一个在未来完成的操作。这是异步编程的基础。在传统的编程模式下,比如我们操作queue.get时,在等待返回结果之前会阻塞,cpu无法释放出来做其他事情,Future的引入帮助我们在等待期间完成其他操作.关于Python中的异步IO,可以在看完这篇文章后参考我的《Python并发编程协程/异步IO》。p.s:如果你还是坚持使用Python2.x,请先安装futures模块。pipinstallfutures使用submit操作线程池/进程池。我们先通过下面的代码来理解线程池的概念#example1.pyfromconcurrent.futuresimportThreadPoolExecutorimporttimedefreturn_future_result(message):time.sleep(2)returnmessagepool=ThreadPoolExecutor(max_workers=2)#create一个可以容纳2个任务的线程池future1=pool。submit(return_future_result,("hello"))#给线程添加任务poolfuture2=pool.submit(return_future_result,("world"))#给线程添加任务print(future1.done())到池中#判断task1是否结束time.sleep(3)print(future2.done())#判断task2是否结束print(future1.result())#查看task1返回的结果print(future2.result())#查看task2返回的结果我们根据运行结果来分析一下。我们使用submit方法向线程池中添加任务,submit返回一个Future对象,可以简单理解为未来完成的操作。在第一个print语句中,很明显我们的future1因为time.sleep(2)没有完成,因为我们使用time.sleep(3)挂起主线程,所以到了第二个print语句的时候,我们的thread池中的所有任务都已完成。ziwenxie::~?pythonexample1.pyFalseTruehelloworld#在上面程序执行过程中,我们通过ps命令可以看到三个线程同时在后台运行pyziwenxie8361755783620319:45pts/000:00:00pythonexample1.pyziwenxie8361755783630319:40:40:pts/00:00pythonexample1.py上面的代码也可以改写成进程池的形式。api和线程池完全一样,不再赘述。#example2.pyfromconcurrent.futuresimportProcessPoolExecutorimporttimedefreturn_future_result(message):time.sleep(2)returnmessagepool=ProcessPoolExecutor(max_workers=2)future1=pool.submit(return_future_result,("hello"))future2=pool.submit("))打印(future1.done())time.sleep(3)print(future2.done())print(future1.result())print(future2.result())下面是运行结果ziwenxie::~?pythonexample2.pyFalseTruehelloworldziwenxie::~?ps-eLf|greppythonziwenxie8560755785603319:53pts/000:00:00pythonexample2.pyziwenxie8560755785630319:53pts/000:00:00pythonexample2.pyziwenxie8560755785640319:53pts/000:00:00pythonexample2.pyziwenxie8561856085610119:53pts/000:00:00pythonexample2.pyziwenxie8562856085620119:53pts/000:00:00pythonexample2.py使用map/wait来操作线程池/进程池除了提交,Exectuor还为我们提供了一个map方法,和内置的map用法类似,下面对比两个例子看在两者之间的区别。使用提交操作查看#example3.pyimportconcurrent.futuresimporturllib.requestURLS=['http://httpbin.org','http://example.com/','https://api.github.com/']defload_url(url,timeout):withurllib.request.urlopen(url,timeouttimeout=timeout)asconn:returnconn.read()#Wecanuseawithstatementtoensurethreadsarecleaneduppromptlywithconcurrent.futures.ThreadPoolExecutor(max_workers=3)asexecutor:#StarttheloadoperationsandmarkeachfuturewithsURLfuture_to_url,6mit{urlfutureforLS_to_url,6mit{urlfutureforLS}:futures.as_completed(future_to_url):url=future_to_url[future]try:data=future.result()exceptExceptionaseexc:print('%rgeneratedanexception:%s'%(url,exc))else:print('%rpageis%dbytes'%(url,len(data)))从运行结果可以看出,as_completed并不是按照URLS列表元素的顺序返回的。ziwenxie::~?pythonexample3.py'http://example.com/'pageis1270byte'https://api.github.com/'pageis2039bytes'http://httpbin.org'pageis12150bytes使用map#example4.pyimportconcurrent.futuresimporturllib.requestURLS=['http://httpbin.org','http://example.com/','https://api.github.com/']defload_url(url):withurllib.request.urlopen(url,timeout=60)asconn:returnconn.read()#Wecanuseawithstatementtoensurethreadsarecleaneduppromptlywithconcurrent.futures.ThreadPoolExecutor(max_workers=3)asexecutor:forurl,datainzip(URLS,executor.map(load_url,URLS)):print('%rpageis%dbytes'%(url,len(data)))从运行结果可以看出,map是按照URLS列表元素的顺序返回的,写的代码比较简洁直观。我们可以根据具体需要选择一种。ziwenxie::~?pythonexample4.py'http://httpbin.org'pageis12150bytes'http://example.com/'pageis1270bytes'https://api.github.com/'pageis2039bytes第三个选项是waitwait方法。返回一个元组(tuple),其中包含两个集合(collections),一个是完成的(completed),一个是未完成的(unfinished)。使用等待方法的一个好处是可以获得更多的自由度。它接收三个参数FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETE,默认设置为ALL_COMPLETED。我们通过下面的例子来看看三个参数的区别fromconcurrent.futuresimportThreadPoolExecutor,wait,as_completedfromtimeimportsleepfromrandomimportrandintdefreturn_after_random_secs(num):sleep(randint(1,5))return"Returnof{}".format(num)pool=ThreadPoolExecutor(5)futures=[]forxinrange(5):futures.append(pool.submit(return_after_random_secs,x))print(wait(futures))#print(wait(futures,timeout=None,return_when='FIRST_COMPLETED'))如果默认ALL_COMPLETED,程序会阻塞直到线程池中的所有任务都完成。ziwenxie::~?pythonexample5.pyDoneAndNotDoneFutures(done={,,,,},not_done=set())如果使用FIRST_COMPLETED参数,程序不会等到线程池中的所有任务都完成。ziwenxie::~?pythonexample5.pyDoneAndNotDoneFutures(done={,,},not_done={,})思考题写一个小程序对比了multiprocessing.pool(ThreadPool)和ProcessPollExecutor(ThreadPoolExecutor)在执行效率上的差异,思考为什么会出现这样的结果,是结合上面提到的Future。【责任编辑:求职者TEL:(010)68476606】

猜你喜欢