当前位置: 首页 > 后端技术 > Python

注:HighPerformancePython第9章

时间:2023-03-25 20:24:41 Python

是关于并行计算的,占了60多页。看完了,我得整理消化一下这一大堆东西。1.用蒙特卡洛模拟估计pi。逻辑很简单:在坐标系的单位正方形内扔针,计算针落在1/4单位圆内的比例(x^2+y^2<=1),乘以4即可执行按顺序,投掷100,000,000次大约需要120秒。1.1使用多进程加速每个throw都是相互独立的,所以只需将工作负载分成多个部分(比如8个部分)交给多个进程执行即可。可以想象,这里运行的函数不需要与程序的其他部分共享状态,只需要在进程间传递少量的数据就可以完成大量的计算。函数estimate_nbr_points_in_quarter_circle重复抛针,计算落在单位圆1/4以内的针数:defestimate_points(nbr_estimates):in_unit_circle=0forstepinrange(int(nbr_estimates)):x=random.uniform(0,1)y=random.uniform(0,1)is_in_unit_circle=x*x+y*y<=1.0in_unit_circle+=is_in_unit_circle返回in_unit_circle并行部分:来自多处理导入池...samples_total=1e8N=8pool=Pool(processes=N)samples=samples_total/Ntrials=[samples]*Nt1=time.time()nbr_in_unit_circles=pool.map(estimate_points,trials)pi_estimate=sum(nbr_in_unit_circles)*4/samples_totalprint("估计pi",pi_estimate)print("Delta:",time.time()-t1)这是一个编写起来相当简单的并行程序。确定要使用的进程数(通常是cpus数),在trials列表中设置每个进程的参数,然后丢到pool.map中,就像使用普通的map函数一样。运行它大约需要19秒。(总之,书上这段代码非常难读,因为变量名太长,到处都是nbr_trials_in_quarter_unit_circle、nbr_trials_per_process、nbr_samples_in_total等三四个字的变量名,把很简单的东西搞大了一段长让人头晕。这个故事告诉我们,代码要简洁,信息密度要高。)EffectivePython第41条指出,multiprocessing的开销比较高,因为必须在主进程和子进程之间进行序列化和反序列化操作。具体来说,多处理会做这些事情:将试验列表中的每一项数据传递给地图。使用pickle模块序列化数据,将其转换为二进制形式。序列化后的数据通过本地socket从主解释器所在进程发送到子解释器所在进程。使用pickle反序列化子进程中的二进制数据,还原为python对象。导入包含estimate_points函数的python模块。每个子进程在其自己的输入数据上并行运行estimate_points函数。对运行结果进行序列化操作,转化为字节。通过套接字将这些字节复制到主进程。主进程将这些字节反序列化回python对象。将各个子流程得到的结算结果合并成一个列表返回给调用方。1.2并行系统中的随机数在并行计算中,你必须仔细考虑是否会得到重复或相关的随机序列。如果使用python自带的random模块,multiprocessing会在每次fork进程中重新设置随机数生成器的seed。但是如果你用的是numpy,就得自己重新设置,否则random会返回相同的序列。使用numpy:importnumpyasnpdefestimate_points(samples):np.random.seed()xs=np.random.uniform(0,1,samples)ys=np.random.uniform(0,1,samples)is_in_quc=(xs*xs+ys*ys)<=1.0in_quc=np.sum(is_in_quc)使用numpy返回in_quc将运行时间减少到1.13秒。numpy太强了(或者CPython太好了)。2.寻找素数在大范围内寻找素数与估算pi不同,因为工作量与搜索范围的上下限有关(查看[10,100]和[10000,100000]的工作量)肯定是不同的),而且每个数的校验复杂度也不一样(谁知道校验第一个质因数的时候就可以整除?偶数最容易校验,质数最难校验)。这个问题非常平行(不会翻译......),即没有需要共享的状态。关键在于如何平衡进程间的工作负载(loadbalancing),将不同复杂度的任务分配给有限的计算资源。当我们将计算任务分配给进程池时,我们可以控制分配给每个进程的工作负载量,将工作负载分成块,一旦有CPU空闲就分配工作给它。块越大,通信开销越小;块越小,控制越精细。(块大小为10意味着进程一次检查10个数字)。作者给出了一个“blocknumber-runningtime”关系图来说明“block数是cpu数的倍数时,运行时间最短”的简单道理(否则会出现cpu空)。我们可以使用队列向一组工作人员提供任务并收集结果:==ALL_DONE:definites.put(WORKER_FINISHED)breakelse:ifn%2==0:继续foriinrange(3,int(math.sqrt(n))+1,2):ifn%i==0:breakelse:definites.put(n)possibles和definites是结果输入输出的两个队列。我们设置了两个标志位(flag),ALL_DONE作为终止循环的哨兵,是父进程把数字塞进可能后提供的,用来告诉子进程没有其他工作了。子进程收到ALL_DONE后,输出WORKER_FINISHED给defines,告诉父进程收到sentinel,然后终止从possibles队列获取输入。创建输入输出队列和8个进程,在possibles队列中加入数字,在末尾加入8个ALL_DONE哨兵:if__name__=='__main__':primes=[]possibles=Queue()definites=Queue()N=8pool=Pool(processes=N)processes=[]for_inrange(N):p=Process(target=check_prime,args=(possibles,defines))processes.append(p)p.start()t1=时间.time()number_range=range(10000000000,10000100000)forpossibleinnumber_range:possibles.put(possible)print("ALLJOBSADDEDTOTHEQUEUE")#添加毒丸以停止远程工作者forninrange(N):possibles.put(ALL_DONE)print("NOWWAITINGFORRESULTS...")...循环从definites队列中获取结果(当然,结果不是连续的),在获取到8个WORKER_FINISHED后停止循环:...processors_finished=0whileTrue:new_result=definites.get()ifnew_result==WORKER_FINISHED:processors_finished+=1print("{}WORKER(S)FINISHED".format(processors_finished))如果processors_finished==N:breakelse:primes.append(new_result)assertprocessors_finished==Nprint("Took:",time.time()-t1)print(len(primes),primes[:10],primes[-10:])程序执行耗时7秒多,顺序执行耗时20秒左右。但是因为队列的创建需要序列化和同步,所以多进程的执行速度不一定比顺序执行快。原著中是这样的。即使作者从输入队列中移除所有偶数,多进程执行仍然比顺序执行慢,说明多进程执行程序的很大一部分时间都花在了通信开销上。3.验证素数不同于第2节中的“找出一个范围内的所有素数”,现在我们来解决如何快速判断一个特别大的数(比如18位数字)是否为素数的问题——通过多个CPU协作。这是一个需要进程间通信或共享状态的问题。3.1简单进程池与前面两个例子类似,我们将待查号码的可能因素分成多组,传递给多个子进程进行查验。当其中一个子进程中的因子除以该数字时,子进程返回False-但这不会停止其他子进程(因此是原始版本)。这可能允许其他子进程做无用的工作,但它也节省了检查共享状态的通信开销。把因子分组:defcreate_range(from_i,to_i,N):piece_length=int((to_i-from_i)/N)lrs=[from_i]+[(i+1)if(i%2==0)elseiforiinrange(from_i,to_i,piece_length)[1:]]iflen(lrs)>N:lrs.pop()断言len(lrs)==N范围=list(zip(lrs,lrs[1:]))+[(lrs[-1],to_i)]返回rangese.g。create_range(1000,100000,4)的返回值为[(1000,25751),(25751,50501),(50501,75251),(75251,100000)]。importtimeimportmathfrommultiprocessingimportPooldefcheck_prime_in_range(args):n,from_i,to_i=args#好像只有传入元组然后解包from_i,to_i=rangesifn%2=才能达到传入多个参数的效果=0:对于范围内的i返回False(from_i,to_i,2):如果n%i==0:返回FalsereturnTruedefcheck_prime(n,pool,N):from_i=3to_i=int(math.sqrt(n))+1范围=create_range(from_i,to_i,N)args=[(n,from_i,to_i)forfrom_i,to_iinranges]`results=pool.map(check_prime_in_range,args)如果结果为假:返回False返回Trueif__name__=="__main__":N=8pool=Pool(processes=N)prime18=100109100129100151t1=time.time()print("%d:%s"%(prime18,check_prime(prime18,pool,N)))print('Took:',time.time()-t1)花费了大约10秒。3.2稍微不太简单的进程池由于额外的开销,对于较小的数字,多进程方法可能不如顺序搜索方法。此外,如果发现一个小因素,程序不会立即停止。当然,我们可以在进行因式分解时立即在进程之间进行通信,但这会产生大量额外的通信开销,因为大多数数字都会有一个小因数。所以我们采用混合策略:先依次找到较小的因子,然后将剩余的工作分配给多个进程。这是避免多处理开销的常见做法。defcheck_prime(n,pool,N):from_i=3to_i=21args=(n,from_i,to_i)如果不是check_prime_in_range(args):返回Falsefrom_i=to_ito_i=int(math.sqrt(n))+1ranges=create_range(from_i,to_i,N)args=[(n,from_i,to_i)forfrom_i,to_iinranges]results=pool.map(check_prime_in_range,args)如果结果为False:返回False使用多处理返回True3.3使用.Manager()作为标志并直接添加代码。您可以看到使用Manager创建了一个符号位。读取这个符号位不需要任何加锁等操作,就像查看一个全局变量一样方便(但仍然需要作为参数传入函数)。为了节省通信开销,让每个进程每1000个数检查一次符号位。如果进程检查FLAG_SET或找到一个因素,它就会停止。importtimeimportmathfrommultiprocessing导入池,ManagerSERIAL_CHECK_CUTOFF=21CHECK_EVERY=1000FLAG_CLEAR=b'0'FLAG_SET=b'1'defcreate_range(from_i,to_i,N):piece_length=int((to_i-from_i)/N)lrs=[from_i]+[(i+1)if(i%2==0)elseiforiinrange(from_i,to_i,piece_length)[1:]]iflen(lrs)>N:lrs.pop()assertlen(lrs)==Nranges=list(zip(lrs,lrs[1:]))+[(lrs[-1],to_i)]returnrangesdefcheck_prime_in_range(args):n,from_i,to_i,value=argsifn%2==0:返回假check_every=CHECK_EVERYforiinrange(from_i,to_i,2):check_every-=1ifnotcheck_every:ifvalue.value==FLAG_SET:returnFalsecheck_every=CHECK_EVERYifn%i==0:value.value=FLAG_SETreturnFalsereturnTruedefcheck_prime(n,pool,N,value):from_i=3to_i=SERIAL_CHECK_CUTOFF值。value=FLAG_CLEAR#记得先设置flag的值args=(n,from_i,to_i,value)ifnotcheck_prime_in_range(args):returnFalsefrom_i=to_ito_i=int(math.sqrt(n))+1ranges=create_range(from_i,to_i,N)args=[(n,from_i,to_i,value)forfrom_i,to_iinranges]结果=池。map(check_prime_in_range,args)ifFalseinresults:returnFalsereturnTrueif__name__=="__main__":N=8manager=Manager()value=manager.Value(b'c',FLAG_CLEAR)#创建一个单字节(一个字符)符号标志池=池(进程=N)prime18=100109100129100151non_prime=100109100129101027t1=time.time()print("%d:%s"%(non_prime,check_prime(non_prime,pool,N,value)))print('Tok:',time.time()-t1)