前言在高性能计算项目中,我们通常会使用效率更高的编译型语言,如C、C++、Fortran等,但由于灵活性和易用性Python的使用其独特性使其在算法的开发和验证中大受欢迎,因此在高性能计算领域也经常能看到Python的身影。本文简单介绍在Python环境下使用MPI接口对集群进行多进程并行计算的方法。MPI(MessagePassingInterface)这里我简单介绍一下MPI。MPI的全称是MessagePassingInterface,即消息传递接口。它不是一种语言,而是一个图书馆。我们可以使用Fortran、C、C++结合MPI提供的接口对串行程序进行并行化处理。我们也可以认为Fortran+MPI或者C+MPI是一种在原有串行语言的基础上重新扩展的并行语言。它是一个标准而不是一个特定的实现。可以有很多种不同的实现,比如MPICH、OpenMPI等。它是一种消息传递编程模型,顾名思义,它专门用于进程间通信。MPI的工作方式很好理解。我们可以同时启动一组进程。同一个通信域中的不同进程有不同的编号。程序员可以使用MPI提供的接口,将不同的任务和进程分配给不同的编号。辅助进程相互通信并最终完成相同的任务。就像承包商给工人分配工作编号,然后指定一个计划,将任务分配给不同编号的工人,让工人相互沟通来完成任务。Python中的并行由于CPython中GIL的存在,我们暂时不能指望在CPython中使用多线程来使用多核资源进行并行计算,所以我们可以通过使用来充分利用Python中的多核资源多进程方法。在Python中,我们可以使用很多方法进行多进程编程,比如os.fork()创建进程或者使用multiprocessing模块更方便的创建进程和进程池。在上一篇《Python多进程并行编程实践-multiprocessing模块》中,我们使用进程池方便的管理Python进程,通过multiprocessing模块中的Manager管理分布式进程,实现多机分布式计算。与多线程共享内存不同,由于每个进程都是相互独立的,因此进程间通信在多进程中起着非常重要的作用。在Python中,我们可以使用multiprocessing模块中的pipe、queue、Array、Value等工具来实现进程间通信和数据共享,但是在编写上还是有很多不灵活的地方。而这方面正是MPI所擅长的,所以如果你能在Python中调用MPI接口,那就太棒了,不是吗。MPI和mpi4pympi4py是一个建立在MPI之上的Python库,主要用Cython编写。mpi4py使得跨多个进程传递Python数据结构变得容易。mpi4py是一个非常强大的库,它实现了MPI标准中的很多接口,包括点对点通信、组内集体通信、非阻塞通信、重复非阻塞通信、组间通信等。基本上,能想到使用的MPI接口mpi4py中有对应的实现。不仅是Python对象,mpi4py对numpy也有很好的支持,传输效率非常高。同时,它还提供了SWIG和F2PY接口,让我们可以将自己的Fortran或C/C++程序打包成Python,仍然使用mpi4py对象和接口进行并行处理。可见mpi4py作者的功力确实非常的好。mpi4py在这里,我开始介绍在Python环境中使用mpi4py接口进行并行编程。MPI环境管理mpi4py提供了相应的接口Init()和Finalize()来初始化和结束mpi环境。但是mpi4py已经把初始化操作写在了__init__.py中,所以我们在frommpi4pyimportMPI的时候已经自动初始化了mpi环境。MPI_Finalize()被注册到Python的C接口Py_AtExit()中,这样当Python进程结束时MPI_Finalize()就会被自动调用,所以我们不再需要显式地去掉Finalize()。通信域(Communicator)mpi4py直接提供了对应通信域的Python类,其中Comm是通信域的基类,Intracomm和Intercomm是它的派生类,在MPI的C++实现中是一样的。同时,它还提供了两个预定义的通信域对象:COMM_WORLDcontainingallprocessesCOMM_SELFIn[1]:frommpi4pyimportMPIIn[2]:MPI.COMM_SELFOut[2]:Incontainingonlythecallingprocessitself[3]:MPI.COMM_WORLDOut[3]:通讯域对象提供通讯域相关接口,如获取当前进程号,获取通讯域进程数,获取进程group,Groups进行集合操作,拆分合并等。In[4]:comm=MPI.COMM_WORLDIn[5]:comm.Get_rank()Out[5]:0In[6]:comm.Get_size()Out[6]:1In[7]:comm.Get_group()Out[7]:In[9]:comm.Split(0,0)Out[9]:通信域和进程组的操作这里就不细说了,可以参考GroupsandCommunicators点对点通信介绍mpi4py提供点对点通信接口实现多个进程传递Python内置对象(基于pickle序列化),也提供直接数组传递(numpy数组,接近C语言效率)。如果我们需要传递一个通用的Python对象,需要在通信领域对象的方法中使用小写接口,如send()、recv()、isend()等,如果需要直接传递数据对象,需要调用大写接口,如Send()、Recv()、Isend()等,与C++接口中的拼写相同。MPI中有多种点对点通信,包括标准通信、缓冲通信、同步通信和就绪通信。同时,这些通信有非阻塞的异步版本等等。这些在mpi4py中都有对应的Python版本接口,可以让我们更灵活的处理进程间通信。这里我只以标准通信的阻塞和非阻塞版本为例:Blockingstandardcommunication这里我尝试使用mpi4py接口在两个进程之间传递Python列表对象。frommpi4pyimportMPIimportnumpyasnpcomm=MPI.COMM_WORLDrank=comm.Get_rank()size=comm.Get_size()ifrank==0:data=range(10)comm.send(data,dest=1,tag=11)print("处理{}发送{}...".format(rank,data))else:data=comm.recv(source=0,tag=11)print("process{}recv{}...".format(rank,data))执行效果:zjshao@vaio:~/temp_codes/mpipy$mpiexec-np2pythontemp.pyprocess0send[0,1,2,3,4,5,6,7,8,9]...process1recv[0,1,2,3,4,5,6,7,8,9]...非阻塞标准通信所有阻塞通信mpi都提供了非阻塞版本,类似于我们写异步程序的方式,不用阻塞在耗时IO上它同样,MPI的非阻塞通信不会阻塞消息传递的过程,从而可以充分利用处理器资源,提高整个程序的效率。我们来看看阻塞通信和非阻塞通信的比较:非阻塞通信的消息发送和接收:同样的,我们也可以把上面的例子写成非阻塞版本。frommpi4pyimportMPIimportnumpyasnpcomm=MPI.COMM_WORLDrank=comm.Get_rank()size=comm.Get_size()ifrank==0:data=range(10)comm.isend(data,dest=1,tag=11)print("处理{}立即发送{}...".format(rank,data))else:data=comm.recv(source=0,tag=11)print("process{}recv{}...".format(rank,data))执行结果,注意非阻塞发送也可以使用阻塞接收来接收消息:zjshao@vaio:~/temp_codes/mpipy$mpiexec-np2pythontemp.pyprocess0immediatesend[0,1,2,3,4,5,6,7,8,9]...process1recv[0,1,2,3,4,5,6,7,8,9]...支持Numpy数组mpi4py的一个很好的特性是它可以很好地与Numpy数组支持,我们可以通过它提供的接口直接传递数据对象,这种方法效率高,和C/Fortran直接调用MPI接口基本一样(方法及效果)比如我想传递一个长度为10、MPI的C++接口是:voidComm::Send(constvoid*buf,intcount,constDatatype&datatype,intdest,inttag)const也是与mpi4py的接口类似,Comm.Send()需要接收一个Python列表作为参数,其中包含发送数据的地址、长度和类型。下面是一个阻塞标准通信的例子:frommpi4pyimportMPIimportnumpyasnpcomm=MPI.COMM_WORLDrank=comm.Get_rank()size=comm.Get_size()ifrank==0:data=np.arange(10,dtype='i')comm.Send([data,MPI.INT],dest=1,tag=11)print("process{}Sendbuffer-likearray{}...".format(rank,data))else:data=np.empty(10,dtype='i')comm.Recv([data,MPI.INT],source=0,tag=11)print("process{}recvbuffer-likearray{}...".format(rank,data))执行效果:zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np2pythontemp.pyprocess0Sendbuffer-likearray[0123456789]...process1recvbuffer-likearray[0123456789]...一个重要的组通信MPI组通信和point-to-点通信不同的是一个进程组中的所有进程同时参与通信。mpi4py提供了一个方便的接口,让我们可以用Python完成群组通信,方便了编程,提高了程序的可读性和可移植性。让我们尝试一些常用的设置通信。Broadcast广播操作是典型的一对多通信,将后面进程的数据复制给同组的所有其他进程。在Python中,我想向其他进程广播一个列表:frommpi4pyimportMPIcomm=MPI.COMM_WORLDrank=comm.Get_rank()size=comm.Get_size()ifrank==0:data=range(10)print("process{}bcastdata{}tootherprocesses".format(rank,data))else:data=Nonedata=comm.bcast(data,root=0)print("process{}recvdata{}...".format(rank,data))执行结果:zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np5pythontemp.pyprocess0bcastdata[0,1,2,3,4,5,6,7,8,9]tootherprocessesprocess0recvdata[0,1,2,3,4,5,6,7,8,9]...process1recvdata[0,1,2,3,4,5,6,7,8,9]...process3recvdata[0,1,2,3,4,5,6,7,8,9]...process2recvdata[0,1,2,3,4,5,6,7,8,9]...process4recvdata[0,1,2,3,4,5,6,7,8,9]...发散不同于广播,发散可以将不同的数据发送到不同的进程,而不是完整的拷贝。例如,我想将0-9发送到不同的进程:frommpi4pyimportMPIimportnumpyasnpcomm=MPI.COMM_WORLDrank=comm.Get_rank()size=comm.Get_size()recv_data=Noneifrank==0:send_data=range(10)print("process{}scatterdata{}tootherprocesses".format(rank,send_data))else:send_data=Nonerecv_data=comm.scatter(send_data,root=0)print("process{}recvdata{}...".format(rank,recv_data))发散结果:zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np10pythontemp.pyprocess0scatterdata[0,1,2,3,4,5,6,7,8,9]tootherprocessesprocess0recvdata0...process3recvdata3...process5recvdata5...process8recvdata8...process2recvdata2...process7recvdata7...process4recvdata4...process1recvdata1...process9recvdata9...process6recvdata6...Collect采集过程是发散过程的逆过程,每个过程将发送缓冲区中的消息发送给根进程,根进程根据sendi的进程号将各自的消息存储在自己的消息缓冲区中ng过程。采集结果:zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np5pythontemp.pyprocess2senddata2toroot...process3senddata3toroot...process0senddata0toroot...process4senddata4toroot...process1senddata1toroot...process0gatheralldata[0,1,2,3,4]...其他组内通信和归约操作等,限于篇幅不再赘述。有兴趣的可以去看MPI的官方文档和相应的教材。Mpi4py并行编程实践这里我将对上一篇文章《Python 多进程并行编程实践: multiprocessing 模块》中的双循环绘制地图的例子使用mpi4py进行并行加速处理。我打算同时启动10个进程,将每个0轴需要计算绘制的数据发送给不同的进程进行并行计算。所以我需要将pO2s数组发散到10个过程:然后我需要执行pCOs循环以根据接收到的pO2s数据在每个过程中进行计算。最后收集一下各个进程计算出来的结果(TOF):comm.gather(tofs_1d,root=0)由于代码都是专业相关的东西,就不一一列举了。将mpi4py修改后的并行版本放到Executionin10processes中可见:效率提升了10倍左右。小结本文简单介绍了使用mpi4py的接口在python中进行多进程编程的方法。MPI的接口很大,对应的mpi4py也很大。Mpi4py还实现了相应的SWIG和F2PY包文件和类型映射,这可以帮助我们在消息传递方面将Python与真正的C/C++和Fortran程序统一起来。有兴趣的同学可以进一步研究,欢迎交流。参考MPIforPython2.0.0文档MPITutorialAPythonIntroductiontoParallelProgrammingwithMPI《高性能计算并行编程技术-MPI并行程序设计》《MPI并行程序设计实例教程》