CDADataAnalyst出品相信大家在做一些算法的时候,经常会被庞大的计算量带来的巨大的计算量所需要的时间所折磨数据的。接下来,我们将重点介绍四种方法,帮助您加快Python的计算时间,减少算法的等待时间。下面我给大家讲解一下数据并行化的内容。1.简介随着时间和处理器计算能力的增长,数据呈指数级增长,我们需要找到高效处理数据的方法。那我们该怎么办呢?GPU是一种非常有效的解决方案。然而,GPU并不是为机器学习而设计的,它是专门为复杂的图像处理和游戏而设计的。我们使该算法能够在现有GPU上运行,并且它确实提供了结果。现在,谷歌推出了一款名为TPU(TensorProcessingUnit)的新设备,它是为TensorFlow上的机器学习工作量身定做的,结果确实令人兴奋。与此同时,英伟达在这方面也没有退缩。但我们会在未来的某个时刻达到顶峰。即使我们现在拥有大量可用的数据集,单台机器或计算单元也不足以处理这样的负载。我们将不得不使用多台机器来完成我们的任务。我们将不得不并行化我们的任务。接下来,我们将了解您在Python中最常使用的一些方法。然后介绍Dask和torch.multiprocessing。2.Pool和ProcessPython库的Pool和Process方法都来自multiprocessing,它为我们的任务启动了一个新的进程,但方式不同。进程每次调用只执行一个进程:importmultiprocessingasmpp=mp.Process(target=##targetfunction,args=##parameterstofunction)#Thiscallwillonlyspawnoneprocess,whichwillhandletheprocessinginthebackgroundfor使用给定的参数处理目标函数,但该过程尚未开始。要启动它,您必须执行以下操作:p.start()现在,您可以将其留在这里或检查进程是否完成:p.join()#现在它将等待进程完成。不检查进程是否已完成有很多用途。例如,在客户端-服务器应用程序中,丢包或进程无响应的概率非常低,我们可以忽略它,这可以给我们带来巨大的速度提升。【视应用程序而定】对于多进程,必须创建多个Processes。你想做多少就做多少。当您调用.start()它们时,它们都会启动。processes=[mp.Process(target=func,args=(a,b))for(a,b)inlist]forpinprocesses:p.start()forpinprocesses:p.join()anotherOn另一方面,Pool启动固定数量的进程,然后我们可以给这些进程分配一些任务。因此,在特定的时间实例中,只有固定数量的进程在运行,其余进程将处于等待状态。通常选择进程数作为设备的核心数。如果该参数为空,也可以作为默认状态。pool=mp.Pool(processes=2)现在有很多方法可以应用于Pool。在数据科学中,我们可以避免的是Pool.apply和Pool.map,因为它们在任务完成后立即返回结果。Pool.apply只接受一个参数并只使用一个过程,而Pool.map将接受许多参数并将它们放入我们的Pool过程中。results=[pool.apply(func,(x))forxinX]#或results=pool.map(func,(arg))#考虑到我们之前的客户端-服务器应用程序示例,只需要一个参数,这里是最大数量要运行的进程数是预定义的,因此如果我们有很多请求/数据包,n(仅Pool中的最大进程数)将运行一次,而其他进程将等待队列中的进程槽之一。向量所有元素的平方我们如何使用数据框#A:你可以使用一些可以并行化的函数df.shape#(100,100)dfs=[df.iloc[i25:i25+25,0]foriinrange(4)]withPool(4)asp:res=p.map(np.exp,dfs)foriinrange(4):df.iloc[i25:i25+25,0]=res[i]#可以方便的对数据进行预处理,什么时候用?如果您有很多任务,但其中很少有计算密集型任务,您应该使用Process。因为如果它们需要大量的计算,它们可能会阻塞你的CPU,你的系统可能会崩溃。如果您的系统可以一次处理所有这些操作,那么它们就不必在队列中等待机会。当您有固定数量的任务并且它们是计算密集型时,应该使用Pool。由于您同时释放它们,那么您的系统很可能会崩溃。3.线程处理什么的!在python中线程化?python中的线程声誉。人们对此是正确的。事实上,线程在大多数情况下是没有用的。那么问题是什么?问题是GIL(全局解释器锁)。GIL是在Python发展的早期引入的,当时操作系统中甚至还没有线程的概念。选择它是因为它的简单性。GIL一次只允许一个CPU进程。也就是说,它一次只允许一个线程访问python解释器。因此,一个线程会锁定整个解释器直到它完成。对于单线程程序来说,速度非常快,因为只需要维护一个Lock。随着Python越来越受欢迎,在不损害所有相关应用程序的情况下有效推出GIL变得越来越困难。这就是它仍然存在的原因。但是,如果您的任务不受CPU限制,您仍然可以使用多线程并行性(y)。也就是说,如果您的任务受I/O限制,您可以使用多线程并获得加速。因为大部分时间这些任务都在等待其他代理(如磁盘等)的响应,并且在这段时间内他们可以释放锁并让其他任务同时获得它。?注意:(来自官方页面)GIL是有争议的,因为它阻止多线程CPython程序在某些情况下充分利用多处理器系统。请注意,可能会阻塞或长时间运行的操作(例如I/O、图像处理和NumPy数字运算)发生在GIL之外。因此,只有在GIL中花费大量时间解释CPython字节码的多线程程序中,GIL才会成为瓶颈。以下是官方网页的解释:GIL是有争议的,因为它在某些情况下会阻止多线程的CPython程序充分利用多处理器系统。请注意,可能会阻塞或长时间运行的操作(例如I/O、图像处理和NumPy数字运算)发生在GIL之外。因此,GIL只会成为多线程程序的瓶颈,这些程序会花费大量时间在GIL内部解释CPython字节码。因此,如果您的任务是IO绑定的,例如从服务器下载一些数据、对磁盘进行读/写等,您可以使用多线程并获得加速。fromthreadingimportThreadastimportqueueq=queue.Queue()#用于放置和获取线程结果func_=lambdaq,args:q.put(func(args))threads=[t(target=func_,args=(q,args))forargsinargs_array]fortinthreads:t.start()fortinthreads:t.join()res=[]fortinthreads:res.append(q.get())#这些results不一定是为了保存线程的结果,可以使用类似Queue的方法。为此,您必须如上所示定义函数,或者您可以在函数内部使用Queue.put(),但为此,您必须更改函数定义以将Queue`作为参数。现在,队列中的结果不一定按顺序排列。如果希望结果是有序的,可以传入一些计数器作为参数,比如ids作为参数,然后通过这些id来标识结果的来源。threads=[t(func_,args=(i,q,args))fori,argsinenumerate(args_array)]#并相应地更新函数注意:由于某种原因,在pandas的多处理中,“read.csv”方法没有提供很大的加速,你可以考虑使用Dask作为替代线程或进程吗?一个进程是重量级的,因为它可能包含很多自己的线程(至少包含一个线程),并且分配自己的内存空间,而一个线程是轻量级的,因为它工作在父进程的内存区域,所以速度更快制作。进程内线程之间的通信更容易,因为它们共享相同的内存空间。进程间通信(IPC——Inter-ProcessCommunication)速度较慢。但是,共享相同数据的线程可能会再次进入竞争状态,应谨慎使用Locks或类似的解决方案。4.DaskDask是一个并行计算库,不仅可以帮助并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),还可以帮助并行化低级任务/功能,可以通过以下方式实现制作任务图来处理这些功能之间的复杂交互。[IE。使用低级调度程序]这类似于Python的线程或多处理模块。他们还有一个单独的机器学习库dask-ml,它集成了现有的库,例如sklearn、xgboost和tensorflow。fromdaskimportdelayedasdelay@delaydefadd(x,y):returnx+y@delaydefsq(x):returnx**2#现在你可以以任何方式使用这些函数,Dask将并行化你的执行。顾名思义,Dask不会立即执行函数调用,而是根据函数在输入和中间结果上的调用方式生成计算图。计算最终结果:result.compute()Dask在做任何事情时都具有内在的并行性。对于如何处理DataFrame,可以认为是分而治之,将DataFrame分成chunk,然后并行应用给定的函数。df=dask.DataFrame.read_csv("BigFile.csv",chunks=50000)#你的DataFrame被分成了多个chunk,你应用的每个函数都会并行应用所有模块。它具有大部分Pandas功能,您可以使用:agg=df.groupby(["column"]).aggregate(["sum","mean"])agg.columns=new_column_namesdf_new=df.merge(agg.reset_index(),on="column",how="left")#虽然目前还没有计算结果,但是可以使用.compute()进行并行计算。df_new.compute().head()它们还有一个用于在计算机集群上运行它们的接口。5、torch.multiprocessingtorch.multiprocessing是Pythonmultiprocessing模块的封装函数,其API与原模块100%兼容。因此,您可以在此处使用Python的多处理模块中的Queue'、Pipe'、Array'等。此外,为了使其更快,他们添加了一个方法share_memory_(),它允许数据进入任何进程都可以直接使用它的状态,因此将该数据作为参数传递给不同的进程不会复制该数据。.您可以共享Tensors、模型参数,并根据需要在CPU或GPU上共享。来自Pytorch的警告:(关于GPU上的共享)CUDAAPI要求导出到其他进程的分配在被其他进程使用时保持有效。您应该小心确保共享的CUDA张量在必要时不会越界。这对于共享模型参数应该不是问题,但在传递其他类型的数据时应该小心。请注意,此限制不适用于共享CPU内存。您可以在此处的“PoolandProcess”部分使用上述方法,为了更快,您可以使用share_memory_()方法在所有进程之间共享一个Tensor(例如),而无需复制。使用多进程训练模型:importtorch.multiprocessingasmpdeftrain(model):fordata,labelsindata_loader:optimizer.zero_grad()loss_fn(model(data),labels).backward()optimizer.step()#这将更新共享参数model=nn.Sequential(nn.Linear(n_in,n_h1),nn.ReLU(),nn.Linear(n_h1,n_out))model.share_memory()#requires“fork”方法来工作进程=[]foriinrange(4):#NO.的进程p=mp.Process(target=train,args=(model,))p.start()processes.append(p)forpinprocesses:p.join()下一期继续看Python算法提速的第四种方法——Dask!疫情当下,往日匆忙的脚步终于慢下来,是时候好好想想自己的职业规划和人生规划了。未雨绸缪,未雨绸缪,为未来积蓄能量——蓄势待发!(1)更多优质内容和精彩资讯,访问:https://www.cda.cn/?seo(2)搜索CDA小程序,随时随地浏览最新资讯和优质课程手机:
