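# Chapter 17: Handling Concurrency with Futures

## The futures module

The `concurrent.futures` module was introduced in Python 3 (version 3.2) as a library for concurrent processing. It offers thread pools and process pools similar to those found in other languages. It is a high-level wrapper, so as a user you have far fewer details to manage.

`concurrent.futures` provides two concurrency models: the multi-threaded `ThreadPoolExecutor` and the multi-process `ProcessPoolExecutor`. Use the multi-threaded model for IO-bound tasks and the multi-process model for compute-bound tasks.

## Multi-threaded mode: ThreadPoolExecutor

The multi-threaded mode suits IO-bound operations. Here I use `sleep` to simulate a slow IO task, and Google's open-source `fire` library to simplify command-line argument handling.

```python
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait


# split the work into subtasks
def each_task(index):
    time.sleep(1)  # sleep for 1 s to simulate IO
    print("thread %d square %d" % (threading.current_thread().ident, index))
    return index * index  # return the result


def run(thread_num, task_num):
    # create a thread pool with thread_num threads
    executor = ThreadPoolExecutor(thread_num)
    start = time.time()
    fs = []  # list of futures
    for i in range(task_num):
        fs.append(executor.submit(each_task, i))  # submit a task
    wait(fs)  # wait for all computations to finish
    end = time.time()
    duration = end - start
    s = sum([f.result() for f in fs])  # sum the results
    print("total result=%d cost: %s" % (s, duration))
    executor.shutdown()  # destroy the thread pool


if __name__ == '__main__':
    fire.Fire(run)
```

Running it with 2 threads and 10 tasks gives:

```
python 1.py 2 10
thread 5216 square 0
thread 5856 square 1
thread 5216 square 2
thread 5856 square 3
thread 5216 square 4
thread 5856 square 5
thread 5216 square 6
thread 5856 square 7
thread 5856 square 9
thread 5216 square 8
total result=285 cost: 5.06045389175415
```

Why is the output jumbled? Because `print` is not atomic. It consists of two consecutive write operations: the first writes the content and the second writes a newline. Each write is atomic on its own, but in a multi-threaded environment the two writes from different threads can interleave, so the output is untidy. In principle, changing `print` into a single write operation should tidy the output, but when I tried it the problem remained.

Now adjust the parameters and look at the effect:

```
python 1.py 10 10
thread 4792 square 9
thread 10380 square 6
thread 14420 square 7
thread 5720 square 3
thread 13808 square 4
thread 17344 square 1
thread 9172 square 5
thread 10684 square 8
thread 11696 square 0
thread 6300 square 2
total result=285 cost: 1.05
```

With 10 threads, all 10 tasks finish in about 1 s. This is the appeal of multi-threading: several IO operations overlap, which reduces the total processing time.

## Multi-process mode: ProcessPoolExecutor

Whereas multi-threading suits IO-bound tasks, multi-processing suits compute-bound tasks.

```python
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait


# split the work into subtasks
def each_task(n):
    # compute pi from the series 1/1^2 + 1/2^2 + ... = pi^2 / 6
    s = 0.0
    for i in range(n):
        s += 1.0 / (i + 1) / (i + 1)
    pi = math.sqrt(6 * s)
    # os.getpid returns the id of the worker process
    sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
    return pi


def run(process_num, *ns):  # take several n values, one subtask per value
    # create a process pool with process_num processes
    executor = ProcessPoolExecutor(process_num)
    start = time.time()
    fs = []  # list of futures
    for n in ns:
        fs.append(executor.submit(each_task, int(n)))  # submit a task
    wait(fs)  # wait for all computations to finish
    end = time.time()
    duration = end - start
    print("total cost: %.2f" % duration)
    executor.shutdown()  # destroy the process pool


if __name__ == '__main__':
    fire.Fire(run)
```

Running it with 1 process gives:

```
python 1.py 1 5000000 5001000 5002000 5003000
process 10024 n=5000000 pi=3.141592462603821
process 10024 n=5001000 pi=3.141592462641988
process 10024 n=5002000 pi=3.1415924626801544
process 10024 n=5003000 pi=3.141592462718321
total cost: 4.62
```

Add one more process and see the effect:

```
python 1.py 2 5000000 5001000 5002000 5003000
process 5860 n=5000000 pi=3.141592462603821
process 14948 n=5001000 pi=3.141592462641988
process 14948 n=5003000 pi=3.141592462718321
process 5860 n=5002000 pi=3.1415924626801544
total cost: 2.78
```

The elapsed time drops from 4.62 s to 2.78 s, a reduction of roughly 40%, which shows that the multi-process mode does run the computations in parallel.

## Internal principle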
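Both examples above rely on `submit` returning a future object immediately while the task runs in the pool. As a minimal sketch of that behaviour, the code below inspects one future before and after its task finishes. The names `slow_square` and `inspect_future` are hypothetical helpers introduced only for this illustration; `submit`, `done`, `result`, and `add_done_callback` are the standard `concurrent.futures` calls.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def slow_square(index, release):
    """Hypothetical task: block until released, then return the square."""
    release.wait()  # stand-in for slow IO; the caller decides when it ends
    return index * index


def inspect_future():
    """Return (done_before, done_after, result, callback_values)."""
    release = threading.Event()
    callback_values = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        # submit() returns at once with a Future, a placeholder for the result
        future = executor.submit(slow_square, 7, release)
        done_before = future.done()  # the task is still blocked on the event
        # the callback is invoked with the future once it completes
        future.add_done_callback(lambda f: callback_values.append(f.result()))
        release.set()  # let the task finish
        result = future.result()  # blocks until the result is available
        done_after = future.done()
    # leaving the with-block waits for the worker, so the callback has run
    return done_before, done_after, result, callback_values


if __name__ == "__main__":
    print(inspect_future())
```

The `threading.Event` is used here instead of `time.sleep` only so that the "not yet done" state is observed reliably; it is an assumption of this sketch, not something the original examples do.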
