代码环境:python3.6进程是程序执行的一个实例,是操作系统进行资源分配和调度的独立单位。线程是进程中的一个实体,是最小的执行单元。一个进程可以有多个线程。在资源使用方面,进程比线程大。现代操作系统支持“多任务”,我们通过多进程或多线程来实现多任务相关的编程。多进程的一个简单例子常用的多进程编程可以通过支持跨平台的multiprocessing模块来实现。frommultiprocessingimportProcessfromosimportgetpid#子进程要执行的代码defrun_proc(name):print(f'{name}childprocess{getpid()}isrunning...')defprocess_main():print(f'Currentprocess(mainprocess)id:{getpid()}')#创建子进程,传入执行的任务函数和对应的参数ancestorp1=Process(target=run_proc,args=('p1',))p2=Process(target=run_proc,args=('p2',))print('Thenthechildprocessstartsrunning...')#启动进程p1.start()p2.start()#Block主进程并告诉主进程:当我的子进程运行完毕,你可以继续执行p1.join()#p2.join()#exitcode:0表示进程已经终止,非0表示进程已经结束尚未终止print(f'p1'sprocessexitcode:{p1.exitcode}')print(f'p2'sprocessexitcode:{p2.exitcode}')if__name__=='__main__':process_main()print('Themainprocessends.')上面代码中,子进程p的exitcode2是None,因为p2没有调用join方法,主进程在进程终止前就结束了。使用时要特别注意。进程池如果要启动大量的子进程,我们可以使用进程池multiprocessing.Pool,它的优点是:方便控制并发度;它减少了资源消耗并自动回收可重复使用的资源。我们把上面的例子稍微修改一下,用进程池来实现:frommultiprocessingimportPoolfromosimportgetpidimporttime,random#子进程要执行的代码defrun_proc(name):print(f'{name}childprocess{getpid()}Running...')start=time.time()time.sleep(random.random()*3)print(f'{name}子进程运行{time.time()-开始:.2f}seconds...')defprocess_main():print(f'Currentprocess(mainprocess)id:{getpid()}')#创建进程池,我们限制最多执行3个并发进程po=Pool(3)fornamein['p1','p2','p3','p4','p5']:#传入执行的任务函数和对应的参数元组,开始并发执行po.apply_async(run_proc,(name,))po.close()#等待所有子进程执行完毕,必须放在close之后po.join()print('Allsub-processesareover...')if__name__=='__main__':process_main()betweenprocessesCommunication-Queue在单机上,thpython进程之间常用的通信方式是消息队列multiprocessing.Queue。我们创建两个进程,一个向Queue写入数据,一个从Queue读取数据:frommultiprocessingimportProcess,Queuefromosimportgetpidimporttime,random#writedataprocessdefprocess_write(q):print(f'writeprocess:{getpid()}')forvaluein['A','B','C']:print(f'queuequeuewritedata:{value}...')q.put(value)time.sleep(random.random())#读取数据过程defprocess_read(q):print(f'读取过程:{getpid()}')whileTrue:value=q.get()print(f'从队列中获取数据:{value}...')defprocess_main():print(f'当前进程(主进程)id:{getpid()}')#主进程创建Queueq=Queue()p_w=Process(target=process_write,args=(q,))p_r=Process(target=process_read,args=(q,))print('然后子进程开始运行...')p_w.start()p_r.start()p_w.join()#读进程while循环没有退出条件,这里读进程被强制终止pool使用进程池Pool创建的进程间通信,使用上面的Queue会报错,需要在multiprocessing.Manager实例中使用Queue:注意:multiprocessing.Manager()创建了一个manager对象SyncManager,可以共享进程池或跨机器进程之间的数据使用它。frommultiprocessingimportPool,Managerfromosimportgetpidimporttime,random#writedataprocessdefprocess_write(q):print(f'writeprocess:{getpid()}')forvaluein['A','B','复制代码C']:print(f'queuewritedata:{value}...'))q.put(value)time.sleep(random.random())#读数据过程defprocess_read(q):print(f'读取进程:{getpid()}')whilenotq.empty():value=q.get()print(f'从队列中获取数据:{value}...')defprocess_main():print(f'当前进程(主进程)id:{getpid()}')po=Pool(2)withManager()asmanager:q=manager.Queue()#阻塞主进程,相当于顺序执行,写入After进程执行完毕返回,开始读取进程po.apply(process_write,(q,))po.apply(process_read,(q,))po.close()po.join()if__name__=='__main__':process_main()print('主进程结束。')在上面使用Queue的例子中,我们可能会有这样的疑问:为什么我们不能加一个判断来终止循环when从队列中获取数据?进程池中的apply_async更适合并发,为什么要用apply?Queue有几个方法empty/qsize/full来判断队列的状态,但是在python官方文档中也提到这些方法在多进程或者多线程的复杂状态下“不可靠”。在上面的例子中,如果我们加上终止读进程循环的判断,在并发状态下,写进程time.sleep()一段时间,在下一次数据写入之前,读进程可能认为没有队列中的数据,直接退出读取过程。所以,为了妥协这种情况,我们只能先让写进程运行完,再让读进程读取数据。工作还需要根据实际的业务逻辑进行分析。进程间共享内存python中默认的进程间内存隔离,但是在某些情况下我们还是希望在进程之间共享状态和数据,这就需要共享内存。在进程池中,multiprocessing.Manager中的Value/Array/dict这三种对象类型是常用的进程间共享内存。首先看看这三个对象是如何创建的:Value(typecode,value):入参需要指定数据类型对应的字符串标识typecode,并赋值;Array(typecode,sequence):数组与列表非常相似。主要需要注意的是,列表中可以有多种数据类型,而数组中只能有一种类型,由第一个入参指定,所以数组比列表更节省内存;dict(iterable,**kwargs):是python内置的Dict类型。常用typecode标识(字符串)对照表:点击查看官方解释例如:frommultiprocessingimportPool,Managerdefworker(i,lock,normal_value,shared_value,shared_array,shared_dict):normal_value+=1withlock:shared_value.value+=1shared_array[0]+=1shared_dict['count']+=1print(f'subprocess[{i}]--normal:{normal_value},shared_v:{shared_value},shared_a:{shared_array},shared_d:{shared_dict}')defprocess_main():po=Pool(2)withManager()asmanager:lock=manager.Lock()normal_value=0shared_value=manager.Value('i',0)shared_array=manager。Array('i',(0,1,2))shared_dict=manager.dict({'count':0})foriinrange(5):po.apply_async(worker,(i,lock,normal_value,shared_value,shared_array,shared_dict))po.close()po.join()if__name__=='__main__':process_main()print('主进程结束。')上面例子的worker函数中,对normal_value的操作不需要加锁,因为进程间默认内存是隔离的,所以每个进程都会得到1;但是共享内存的操作需要自己加锁,否则得不到想要的结果。总结使用多进程时,记得使用join方法阻塞主进程;优先使用进程池,减少资源消耗;进程间通信,使用multiprocessing.Process模块??下的multiprocessing.Queue,使用multiprocessing.Pool模块下的multiprocessing.Manager实例中的Queue;multiprocessing.Manager()创建一个管理器对象SyncManager,在跨机器的进程之间的进程池或数据共享;进程间共享内存,经常使用进程池中multiprocessing.Manager实例的Value/Array/dict这三种对象类型。操作共享内存数据时要小心把自己锁起来。
