CDA数据分析师出品相信大家在做一些算法的时候,经常会被庞大的计算量所带来的巨大的计算量所需要的时间所折磨数据的。接下来我们重点介绍四种方法,帮助你加快Python的计算时间,减少算法的等待时间。今天给大家讲的是最后一个方面,关于Dask方法的应用。1.简介随着机器学习算法并行化的需求不断增加,由于数据量甚至模型大小呈指数级增长,如果我们有一个工具可以帮助我们并行化处理Pandas的DataFrame,我们就可以将Pandas的DataFrame并行化处理Numpy的计算,甚至并行化我们的机器学习算法(可能来自sklearn和Tensorflow)而不会太麻烦,那么这将对我们有很大帮助。好消息是这样的库确实存在,它的名字叫Dask。Dask是一个并行计算库,不仅可以帮助并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),还可以帮助并行化低级任务/函数,可以通过制作任务图来完成处理这些功能之间的复杂交互。[IE。使用低级调度程序]这类似于Python的线程或多处理模块。他们还有一个单独的机器学习库dask-ml,它集成了现有的库,例如sklearn、xgboost和tensorflow。Dask通过绘制任务之间的交互图来并行化分配给它的任务。使用Dask的.visualize()方法可视化您的工作非常有帮助,该方法适用于所有数据类型和计算的复杂任务链。此方法将输出您的任务图,如果您的任务在每个级别都有多个节点(即,您的任务链结构在多个级别有许多独立任务,例如数据块上的并行任务),那么Dask将能够将它们并行化.注意:Dask仍然是一个相对较新的项目。它还有很长的路要走。但是,如果你不想学习一个全新的API(比如PySpark),Dask是你最好的选择,而且以后肯定会越来越好。Spark/PySpark仍然遥遥领先,并且仍在继续改进。这是一个完善的Apache项目。2.数据类型Dask中的每一种数据类型都提供了现有数据类型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray、Python中的list。这些数据类型可能比您的内存大,Dask将以Blocked方式并行(y)对数据运行计算。阻塞是指它们通过执行许多小计算(即以块为单位)来执行大型计算,其中块的数量是块的总数。a)数组:网格中的许多Numpy数组作为Dask数组DaskArray在非常大的数组上运行,将它们分成块并并行执行这些块。它有许多可用的numpy方法,您可以使用它们来加快速度。但其中一些没有实现。只要支持numpy切片,DaskArray就可以从任何类似数组的结构中读取,并且可以通过使用和使用Dask.Array.from_array方法具有.shape属性。它还可以读取.npy和.zarr文件。importdask.arrayasdaimportnumpyasnparr=numpy.random.randint(1,1000,(10000,10000))darr=da.from_array(arr,chunks=(1000,1000))#它将生成大小为(1000,1000)inchunksdarr.npartitioins#100当你的数组真的很重(即它们不适合内存)并且numpy对此无能为力时使用。所以Dask将它们分成数组块并为您并行处理它们。Dask现在对每个方法执行惰性评估。因此,要实际计算函数的值,必须使用.compute()方法。它将以块的形式并行化计算结果,同时并行化每个独立的任务。result=darr.compute()1)Numpy在元素个数少的时候比Dask快;2)Dask接管Numpy,取约1e7个元素;3)Numpy无法生成包含更多元素的结果,因为它无法将存储在内存中的元素组合起来。b)DataFrame:5个PandasDataFrame在一个DaskDataFrame中提供月度数据(可以来自diff文件)内存中非常大的数据文件的并行计算。importdask.dataframeasdddf=dd.read_csv("BigFile(s).csv",blocksize=50e6)现在,您可以应用/使用pandas库中可用的大多数函数并在此处应用。agg=df.groupby(["column"]).aggregate(["sum","mean","max","min"])agg.columns=new_column_names#请检查notebookdf_new=df.merge(agg.reset_index(),on="column",how="left")df_new.compute().head()c)Bag:DaskBag包并行处理包含多个数据类型元素的Python类列表对象。当您尝试处理一些半结构化数据(如JSONblob或日志文件)时,此功能很有用。importdask.bagasdbb=db.from_txt("BigSemiStructuredData.txt")b.take(1)Daskbag逐行读取,.take方法输出指定行数的元组。DaskBag对此类Python对象集合实现了map、filter、fold和groupby等操作。它使用内存占用量小的Python迭代器并行执行此操作。它类似于PyToolz的并行版本或PySparkRDD的Python版本。filtered=b.filter(lambdax:x["Name"]=="James").map(lambdax:x["Address"]="New_Address")filtered.compute()3.如果你的延迟任务有点简单,并且您不能或不想使用这些高级集合来执行操作,您可以使用低级调度程序来帮助您使用dask.delayed接口并行化代码/算法。dask.delayed也可以执行延迟计算。importdask.delayedasdelay@delaydefsq(x):returnx**2@delaydefadd(x,y):returnx+y@delaydefsum(arr):sum=0foriinrange(len(arr)):sum+=arr[i]returnsum您可以根据需要在这些函数之间添加复杂的交互,使用上一个任务的结果作为下一个任务的参数。Dask不会立即计算这些功能,而是绘制您的任务,有效地合并您使用的功能之间的交互。inputs=list(np.arange(1,11))#将插件程序dask.delayed添加到列表中temp=[]foriinrange(len(inputs)):temp.append(sq(inputs[i]))#计算输入sq并保存#列表中的延迟计算inputs=temp;temp=[]foriinrange(0,len(inputs)-1,2):temp.append(add(inputs[i]+inputs[i+1]))#添加两个连续的#来自前一步输入的结果=tempresult=sum(inputs)#对所有前面步骤的结果求和results.compute()您可以添加延迟以在任何可并行代码中拥有许多可能的小块,从而获得加速。它可以是您想要计算的任意多个函数,如上面的示例,或者您可以使用pandas.read_csv并行读取多个文件。4.分布式首先,到目前为止,我们一直在使用Dask的默认调度程序来计算任务的结果。但是您可以根据自己的需要从Dask提供的选项中更改它们。Dask带有四个可用的调度程序:线程:由线程池支持的调度程序进程:由进程池支持的调度程序单线程(又名“同步”):同步调度程序,用于调试分布式:用于执行图形的分布式调度程序在多台计算机上result.compute(scheduler="single-threaded")#fordebugging#或dask.config.set(scheduler="single-threaded")结果。compute()#Note:(fromofficialwebpage)#当称为GIL的函数被释放时,线程任务将工作得很好,而多处理总是有较慢的启动时间,并且需要任务之间的大量通信。#你可以通过以下命令之一获取调度程序:dask.threaded.get,dask.multiprocessing.get,dask.local.get_sync#最后一个用于单线程但是,Dask也有一个调度程序,dask.distributed用于以下它可能更喜欢使用的原因:1.它提供对异步API的访问,尤其是Futures,1.它提供了一个诊断仪表板,可以提供对性能和进度的有价值的见解1.它可以更复杂地处理数据局部性,因此效率更高比需要多个进程的工作负载上的多处理调度程序。您可以为Dask创建一个dask.distributed调度程序,通过导入和创建一个客户端来实现分布式调度程序fromdask.distributedimportClientclient=Client()#设置本地集群#您可以导航到http://localhost:8787/statusView#DiagnosticsDashboard,如果你安装了Bokeh你现在可以使用client.submit方法将作业提交到这个集群,将一个函数和参数作为它的参数。然后我们可以使用client.gather或.result方法来收集结果。sent=client.submit(sq,4)#sq:squarefunctionresult=client.gather(sent)#orsent.result()也可以直接使用dask.distributed.progress查看当前cell的任务进度.您还可以明确选择使用dask.distributed.wait来等待任务完成。注意:(LocalCluster)有时你会注意到Dask即使在划分任务也超出了内存使用率。这可能发生在您身上,因为您尝试在数据集上使用的函数需要处理大部分数据,而多处理会使情况变得更糟,因为所有工作人员都可能试图将数据集复制到内存中间。这可能发生在聚合的情况下。或者你可能想限制Dask只使用一定数量的内存。在这些情况下,您可以使用Dask.distributed。LocalCluster参数并将它们传递给Client()以使用本地机器的核心构建LocalCluster。从dask.distributedimportClient,LocalClusterclient=Client(n_workers=1,threads_per_worker=1,processes=False,memory_limit='25GB',scheduler_port=0,silence_logs=True,diagnostics_port=0)client'scheduler_port=0'and'stics_port=0'将为该特定客户端选择一个随机端口号。使用'process=False'dask的客户端不会复制数据集,这可能发生在您创建的每个进程中。您可以根据您的需要或约束来调整客户端,有关更多信息,您可以查看LocalCluster的参数。您还可以在同一台机器上的不同端口上使用多个客户端。5.机器学习Dask还有一个库,可以帮助和允许大多数流行的机器学习库,例如sklearn、tensorflow和xgboost。在机器学习中,您可能会遇到几个不同的问题。具体策略取决于你面临的问题:1.大模型:数据适合RAM,但训练时间太长。许多超参数组合,许多模型的大型集合等。1.大型数据集:数据大于RAM,采样不是一种选择。因此,您应该:·对于内存适合问题,只需使用scikit-learn(或您最喜欢的ML库);·对于大型模型,使用dask_ml.joblib和你最喜欢的scikit-learn估计器·对于大型数据集,使用dask_ml估计器。a)预处理:dask_ml.preprocessing包含sklearn的一些函数,如RobustScalar(鲁棒标量)、StandardScalar(标准标量)、LabelEncoder(标签编码器)、OneHotEncoder(one-hot编码)、PolynomialFeatures(多项式特征)等,还有它自己的一些如Categorizer(分类器)、DummyEncoder(虚拟编码)、OrdinalEncoder(序数编码器)等。你可以像PandasDataFrame一样使用它们。fromdask_ml.preprocessingimportRobustScalardf=da.read_csv("BigFile.csv",chunks=50000)rsc=RobustScalar()df["column"]=rsc.fit_transform(df["column"])可以使用Dask的DataFrame上面的预处理方法从Sklearn的Make_pipeline方法生成一个管道。b)超参数搜索:Dask有超参数搜索的sklearn方法,如GridSearchCV、RandomizedSearchCV等。从dask_ml.datasets导入make_regression从dask_ml.model_selection导入train_test_split,GridSearchCVX,y=make_regression(chunks=50000)xtr,ytr,xval,yval=test_train_split(X,y)gsearch=GridSearchCV(estimator,param_grid=10),cvfit(xtr,ytr)而且,如果你想将partial_fit与估计器一起使用,你可以使用dask-ml的IncrementalSearchCV。注意:(来自Dask)如果要使用评分和预测等后拟合任务,请使用底层估计器评分方法。如果您的估算器(可能来自sklearn)无法处理大型数据集,请将您的估算器包裹在“dask_ml.wrappers.ParallelPostFit”周围。它可以并行化诸如“predict”、“predict_proba”、“transform”等方法。Tensorflow,一种使用Dask训练XGBoost模型的方法。如果您的训练数据很小,您可以将sklearn的模型与Dask一起使用,或者如果您的测试数据很大,则可以与ParallelPostFit包装器一起使用。fromsklearn.linear_modelimportElasticNetfromdask_ml.wrappersimportParallelPostFitel=ParallelPostFit(estimator=ElasticNet())el.fit(Xtrain,ytrain)preds=el.predict(Xtest)如果数据集不大但是模型很大,你可以使用joblib。sklearns写了很多并行执行的算法(你可能用过n_jobs=-1参数),joblib这个算法利用线程和进程来并行化工作负载。要使用Dask并行化,您可以创建一个Client(客户端)(必需),然后用joblib.parallel_backend('dask'):包装代码。importdask_ml.joblibfromsklearn.externalsimportjoblibclient=Client()withjoblib.parallel_backend('dask'):#Yourscikit-learncode注意:DASKJOBLIB后端对于扩展CPU绑定的工作负载很有用;在包含数据集但有许多可以并行完成的单独操作的RAM工作负载中。要扩展到RAM受限的工作负载(大于内存的数据集),您应该使用Dask的内置模型和方法。此外,如果您的训练数据太大而无法放入内存,那么您应该使用Dask的内置估算器来加快速度。您还可以使用Dask的wrapper.Incremental,它使用底层估计器的partial_fit方法在整个数据集上进行训练,但实际上是连续的。Dask的内置估计器可以很好地扩展到具有多种优化算法(如admm、lbfgs、gradient_descent等)和正则化器(如L1、L2、ElasticNet等)的大型数据集。fromdask_ml.linear_modelimportLogisticRegressionlr=LogisticRegression()lr.fit(X,y,solver="lbfgs")4集内容讲解后,Python算法提速的四种方法你学会了吗?疫情当下,往日匆忙的脚步终于慢下来,是时候好好想想自己的职业规划和人生规划了。未雨绸缪,未雨绸缪,为未来积蓄能量——蓄势待发!(1)更多优质内容和精彩资讯,访问:https://www.cda.cn/?seo(2)搜索CDA小程序,随时随地浏览最新资讯和优质课程手机:
