什么是Ray?看了Ray相关论文和官网文档大概花了两三天时间。中国大部分人对Ray的认知程度)。首先简单介绍一下我对Ray的理解。首先,基因非常重要,所以我们首先要探究Ray最初是为了解决什么问题而创造的。Ray的论文表明,它最初是为了解决强化学习的挑战而设计的。强化学习的难点在于,它是一个需要在学习的同时进行实时预测的应用场景,这意味着不同类型的任务会同时运行,它们之间存在复杂的依赖关系,会产生任务在运行时动态地针对新的任务,一些现有的计算模型肯定无法解决它们。如果Ray只是为了解决RL,事情可能没有那么复杂,但是作者希望它不仅可以运行强化学习相关的,而且是一个通用的分布式机器学习框架,这意味着Ray必须进行分层抽象,而且它至少分为系统层和应用层。在系统层面,既然是分布式应用,那么应用内部必然有资源/任务的调度和管理。首先,Yarn、K8s等资源调度框架都是在应用层面进行调度。Ray作为解决特定业务问题的应用程序,应该运行在它们之上而不是替换它们。Spark/Flink虽然也是基于任务级的资源调度框架,但是由于它们都是为了解决一个更具体的抽象问题而设计的,所以系统对任务/资源的封装程度比较高。一般用户都是面向业务编程的,很难直接控制任务和对应的任务。资源。我们以Spark为例。用户定义了数据处理逻辑。至于这些逻辑如何划分成多少个Jobs、Stages、Tasks,最终占用了多少Resources(CPU、GPU、Memory、Disk)等等,都是由框架来决定的,用户不能碰它。这是我一直批评Spark的地方。因此,在系统层面,Ray是一个通用的分布式任务执行系统,以任务为调度层面,可以控制每个任务的资源粒度。请记住,在Ray中,您需要明确定义Task和Task依赖关系,并为每个任务指定合适的(数量、资源类型)资源。比如你需要用三个任务来处理一条数据,那么你就需要自己启动三个任务,并指定这些任务需要的资源(GPU、CPU)和数量(可以是小数或整数)。在Spark和Flink中,这不太可能。为了让我们做这些事情,Ray默认提供了Python语言接口,你可以像Numpy一样使用Ray。其实现在已经有基于Ray作为Backend的numpy实现,当然属于应用层。Ray系统层次非常简单,也是典型的master-worker模式。类似于spark的driver-executor模式,不同的是Ray的worker类似于yarn的worker,负责资源管理。它会启动Pythonworker来执行你的特定任务的代码,spark的executor也会启动Pythonworker来执行Python代码,但是对应的executor也执行业务逻辑,和pythonworker有数据交换和传输。在应用层面,可以基于Ray系统进行编程,因为Ray默认提供了Python编程接口,所以可以自己实现强化学习库(RLLib),或者集成已有的算法框架,比如tensorflow,让tensorflow成为Ray网络上的一个应用程序,并且易于分发。记得知乎上有人说Ray其实是一个Python分布式RPC框架,这话是对的,但显然是误导,因为这很可能让人以为它只是一个“Python分布式RPC框架”。如何与Spark配合前面说了Spark的大部分API我们都可以基于Ray来实现,但是是Raybackend而不是Sparkcorebackend。其实Ray目前是在做流相关的功能,他们现在要做的就是兼容Flink的API。虽然官方宣称Ray是新一代的机器学习分布式框架,可以完全覆盖目前大部分的大数据和AI领域,但是还有很长的路要走,还有很多东西需要改进。所以对我来说,我喜欢的是它良好的Python支持,以及在系统层面对资源和任务的控制,这使得:1.我们可以很容易地在Ray中运行我们独立的Python算法库(虽然算法本身不是分布式),但是我们可以利用好Ray的资源管理和调度功能来解决AI平台的资源管理问题。2、Ray官方提供了大量机器学习算法的实现,以及Tensorflow、Pytorch等当前机器学习框架的集成,分布式能力比这些库提供的原生模型更可靠、更易用.毕竟对于这些框架来说,支持其分布式运行的辅助库(比如TensorFlow提供参数服务器)都相当简单。但是,我们知道,数据处理本身就有一个大生态。例如,你的用户画像数据在数据湖中,你需要对这些数据进行非常复杂的计算,然后将它们作为特征输入到你的机器学习算法中。而如果这时候,你要面向资源编程(或者使用一个还不够成熟的上层应用),而不是面向“业务”的编程,那会很不爽。比如我想用SQL来处理数据,而我只专注于处理目前Ray及以上应用的业务逻辑显然没有Spark方便(毕竟Spark就是为数据处理而生的),所以最好方式是,数据的获取和处理还是在Spark上面,但是当数据准备好之后,应该根据Ray写的代码丢给用户处理。Ray可以通过Arrow项目在HDFS上读取Spark处理后的数据,进行训练,然后将模型保存为HDFS。当然,对于预测,雷可以自己消化,也可以丢给其他系统来完成。我们知道Spark在整合Python生态方面做了大量的努力。比如像Ray也提供了python编程接口,所以spark集成Tensorflow等框架相对容易,但是没办法很好的控制资源(比如GPU),spark的executor会启动所在服务器上的pythonworker,而spark一般跑在yarn上,给yarn带来了很多管理上的麻烦,通常yarn和hdfs的关系类都放在一起。除了python环境和资源(CPU/GPU)管理困难之外,还有一个很大的问题可能会对yarn集群造成比较大的稳定性风险。所以最好的模式是按照以下步骤开发一个机器学习应用程序:写一个python脚本,在数据处理部分使用pyspark,在程序的算法训练部分使用ray,spark运行在yarn(k8s)上,以及ray运行在k8s中的好处是显而易见的:用户完全不知道他的应用程序实际上是在两个集群中运行的,对他来说只是一个普通的python脚本。从架构的角度来看,复杂的python环境管理问题可以由ray集群处理,只要spark能运行基本的pyspark相关功能即可,数据连接是通过数据湖中的表(实际上是一堆parquet文件)即可以。当然,如果最终结果数据不大,也可以直接通过客户端完成pyspark到ray的交互。Spark和Ray的架构和部署现在我们来思考一个更好的部署模型。架构图大致是这样的:首先你可以理解k8s已经解决了一切,我们的spark和ray都是跑在k8s上的。但是,如果我们想让一个spark跑多实例多进程,我们并不希望所有的节点都按照传统的方式跑在K8s上,而是把executor部分放在yarn集群中。在我们的架构中,sparkdriver是一个应用程序。我们可以启动多个pod获取多个sparkdriver实例,对外提供负载均衡,roll升级/重启等功能。也就是说k8s应该是面向应用的。但是,我们还是希望把复杂的计算交给Yarn,尤其是数据局部性,计算和存储放在一起(通常yarn和HDFS是在一起的),避免k8s和HDFS大量的数据交换.因为Yarn对Java/Scala友好,对Python不友好,尤其是涉及到yarn中的Python环境问题,处理起来会非常困难(主要是Yarn对docker的支持不够好,对GPU的支持不好),而且实际上机器学习必须非常依赖Python和非常复杂的本地库和Python环境,对资源调度也有比较高的依赖,因为算法会消耗大量的机器资源,必须有一个资源池,所以我们希望机器学习部分可以跑在K8s上。但是我们希望整个数据处理和训练过程是一体的,算法的同学应该感觉不到k8s/yarn的区别。为了达到这个目的,用户还是使用pyspark来完成计算,然后在pyspark中使用rayAPI进行模型训练和预测。数据处理部分在yarn中自动完成,而模型训练部分自动分发到k8s中完成。并且因为ray本身的优势,算法可以很好的控制自己需要的资源,比如这次训练需要多少GPU/CPU/内存,并且支持所有的算法库。我们有最好的资源调度可用。下面显示了一段MLSQL代码来展示如何使用上述架构:--pythontrainingmodelsetpy_train='''importrayray.init()@ray.remote(num_cpus=2,num_gpus=1)def的代码f(x):returnx*xfutures=[f.remote(i)foriinrange(4)]print(ray.get(futures))''';loadscript.`py_train`aspy_train;--设置需要的python环境说明setpy_env='''''';loadscript.`py_env`aspy_env;--loadhivetableloadhive.`db1.table1`astable1;--processHive,比如做一些featureengineeringselectfeatures,labelfromtable1asdata;--将Python代码提交给Ray,这是在k8s中运行的PythonAlg的train数据。`/tmp/tf/model`wherescripts="py_train"andentryPoint="py_train"和condaFile="py_env"andkeepVersion="true"和fitParam.0.fileFormat="json"——它也可以是镶木地板和`fitParam.0.psNum`="1";以下是PySpark的示例代码:来自pyspark。ml.linalgimportVectors,SparseVectorfrompyspark.sqlimportSparkSessionimportloggingimportrayfrompyspark.sql.typesimportStructField,StructType,BinaryType,StringType,ArrayType,ByteTypefromsklearn.naive_bayesimportGaussianNBimportosfromsklearnsextern。joblibimportpickleimportscipy.sparseasspfromsklearn.svmimportSVCimportioimportcodecsos.environ["PYSPARK_PYTHON"]="/Users/allwefantasy/deepavlovpy3/bin/python3"logger=logging.getLogger(__name__)base_dir="/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"spark=SparkSession.builder.master("local[*]").appName("example").getOrCreate()data=spark.read.format("libsvm").load(base_dir)+"/data/mllib/sample_libsvm_data.txt")##广播数据dataBr=spark.sparkContext.broadcast(data.collect())##训练模型这部分代码会在spark执行器中的pythonworker中执行deftrain(row):importrayray.init()train_data_id=ray.put(dataBr.value)##该函数的python代码将在K8s中的Ray中执行@ray.remotedefray_train(x):X=[]y=[]foriinray.get(train_data_id):X.append(i["features"])y.append(i["label"])ifrow["model"]=="SVC":gnb=GaussianNB()model=gnb.fit(X,y)#为什么我们需要enc颂?pickled=codecs.encode(pickle.dumps(model),"base64").decode()返回[row["model"],pickled]ifrow["model"]=="BAYES":svc=SVC()model=svc.fit(X,y)pickled=codecs.encode(pickle.dumps(model),"base64").decode()返回[row["model"],pickled]result=ray_train.remote(row)ray.get(result)##训练模型并将模型结果保存到HDFSrdd=spark.createDataFrame([["SVC"],["BAYES"]],["model"]).rdd.map(train)spark.createDataFrame(rdd,schema=StructType([StructField(name="modelType",dataType=StringType()),StructField(name="modelBinary",dataType=StringType())])).写入。\格式(“镶木地板”)。\mode("overwrite").save("/tmp/wow")这是一个标准的Python程序,仅仅使用了pyspark/ray的API,我们就完成了以上所有的工作,训练了两个A模型,以及的工作数据处理在spark,模型训练在ray完美结合!最重要的是解决资源管理的问题!作者简介:朱威廉,资深数据架构师,11年研发经验。同时维护和开发多个开源项目。擅长大数据/AI领域的一些想法和工具。现在专注于构建一个集成大数据和机器学习的综合平台,以降低人工智能实施的成本。本文作者:朱威廉阅读原文为阿里云内容,未经允许不得转载。
