当前位置: 首页 > 科技观察

当火星遇上RAPIDS:用GPU加速数据科学

时间:2023-03-17 13:21:49 科技观察

在数据科学的世界里,Python是一个不可忽视的存在,而且只会越来越差。使用的主要工具包括Numpy、Pandas和Scikit-learn。Mars诞生于MaxCompute团队。本文将分享如何使用Mars实现Numpy、pandas、scikit-learn等数据科学库的并行分布式执行,并结合RAPIDS平台使用GPU加速数据科学。NumpyNumpy是数值计算的基础包。它内部提供了多维数组(ndarray)等数据结构,用户可以方便地进行任意维度的数值计算。让我们举一个求解Pi的蒙特卡罗方法的例子。这背后的原理很简单,现在我们有一个半径为1的圆和一个边长为2的正方形,它们的圆心都在原点。现在我们生成大量均匀分布的点,让这些点落在正方形中。通过简单的推导,我们可以知道Pi的值=落入圆圈的点数/总点数*4。这里需要注意的是,随机生成的点数越多,结果越准确.使用Numpy实现如下:importnumpyasnpN=10**7#100万个点data=np.random.uniform(-1,1,size=(N,2))#生成1000万个x轴和y轴点between-1and1inside=(np.sqrt((data**2).sum(axis=1))<1).sum()#计算到原点距离小于1的点数pi=4*inside/Nprint('pi:%.5f'%pi)可以看出使用Numpy进行数值计算非常简单,只需要几行代码,如果读者习惯了Numpy的方式考虑数组之后,无论是代码的可读性还是执行效率都会有很大的提升。pandaspandas是一个强大的数据分析处理工具,它包含了大量的API,可以帮助用户分析和处理二维数据(DataFrame)。pandas中的一个核心数据结构是DataFrame,可以简单理解为表数据,不同的是它在行和列上都包含了索引(Index)。需要注意的是,这与数据库索引的概念不同。它的index可以这样理解:从row看DataFrame时,我们可以把DataFrame看成一个从rowindex到rowdata的字典。通过行索引,可以方便的选取一行数据;列也是如此。下面以movielens数据集为例,看看pandas是如何使用的。这里我们使用Movielens20MDataset.importpandasaspdratings=pd.read_csv('ml-20m/ratings.csv')ratings.groupby('userId').agg({'rating':['sum','mean','max','min']})可以通过一行简单的pandas.read_csv读取CSV数据,然后通过userId进行分组聚合,求出每组中rating列的总和、平均值、最大值、最小值.“吃掉”pandas的最佳方式是在Jupyternotebook中交互式地分析数据。这段经历会让你感叹:人生苦短,我用xx(😉)scikit-learnscikit-learn是一个Python机器学习包,提供了大量的机器学习算法。用户无需了解算法的细节,通过几个简单的高级接口即可完成机器学习任务。当然,现在很多算法都使用了深度学习,但是scikit-learn仍然可以作为一个基础的机器学习库来串联整个过程。让我们以K最近邻算法为例,看看这个任务是如何用scikit-learn完成的。importpandasasdfromsklearn.neighborsimportNearestNeighborsdf=pd.read_csv('data.csv')#输入是一个CSV文件,包含200,000个向量,每个向量有10个元素nn=NearestNeighbors(n_neighbors=10)nn.fit(df)neighbors=nn.kneebors(df)fit接口是scikit-learn中最常用的学习接口。可以看到整个过程非常简单易懂。Mars-Numpy、pandas和scikit-learn的并行和分布式加速器Python数据科学堆栈非常强大,但它们存在以下问题:现在是多核时代,这些库中很少有操作可以利用多核能力。随着深度学习的普及,用于加速数据科学的新硬件层出不穷,其中最常见的就是GPU,它在深度学习的预处理过程中进行数据处理,我们是不是也可以用GPU来加速呢?几个库的操作都是命令式的,命令式的对应物是声明式的。命令式更关心怎么做,每次操作都会立即得到结果,方便探索结果,优点是非常灵活;缺点是中间进程可能会占用大量内存,不能及时释放,而且每次操作之间都是阻塞split,没办法做operatorfusion来提升性能;对应的语句恰恰相反,它更关心做什么,它只关心结果,而不太关心中间怎么做,典型的语句就像SQL和TensorFlow1。x,声明式风格可以在用户真正需要结果的时候执行,这就是惰性求值。在这个中间过程中可以做很多优化,性能会更好,缺点自然是命令式的优点,不够灵活,调试起来比较困难。为了解决这些问题,我们开发了Mars。Mars诞生于MaxCompute团队。其主要目标是实现Numpy、pandas、scikit-learn等数据科学库的并行分布式执行,充分利用多核和新硬件。在Mars的开发过程中,我们的核心关注包括:我们希望Mars足够简单,任何会使用Numpy、pandas或scikit-learn的人都会使用Mars。为了避免重新发明轮子,我们希望利用这些库已有的成果,只要能调度到多核/多机即可。声明式和命令式,用户可以在两者之间自由选择,兼具灵活性和性能。它足够健壮,可以在生产中使用,并且可以处理各种故障转移情况。当然,这些都是我们的目标,也是我们一直努力的方向。Mars张量:Numpy的并行和分布式加速器上面说过,我们的目标之一是只要你会用Numpy这样的数据科学包,你就会用Mars。我们直接看代码,还是以蒙特卡洛为例。成为火星的代码是什么样的?importmars.tensorasmtN=10**10data=mt.random.uniform(-1,1,size=(N,2))inside=(mt.sqrt((data**2).sum(axis=1))<1).sum()pi=(4*inside/N).execute()print('pi:%.5f'%pi)你可以看到只有两个区别:importnumpyasnp变成importmars.tensor作为mt,以及随后的np。都变成mt.;pi在打印之前调用.execute()方法。也就是说,默认情况下,Mars会采用声明式的方式,代码本身的移植成本极低,真正需要某条数据时,通过.execute()触发执行。这可以最大限度地优化性能并减少中间进程内存消耗。在这里,我们还将数据的大小增加了1000倍,达到100亿点。当数据量是之前的1/1000时,在我的笔记本上用了757ms;现在数据放大千倍,光是数据就需要150G的内存,Numpy本身做不到。使用Mars,计算时间仅需3min44s,峰值内存仅需1G左右。假设我们认为内存是无限的,那么Numpy需要的时间是之前的1000倍,大约是12分钟。我们可以看到Mars充分利用了多核的能力,以声明的方式大大减少了中间内存的使用。前面说到,我们尽量同时拥有声明式和命令式两种风格,使用命令式风格,只需要在代码开头配置一个选项即可。importmars.tensorasmtfrommars.configimportoptionsoptions.eager_mode=True#开启eagermode后,每次调用都会立即执行,行为和Numpy完全一样N=10**7data=mt.random.uniform(-1,1,size=(N,2))inside=(mt.linalg.norm(data,axis=1)<1).sum()pi=4*inside/N#不需要调用.execute()print('pi:%.5f'%pi.fetch())#目前需要fetch()转换为float类型,以后会增加MarsDataFrame的自动转换:pandas并行分布式加速器已经看到如何轻松实现将Numpy代码迁移到Marstensor,想必读者也知道如何迁移pandas代码,区别只有两处。我们还是以movielens的代码为例。importmars.dataframeasmdratings=md.read_csv('ml-20m/ratings.csv')ratings.groupby('userId').agg({'rating':['sum','mean','max','min']}).execute()MarsLearn:scikit-learn的并行分布式加速器MarsLearn也是如此,这里不再赘述。但是目前Marslearn支持的scikit-learn算法并不多,我们也在进行移植,需要大量的人力和时间。欢迎有兴趣的同学参与。importmars.dataframeasmdfrommars.learn.neighborsimportNearestNeighborsdf=md.read_csv('data.csv')#输入是一个CSV文件,包含20万个向量,每个向量有10个元素nn=NearestNeighbors(n_neighbors=10)nn.fit(df)#Here,拟合的时候会整体触发执行,所以机器学习的高层接口都是立即执行的,对于机器学习的fit、predict等高层接口,MarsLearn也会立即触发执行,保证语义的正确性。RAPIDS:DataScienceontheGPU相信细心的观众已经发现GPU好像没有被提及。别担心,这是关于RAPIDS的。过去,虽然CUDA将GPU编程的门槛降低到了很低的水平,但是对于数据科学家来说,在GPU上处理Numpy、pandas等都可以处理的数据无异于天方夜谭。幸运的是,NVIDIA开源了RAPIDS数据科学平台,这与Mars的一些想法高度吻合,即Numpy、pandas、scikit-learn的代码只需简单的导入替换就可以移植到GPU上。其中RAPIDScuDF用于pandas加速,RAPIDScuML用于scikit-learn加速。对于Numpy,CuPy已经很好地支持GPU加速,这样RAPIDS也可以专注于数据科学的其他部分。CuPy:GPU加速的Numpy或Pi的MonteCarlo。importcupyascpN=10**7data=cp.random.uniform(-1,1,size=(N,2))inside=(cp.sqrt((data**2).sum(axis=1))<1).sum()pi=4*inside/Nprint('pi:%.5f'%pi)在我的测试中,它将CPU时间从757ms减少到只有36ms,提升了20多倍。可以说效果非常显着。这是因为GPU非常适合计算密集型任务。RAPIDScuDF:使用GPU加速pandas将importpandasaspd替换为importcudf。用户不再需要关心GPU和CUDA编程内部的并行概念。importcudfratings=cudf.read_csv('ml-20m/ratings.csv')ratings.groupby('userId').agg({'rating':['sum','mean','max','min']})运行时间从CPU上的18s提升到GPU上的1.66s,提升了10多倍。RAPIDScuML:使用GPU加速scikit-learn也是k-最近邻问题。importcudffromcuml.neighborsimportNearestNeighborsdf=cudf.read_csv('data.csv')nn=NearestNeighbors(n_neighbors=10)nn.fit(df)neighbors=nn.kneighbors(df)运行时间从CPU上的1min52s提高到GPU上的17.8s.Mars和RAPIDS的结合能带来什么?RAPIDS将Python数据科学带到GPU中,极大地提高了数据科学的运行效率。它们是命令式的,像Numpy等。通过结合Mars,中间进程会使用更少的内存,从而使数据处理量更大;Mars还可以将计算分布到多台机器和卡片上,以提高数据规模和计算效率。Mars中使用GPU也很简单,只需要在相应的函数上指定gpu=True即可。比如创建tensor,读取CSV文件等都是适用的。importmars.tensorasmtimportmars.dataframeasmda=mt.random.uniform(-1,1,size=(1000,1000),gpu=True)df=md.read_csv('ml-20m/ratings.csv',gpu=True)的图为在Scaleup和Scaleout两个维度使用Mars加速Pi的蒙特卡洛计算。一般来说,如果我们想加速数据科学任务,有两种方法。Scaleup意味着可以使用更好的硬件,比如使用更好的CPU、更大的内存、使用GPU代替CPU等;Scaleout指的是使用更多的机器以分布式的方式提高效率。可以看到,在24核的机器上,Mars计算耗时25.8s,而在分布式的情况下,使用4台24核的机器几乎是线性提升。通过使用NVIDIATESLAV100显卡,我们可以将单机运行时间提升到3.98s,已经超越了4CPU机器的性能。通过将单卡扩展为多卡,时间进一步减少,但这里也可以看出时间很难线性扩展。这是因为GPU的运行速度大大提高了。这时候,网络和数据拷贝的开销就显现出来了。性能测试,我们使用https://github.com/h2oai/db-benchmark的数据集测试了三个data-scalegroupby和一个data-scalejoin。而我们主要对比了pandas和DASK。DASK和Mars的初衷很相似,都是在尝试并行化和分布式Python数据科学。但是,它们在设计、实现和分发方面存在许多差异。后面我们会写一篇文章进行详细的对比。测试机配置为500G内存,96核,NVIDIAV100显卡。Mars和DASK都使用RAPIDS在GPU上执行计算。Groupby数据有3种大小,分别是500M、5G和20G。查询也有三个组。查询1:df=read_csv('data.csv')df.groupby('id1').agg({'v1':'sum'})查询2:df=read_csv('data.csv')df.groupby(['id1','id2']).agg({'v1':'sum'})查询三:df=read_csv('data.csv')df.gropuby(['id6']).agg({'v1':'sum','v2':'sum','v3':'sum'})当数据大小达到20G时,pandas会在query2内存溢出,得不到结果。可以看出,随着数据的增加,Mars的性能优势会越来越明显。得益于GPU的计算能力,GPU的计算性能比CPU高数倍。如果单纯使用RAPIDScuDF,由于内存大小的限制,到了5G就很难完成数据。不过由于Mars的声明式特性,对中间过程显存的使用进行了极大的优化,所以整套测试到了20G就可以轻松完成。这正是Mars+RAPIDS所能发挥的威力。加入测试查询:x=read_csv('x.csv')y=read_csv('y.csv')x.merge(y,on='id1')测试数据x为500M,y包含10行数据.总结RAPIDS将Python数据科学带入GPU,大大提高了数据分析和处理的效率。Mars更侧重于并行和分布。相信未来两者的结合会有更大的想象空间。【本文为专栏作者《阿里巴巴官方技术》原创稿件,转载请联系原作者】点此查看作者更多好文