当前位置: 首页 > 科技观察

使用Python实现多任务流程

时间:2023-03-17 10:37:58 科技观察

本文转载自微信公众号《杰哥的IT之旅》,作者阿拉斯加。转载本文请联系杰哥的IT之旅公众号。1、进程介绍进程:正在执行的程序由程序、数据和进程控制块组成。它是正在执行的程序。程序的一次性执行过程是资源调度的基本单位。程序:不执行的代码,是静态代码。2、线程和进程的比较从图中可以看出:此时计算机有9个应用进程,但是一个进程会对应多个线程。同时运行多个QQ线程:能够完成多个任务,一个QQ多个聊天窗口的根本区别:进程是操作系统资源分配的基本单位,线程是任务调度和执行的基本单位.使用多进程的优点:1.拥有独立的GIL:首先,由于进程中GIL的存在,Python中的多线程无法很好的发挥多核的优势。对于一个进程中的多个线程,同一时刻只能运行一个线程。对于多进程来说,每个进程都有自己的GIL,所以在多核处理器下,多进程的运行不会受到GIL的影响。因此,多进程更能发挥多核的优势。2.效率高当然,对于爬虫这样的IO密集型任务,多线程和多处理的影响并没有太大区别。对于计算密集型的任务,Python的多进程与多线程相比,多核运行的效率会提高一倍。三、Python实现多进程先举个例子感受一下:1、使用进程类importmultiprocessingdefprocess(index):print(f'Process:{index}')if__name__=='__main__':foriinrange(5):p=multiprocessing.Process(target=process,args=(i,))p.start()这是实现多进程最基本的方法:通过创建Process来创建一个新的子进程,其中target参数传入方法名,args是方法的参数,以元组的形式传入,与被调用方法过程的参数一一对应。注意:这里的args必须是元组。如果只有一个参数,则应在元组的第一个元素之后添加一个逗号。如果没有逗号,就和单个元素本身没什么区别,不能组成元组,导致传参。问题。创建进程后,我们可以通过调用start方法来启动进程。运行结果如下:Process:0Process:1Process:2Process:3Process:4可以看到,我们运行了5个子进程,每个进程都调用了process方法。process方法的index参数通过Process的args传入,分别是0到4的5个序号,最后打印出来,5个子流程就结束了。2.继承进程类multiprocessingimportProcessimporttimeclassMyProcess(Process):def__init__(self,loop):Process.__init__(self)self.loop=loopdefrun(self):forcountinrange(self.loop):time.sleep(1)print(f'Pid:{self.pid}LoopCount:{count}')if__name__=='__main__':foriinrange(2,5):p=MyProcess(i)p.start()我们首先声明一个构造方法,它接收一个loop参数表示循环次数,设置为全局变量。在run方法中,这个循环变量用于循环循环次数,打印当前进程号和循环次数。调用时,我们使用range方法获取2、3、4这三个数,分别初始化到MyProcess进程中,然后调用start方法启动进程。注意:这里流程的执行逻辑需要在run方法中实现。需要调用start方法启动进程,调用后会执行run方法运行结果如下:Pid:12976LoopCount:0Pid:15012LoopCount:0Pid:11976LoopCount:0Pid:12976LoopCount:1Pid:15012LoopCount:1Pid:11976LoopCount:1Pid:15012LoopCount:2Pid:11976LoopCount:2Pid:11976LoopCount:3注意,这里的进程pid代表进程号,不同机器不同时间运行结果可能不同。4.进程间通信1.Queue-QueueFIFOfrommultiprocessingimportQueueimportmultiprocessingdefdownload(p):#下载数据lst=[11,22,33,44]foriteminlst:p.put(item)print('数据下载成功...')defsavedata(p):lst=[]whileTrue:data=p.get()lst.append(data)ifp.empty():breakprint(lst)defmain():p1=Queue()t1=多处理。Process(target=download,args=(p1,))t2=multiprocessing.Process(target=savedata,args=(p1,))t1.start()t2.start()if__name__=='__main__':main()数据已下载成功....[11,22,33,44]2.共享全局变量不适合多进程编程importmultiprocessinga=1defdemo1():globalaa+=1defdemo2():print(a)defmain():t1=multiprocessing.Process(target=demo1)t2=multiprocessing.Process(target=demo2)t1.start()t2.start()if__name__=='__main__':main()运行结果:1有结果到知道:全局变量不共享;五、进程池之间的通信1、进程池的介绍当要创建的子进程数量不大的时候,可以直接使用multiprocessing中的Process动态生成多个进程,但是如果有几百甚至上千个目标,手动创建进程的工作量是巨大的。这时候就可以使用multiprocessing模块提供的Pool方法了。frommultiprocessingimportPoolimportos,time,randomdefworker(a):t_start=time.time()print('%s开始执行,进程号为%d'%(a,os.getpid()))time.sleep(random.random()*2)t_stop=time.time()print(a,"执行完成,耗时%0.2f"%(t_stop-t_start))if__name__=='__main__':po=Pool(3)#定义一个进程池foriinrange(0,10):po.apply_async(worker,(i,))#将worker任务添加到进程池中print("--start--")po.close()po.join()print("--end--")运行结果:--start--0开始执行,进程号66641开始执行,进程号47722开始执行,进程号132560执行完成,耗时0.183开始执行,进程号66642执行完成,耗时0.164开始执行,进程号132561,执行完毕,用0.675开始执行,进程号47724,执行完毕,用0.876开始执行,进程号132563,执行完毕完成,耗时1.597吨o开始执行,进程号为66645的执行完成,需要1.158开始,进程号47727,开始需要0.409,进程号66646,完成需要1.808,结束需要1.499完成,需要1.36--end--一个进程池只能容纳3个进程,执行完成后才能加入新的任务,在不断开启和释放的过程中不断重复循环。6、案例:文件批量复制操作思路:获取待复制文件夹名称新建文件夹获取文件夹内所有待复制文件名创建进程池在进程池中添加任务代码如下:导入packageimportmultiprocessingimportosimporttimeCustomizefilecopyFunctiondefcopy_file(Q,oldfolderName,newfolderName,file_name):#文件复制,不需要返回time.sleep(0.5)#print('\r%s文件从%s文件夹复制到%s文件夹'%(oldfolderName,newfolderName,file_name),end='')old_file=open(oldfolderName+'/'+file_name,'rb')#要复制的文件content=old_file.read()old_file.close()new_file=open(newfolderName+'/'+file_name,'wb')#复制新文件new_file.write(content)new_file.close()Q.put(file_name)#添加文件定义到Q队列??Mainfunctiondefmain():oldfolderName=input('请输入要复制的文件夹名称:')#Step1获取要复制的文件夹名称(可以手动创建也可以通过代码创建,这里我们手动创建)newfolderName=oldfolderName+'copy'#Step2创建新文件Folderifnotos.path.exists(newfolderName):os.mkdir(newfolderName)filenames=os.listdir(oldfolderName)#3.获取文件夹中要复制的所有文件名#print(filenames)pool=multiprocessing.Pool(5)#4。创建进程池Q=multiprocessing.Manager().Queue()#创建队列并通信forfile_nameinfilenames:pool.apply_async(copy_file,args=(Q,oldfolderName,newfolderName,file_name))#5.进步诚池添加任务po.close()copy_file_num=0file_count=len(filenames)#不知道什么时候完成,所以定义一个死循环whileTrue:file_name=Q.get()copy_file_num+=1time.sleep(0.2)print('\r复制进度%.2f%%'%(copy_file_num*100/file_count),end='')#制作复制进度条ifcopy_file_num>=file_count:中断程序运行if__name__=='__main__':main()运行结果如下如图:运行前后文件目录结构对比。以上内容是整体的一般结果。由于测试中有随便粘贴的测试文件,这里就不进行演示了。