当前位置: 首页 > 科技观察

Python分布式进程中你会遇到的陷阱

时间:2023-03-12 08:55:37 科技观察

做文章你是用Python3还是windows系统编程?最重要的是你对进程和线程不是很清楚?那么恭喜,在python分布式进程中,会有坑等着你去挖。..(哈哈哈哈,允许我在这里吓唬你)开个玩笑,但是如果你知道序列不支持匿名函数,那么这个坑就跟你说byebye了。好了,话不多说,直接进入正题。分布式进程众所周知,Process比Thread更稳定,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。Python的multiprocessing模块不仅支持多进程,managers子模块也支持将多个进程分布到多台机器上。服务进程可以充当调度程序,依靠网络通信将任务分发给其他进程。由于managers模块被很好的封装,在不知道网络通信细节的情况下,很容易编写分布式多进程程序。比如我们已经有一个多进程的程序,在同一台机器上运行,通过Queue进行通信,现在由于任务处理进程的工作量大,我们希望将发送任务的进程和处理任务的进程进行分布式处理到两台机器首先,这应该如何用分布式进程来实现?你已经知道原来的Queue可以继续使用,通过managers模块将Queue暴露在网络中,就可以让其他机器上的进程访问Queue。好吧,让我们开始吧!写一个task_master.py,我们先看服务流程。服务进程负责启动Queue,在网络上注册Queue,然后将任务写入Queue。#!/user/bin/python#-*-coding:utf-8-*-#@Time:2018/3/316:46#@Author:lichexo#@File:task_master.pyimportrandom,time,queuefrommultiprocessing.managersimportBaseManager#发送任务Queue:task_queue=queue.Queue()#接收结果的Queue:result_queue=queue.Queue()#QueueManager继承自BaseManager:classQueueManager(BaseManager):pass#注册两个Queue到网络,可调用参数关联Queue对象:QueueManager。register('get_task_queue',callable=lambda:task_queue)QueueManager.register('get_result_queue',callable=lambda:result_queue)#绑定5000端口,设置验证码'abc':manager=QueueManager(address=('',5000),authkey=b'abc')#启动Queue:manager.start()#获取通过网络访问的Queue对象:task=manager.get_task_queue()result=manager.get_result_queue()#把几个A任务进去:foriinrange(10):n=random.randint(0,10000)print('Puttask%d...'%n)task.put(n)#从结果队列中读取结果:print('Trygetresults...')foriinrange(10):r=result.get(timeout=10)print('Result:%s'%r)#Close:manager.shutdown()print('masterexit.')请注意,当我们在一台机器上写多进程程序时,创建的Queue可以可以直接使用。但是在分布式多进程环境下,向Queue中添加任务是不能直接对原来的task_queue进行操作的。这样绕过了QueueManager的封装,必须加上manager.get_task_queue()得到的Queue接口。然后,在另一台机器上启动任务进程(也可以在本机上启动)写task_worker.py#!/user/bin/python#-*-coding:utf-8-*-#@Time:2018/3/316:46#@Author:lichexo#@File:task_worker.pyimporttime,sys,queuefrommultiprocessing.managersimportBaseManager#创建类似QueueManager:classQueueManager(BaseManager):pass#由于这个QueueManager只是从网络上获取Queue,所以注册的时候只提供名称:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')#连接到服务器,即isThemachinerunningtask_master.py:server_addr='127.0.0.1'print('Connecttoserver%s...'%server_addr)#端口和验证码要和task_master.py设置完全一致:m=QueueManager(address=(server_addr,5000),authkey=b'abc')#从网络连接:m.connect()#获取Queue的对象:task=m.get_task_queue()result=m.get_result_queue()#获取任务从任务队列,并将结果写入结果队列:foriinrange(10):try:n=task.get(timeout=1)print('runtask%d*%d...'%(n,n))r='%d*%d=%d'%(n,n,n*n)time.sleep(1)result.put(r)exceptQueue.Empty:print('taskqueueisempty.')#处理结束:print('workerexit.')任务进程需要通过网络连接服务进程,所以必须指定服务进程的IP.运行结果现在,是时候尝试运行分布式进程了。先启动task_master.py服务进程:Traceback(mostrecentcalllast):File"F:/Python/untitled/xianchengjincheng/master.py",line25,inmanager.start()File"F:Pythonpystalllibmultiprocessingmanagers.py",line513,instartself._process.start()File"F:Pythonpystalllibmultiprocessingprocess.py",line105,instartself._popen=self._Popen(self)File"F:Pythonpystalllibmultiprocessingcontext.py",line322,in_PopenreturnPopen(process_obj)File"F:Pythonwinpystalllibmultiprocessingn_Popen32py”,line65,in__init__reduction.dump(process_obj,to_child)文件“F:Pythonpystalllibmultiprocessingeduction.py”,line60,indumpForkingPickler(file,protocol).dump(obj)_pickle.PicklingError:Can'tpickleat0x00000202E8>:192attributelookupon__main__failedtask_master.py进程发送任务后,开始等待结果队列的结果。现在启动task_worker.py进程:Connecttoserver127.0.0.1...Traceback(mostrecentcallast):File"F:/Python/untitled/xianchengjincheng/work.py",line24,inm.connect()File"F:Pythonpystalllibmultiprocessingmanagers.py",line489,inconnectconn=Client(self._address,authkey=self._authkey)File"F:Pythonpystalllibmultiprocessingconnection.py",line487,inClientc=SocketClient(address)文件"F:Pythonpystalllibmultiprocessingconnection14.py",line6inSocketClients.connect(address)ConnectionRefusedError:[WinError10061]由于目标计算机主动拒绝,无法连接。有没有看到结果全错了,我们来分析一下是哪里出了问题。..错误分析在task_master.py的错误信息中,我们知道是说lambda错误。这是因为序列化不支持匿名函数,所以我们只好修改代码,用QueueManager重新封装队列放到网络上。#将两个Queue注册到网络中,callable参数关联Queue对象QueueManager.register('get_task_queue',callable=return_task_queue)QueueManager.register('get_result_queue',callable=return_result_queue)其中task_queue和result_queue是两个队列,Store任务和结果分开。它们用于进程间通信,交换对象。因为是分布式环境,放入队列的数据需要等待Workers机器计算处理后才能读取。这样,队列就需要用QueueManager进行封装,放到网络上。这是通过上面2行代码实现的。我们将return_task_queue的网络调用接口命名为get_task_queue,return_result_queue的名称为get_result_queue,方便区分对哪个队列进行操作。task.put(n)是往task_queue中写入数据,相当于分配任务。而result.get()是等待workers机器处理后返回的结果。值得注意的是,windows系统必须写IP地址,其他操作系统如linux操作系统则不需要。#windows需要写ipaddressmanager=QueueManager(address=('127.0.0.1',5000),authkey=b'abc')在task_master.py中修改代码如下:#!/user/bin/python#-*-coding:utf-8-*-#@Time:2018/3/316:46#@Author:lichexo#@File:task_master.py#task_master.pyimportrandom,time,queuefrommultiprocessing。managersimportBaseManagerfrommultiprocessingimportfreeze_supporttask_queue=queue.Queue()#发送任务队列:result_queue=queue.Queue()#接收结果队列:classQueueManager(BaseManager):#QueueManager继承自BaseManager:pass#windowsrundefreturn_que_task_queue():globeretask发送任务队列defreturn_result_queue():globalresult_queuereturnresult_queue#返回接收结果队列deftest():#将两个Queue注册到网络中,可调用参数关联Queue对象,用于进程间通信和交换对象#QueueManager.register('get_task_queue',callable=lambda:task_queue)#QueueManager.register('get_result_queue',callable=lambda:result_queue)QueueManager.register('get_task_queue',callable=return_task_queue)QueueManager.register('get_result_queue',callable=return_queue)绑定端口5000,设置验证码'abc':#manager=QueueManager(address=('',5000),authkey=b'abc')#windows需要写ip地址manager=QueueManager(address=('127.0.0.1',5000),authkey=b'abc')manager.start()#StartQueue:#获取通过网络访问的Queue对象:task=manager.get_task_queue()result=manager.get_result_queue()foriinrange(10):#放入几个任务:n=random.randint(0,10000)print('Puttask%d...'%n)task.put(n)#从结果队列中读取结果:print('Trygetresults...')foriinrange(10):#添加异常在这里捕获try:r=result.get(timeout=5)print('Result:%s'%r)exceptqueue.Empty:print('resultqueueisempty.')#Close:manager.shutdown()print('masterexit.')if__name__=='__main__':freeze_support()print('start!')test()在task_worker.py中修改如下:#!/user/bin/python#-*-coding:utf-8-*-#@Time:2018/3/316:46#@Author:lichexo#@File:task_worker.py#task_worker.pyimporttime,sys,queuefrommultiprocessing。managersimportBaseManager#创建一个类似的QueueManager:classQueueManager(BaseManager):pass#由于这个QueueManager只是从网络获取Queue,所以注册的时候只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')#Connect到服务器,即是运行task_master.py的机器:server_addr='127.0.0.1'print('Connecttoserver%s...'%server_addr)#端口和验证码要和task_master.py设置完全一致:m=QueueManager(address=(server_addr,5000),authkey=b'abc')#从网络连接:m.connect()#获取Queue对象:task=m.get_task_queue()result=m.get_result_queue()#获取任务从任务队列中取出,并将结果写入结果队列:foriinrange(10):try:n=task.get(timeout=1)print('runtask%d*%d...'%(n,n))r='%d*%d=%d'%(n,n,n*n)time.sleep(1)result.put(r)exceptqueue.Empty:print('taskqueueisempty.')#处理结束:print('workerexit.')先运行task_master.py,再运行task_worker.py(1)task_master.py运行结果如下start!Puttask7872...Puttask6931...Puttask1395...Puttask8477...Puttask8300...Puttask1597...Puttask8738。..Puttask8627...Puttask1884...Puttask2561...Trygetresults...Result:7872*7872=61968384Result:6931*6931=48038761Result:1395*1395=1946025Result:8477*8477=71859529Result:38059529Result:380590=0=2550409Result:8738*8738=76352644Result:8627*8627=74425129Result:1884*1884=3549456Result:2561*2561=6558721masterexit.(2)task_worker.py运行结果如下Connecttoserver127.0.0.1...runtask8640*8640...runtask7418*7418...runtask9303*9303...runtask568*568...runtask1633*1633...runtask3583*3583。..runtask3293*3293...runtask8975*8975...runtask8189*8189...runtask731*731...workerexit。知识补充这个简单的Master/Worker模型有什么用呢?其实这是一个简单但真实的分布式计算,稍微修改一下代码,启动多个worker,将任务分发到几台甚至几十台机器上。比如将计算n*n的代码替换为发送邮件,实现邮件队列的异步发送Queue对象。它存储在哪里?注意到task_worker.py中根本没有创建Queue的代码,所以Queue对象存放在task_master.py进程中:之所以可以通过网络访问Queue,是通过QueueManager实现的。由于QueueManager管理着多个Queue,因此必须为每个Queue的网络调用接口起一个名字,比如get_task_queue。QueueManager在task_worker中的注册名必须与task_manager中的注册名一致。对比上面的例子可以看出,Queue对象是从另一个进程通过网络传递过来的。只不过这里的传递和网络通信都是由QueueManager来完成的。authkey有什么用?这是为了保证两台机器之间的正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定会连接不上。

猜你喜欢