Python中文社区(ID:python-china)1.什么是Ray分布式计算框架?大家一定不陌生,比如离线计算Hadoop(map-reduce)、spark、流计算strom、Flink等。相对来说,这些计算框架都依赖于其他大数据组件,安装部署相对复杂。在python中,之前分享过的Celery可以提供分布式计算。今天给大家分享另一个开源的分布式计算框架Ray。Ray是UCBerkeleyRISELab推出的全新高性能分布式执行框架。计算性能优于Spark,更易于部署和改造。还支持机器学习和深度学习的分布式训练,支持主流的深度学习框架(pytorch、tensorflow、keras等)。https://github.com/ray-project/ray2。Ray架构关于Ray的架构,请参考最早发表的论文Ray:ADistributedFrameworkforEmergingAIApplications。从上图我们可以看出Ray主要包括:Node:节点,主要有head和worker,head可以认为是Master,worker是执行任务的单元每个节点都有自己的localschedulerlocalschedulerobjectstore:一个内存对象存储,允许节点之间的通信调度器:有两个调度器,每个节点都有一个本地调度器。提交任务时,LocalScheduler会判断是否需要提交给GlobalScheduler分发给其他worker执行。GCS:全局状态控制记录了Ray中各种对象的状态信息,可以认为是元数据,是Ray容错性的保证。Ray适用于任何分布式计算任务,包括分布式训练。笔者最近在大量的时间序列预测模型训练和在线预测中使用了它。Ray目前的库支持超参数调优Raytune、梯度下降RaySGD、推理服务RaySERVE、分布式数据Dataset、分布式强化学习RLlib。还有其他第三方库,如下:3.简单使用3.1安装部署pipinstall--upgradepip#pipinstallraypipinstallray==1.6.0#ImportError:cannotimportname'deep_mapping'from'attr.validators'#pipinstallattr==19.1.03.2单机使用一个简单的例子Ray使用@ray.remote装饰器使函数成为一个可以分布式调用的任务。通过函数名.remote方法提交任务,通过ray.get方法获取任务返回值。点击案例类似于多线程异步执行的方式。importtimeimportrayray.init(num_cpus=4)#Specifythissystemhas4CPUs.@ray.remotedefdo_some_work(x):time.sleep(1)#Replacethisiswithworkyouneedtodo.returnxstart=time.time()results=ray.get([do_some_work.remote(x)forxinrange(4)])print("duration=",time.time()-start)print("results=",results)#duration=1.0107324123382568#results=[0,1,2,3]返回对象的id通过远程例如ObjectRef(7f10737098927148ffffffff0100000001000000)。您需要通过ray.get获取实际值。需要注意的是ray.get是阻塞调用,不能[ray.get(do_some_work.remote(x))forxinrange(4)]注意小任务的使用需要注意的是ray分布式计算调度时需要花费额外的时间,比如调度、进程间通信、任务状态更新等,所以要避免太小的任务。小任务可以合并@ray.remotedeftiny_work(x):time.sleep(0.0001)#Replacethisiswithworkyouneedtodo.returnxstart=time.time()result_ids=[tiny_work.remote(x)forxinrange(100000)]results=ray.get(result_ids)print("duration=",time.time()-start)ray.putray.put()把一个对象放到对象存储上,返回一个对象id,可以在分布式机器上调用,操作是异步的。可以通过ray.get()获取。num=ray.put(10)ray.get(num)ray.wait如果任务返回多个结果,ray.get()会等到所有结果都完成后再执行后续操作。如果多个结果的执行耗时不同,此时的短板就在于最长的任务。这时候就可以使用ray.wait()方法了。ray.wait()返回完成和未完成的任务结果,完成的执行结果可以继续后续操作。importrandom@ray.remotedefdo_some_work(x):time.sleep(random.uniform(0,4))#Replacethisiswithworkyouneedtodo.returndefprocess_incremental(sum,result):time.sleep(1)#Replacethiswithsomeprocessingcode.returnsum+resultstart=time.time()结果_ids=[do_some_work.remote(x)forxinrange(4)]sum=0whilelen(result_ids):done_id,result_ids=ray.wait(result_ids)sum=process_incremental(sum,ray.get(done_id[0]))print("duration=",time.time()-start,"\nresult=",sum)#duration=5.270821809768677#result=62.3集群部署Ray的架构遵循主从模型。HeadNode可以认为是Master,其他Nodes都是worker。部署集群时,HeadNode需要先启动raystart--head,其他机器依次启动worker。注意需要指定headNode的地址来确定关系,raystart--address10.8.xx.3:6379。关闭服务需要每台机器执行ray.stop#Tostartaheadnode.#raystart--head--num-cpus=
