当前位置: 首页 > 后端技术 > Python

安利Python大数据分析神器!_0

时间:2023-03-26 18:32:36 Python

作者:东格非首发于公众号:Python数据科学提高Pandas运行速度的方法之前介绍过很多次,里面经常提到Dask。很多没接触过的朋友可能不理解。今天就推荐这款神器。1.什么是达斯克?每个人都熟悉Pandas和Numpy。代码运行后,数据被加载到RAM中。如果数据集特别大,我们会看到内存飙升。但是有时要处理的数据在RAM中放不下,这时候Dask就来了。Dask是开源且免费的。它是与Numpy、Pandas和Scikit-Learn等其他社区项目协调开发的。官方:https://dask.org/Dask支持Pandas的DataFrame和NumpyArray数据结构,可以在本地计算机上运行,??也可以扩展到集群上运行。基本上,只需编写一次代码,使用常见的Pythonic语法,然后在本地运行或部署在多节点集群上。这本身就是一个很棒的功能,但它不是最好的。我认为Dask最棒的功能是:它与我们已经使用的大多数工具兼容,并且只需更改少量代码,您就可以使用笔记本电脑上已有的处理能力来并行运行代码.并行处理数据意味着更少的执行时间、更少的等待时间和更多的分析时间。下面是Dask进行数据处理的大致流程。2.Dask支持哪些现有工具?这是我比较喜欢的一点,因为Dask兼容Python数据处理和建模库包,并且使用库包的API,对于Python用户来说学习成本极低。而像Hadoop、Spark这样的大数据处理,学习门槛和时间成本都很高。目前Dask可以支持pandas、Numpy、Sklearn、XGBoost、XArray、RAPIDS等,我觉得光是这几项就足够了,至少对于常用的数据处理和建模分析来说是这样。3、dask安装可以使用conda或者pip,也可以从源码安装dask。condainstalldask因为dask有很多依赖,下面的代码也可以用于快速安装,它将安装运行Dask所需的最小依赖集。condainstalldask-core然后通过源安装。gitclonehttps://github.com/dask/dask.gitcddaskpython-mpipinstall.4。如何使用达斯克?Numpy和pandasDask引入了三个可以存储大于RAM的数据的并行集合。这些集合包括DataFrame、Bags和Arrays。这些集合类型中的每一种都能够使用在RAM和硬盘之间分区并分布在集群中的多个节点上的数据。Dask的使用很明确,如果用NumPy数组,就从Dask数组入手,如果用PandasDataFrames,就从DaskDataFrames入手,等等。importdask.arrayasdax=da.random.uniform(low=0,high=10,size=(10000,10000),#normalnumpycodechunks=(1000,1000))#分成大小为1000x1000y=x的块+x.T-x.mean(axis=0)#对高级算法使用普通语法#DataFramesimportdask.dataframeasdddf=dddf=dd.read_csv('2018-*-*.csv',parse_dates='timestamp',#正常的Pandas代码blocksize=64000000)#将文本分成64MB的块ss=df.groupby('name').balance.mean()#使用高级算法的正常语法#Bags/listsimportdask.bagasdbb=db.read_text('*.json').map(json.loads)total=(b.filter(lambdad:d['name']=='Alice').map(lambdad:d['balance']).sum())这些高级接口复制标准接口并稍作更改。对于原项目中的大部分API,这些接口会自动帮我们并行处理更大的数据集,实现起来也不是很复杂。可以根据Dask的doc文档一步步完成。Delayed先说Dask的Delay功能,非常强大。Dask.delayed是一种简单而强大的并行化现有代码的方法。之所以叫delayed,是因为它不是立即计算结果,而是将任务计算的结果记录在一个图中,稍后在并行硬件上运行。有时使用现有的dask.array或dask.dataframe的问题可能不合适。在这些情况下,我们可以使用更简单的dask.delayed接口来并行化自定义算法。例如下面的例子。definc(x):returnx+1defdouble(x):returnx*2defadd(x,y):returnx+ydata=[1,2,3,4,5]output=[]forxindata:a=inc(x)b=double(x)c=add(a,b)output.append(c)total=sum(output)45以上代码在单线程中顺序运行。但是,我们看到其中许多可以并行执行。Dask延迟函数可以修改inc、double等函数,使它们延迟执行,而不是立即执行函数,而是将函数及其参数放入计算任务图中。我们简单地修改代码并用延迟函数包装它。importdaskoutput=[]forxindata:a=dask.delayed(inc)(x)b=dask.delayed(double)(x)c=dask.delayed(add)(a,b)output.append(c)total=dask.delayed(sum)(output)代码运行后,inc、double、a??dd、sum还没有发生,但是生成了一个计算任务图交给total。然后我们使用visualizatize查看任务图。total.visualize()上图很明显的说明了并行的可能性,于是毫不犹豫,使用compute进行并行计算,此时计算完成。>>>total.compute()45由于数据集太小,没法比较时间,这里只说使用方法,大家可以自己练习。Sklearn机器学习是关于机器学习的并行执行。由于内容较多,东哥另文展开。这里简单说说dask-learn。dask-learn项目是与Sklearn开发人员合作完成的。现在可以使用Scikit-learn的Pipeline、GridsearchCV和RandomSearchCV及其变体进行并行化,它们可以更好地处理嵌套并行操作。所以,如果把sklearn换成dklearn,速度会提高很多。#fromsklearn.grid_searchimportGridSearchCVfromdklearn.grid_searchimportGridSearchCV#fromsklearn.pipelineimportPipelinefromdklearn.pipelineimportPipeline下面是一个使用Pipeline的例子,其中应用了PCA和逻辑回归。从sklearn.datasets导入make_classificationX,y=make_classification(n_samples=10000,n_features=500,n_classes=2,n_redundant=250,random_state=42)fromsklearnimportlinear_model,decompositionfromsklearn.pipelineimportPipelinefromdklearn.pipelineimportPipelinelogistic=linear_model.LogisticRegression()pca=decomposition.PCA()pipe=Pipeline(steps=[('pca',pca),('logistic',logistic)])grid=dict(pca__n_components=[50,100,150,250],logistic__C=[1e-4,1.0,10,1e4],logistic__penalty=['l1','l2'])#fromsklearn.grid_searchimportGridSearchCVfromdklearn.grid_searchimportGridSearchCVestimator=GridSearchCV(pipe,grid)estimator.fit(X,y)结果:sklearn在大约40秒内执行此计算,而dask-learn替代方案大约需要10秒。另外,如果添加如下代码连接集群,则可以通过Client显示整个计算过程的dashboard,这是Bokeh实现的。fromdask.distributedimportClientc=Client('scheduler-address:8786')5.总结以上是对Dask的简单介绍。Dask的功能非常强大,文档也非常全面,既有例子又有解释。感兴趣的朋友可以去官网或者GitHub自行学习。东哥下次会分享一些使用Dask做机器学习的例子。原创不易,我觉得挺好的,也喜欢。欢迎关注我的个人公众号:PythonDataScience数据科学学习网站:datadeepin