假设当前上下文(LCTT译注:context,计算机术语,这里指业务情况)是这样的:一个zip文件被上传到一个web服务,然后Python需要Unzipzip文件并分析和处理其中的每个文件。这个特定的应用程序查看每个文件各自的名称和大小,将其与已经上传到AWSS3的文件进行比较,如果文件不同(与AWSS3相比)或者文件本身更新,则将其更新上传到AWSS3.今天上传的挑战是这些zip文件太大。它们的平均大小为560MB,但其中一些大于1GB。这些文件大部分是文本文件,但也有一些巨大的二进制文件。不同寻常的是,每个zip文件包含100个文件,但其中1-3个文件占zip文件大小的95%。一开始我尝试解压内存中的文件,一次只处理一个文件。这种方法在各种内存爆炸和EC2内存耗尽场景下英勇失败。我认为原因是这样的。起初你有1GB的文件在内存中,然后你现在解压每个文件,大约需要2-3GB的内存。因此,经过多次测试,解决方案是将这些zip文件复制到磁盘(在临时目录/tmp中),然后遍历这些文件。这次情况好多了,但我仍然注意到整个减压过程花费了大量时间。有没有办法优化这个?原始函数首先是以下模拟压缩文件中文件实际操作的普通函数:def_count_file(fn):withopen(fn,'rb')asf:return_count_file_object(f)def_count_file_object(f):#注意这会迭代'f'。#你*可以*做'returnlen(f.read())'#这会更快但可能会占用内存#就此基准测试而言效率低下且不切实际。total=0forlineinf:total+=len(line)returntotal这是另一个可能最简单的函数:deff1(fn,dest):withopen(fn,'rb')asf:zf=zipfile.ZipFile(f)zf.extractall(dest)total=0forroot,dirs,filesinos.walk(dest):forfile_infiles:fn=os.path.join(root,file_)total+=_count_file(fn)返回总计,如果我更仔细地观察,我会发现这个函数用40%的时间运行extractall,用60%的时间遍历各种文件并读取它们的长度。第一次尝试我的第一次尝试是使用线程。首先创建一个zipfile.ZipFile实例,展开其中的每个文件名,为每个文件启动一个线程。每个线程都给它一个函数来完成“真正的工作”(在这个基准测试中,遍历每个文件并获取它的名字)。真正的业务函数执行复杂的S3、Redis和PostgreSQL操作,但在我的基准测试中,我只需要创建一个函数来找出文件的长度。线程池函数:deff2(fn,dest):defunzip_member(zf,member,dest):zf.extract(member,dest)fn=os.path.join(dest,member.filename)return_count_file(fn)withopen(fn,'rb')asf:zf=zipfile.ZipFile(f)futures=[]withconcurrent.futures.ThreadPoolExecutor()作为执行者:对于zf.infolist()中的成员:futures.append(executor.submit(unzip_member,zf,member,dest,))total=0forfutureinconcurrent.futures.as_completed(futures):total+=future.result()returntotalresult:speedup~10%secondsteptrysoprobablyGIL(LCTT译注:GlobalInterpreterLock,全局锁,CPython中的一个概念)阻碍了我。最自然的想法是尝试使用多线程在多个CPU之间分配工作。但是这样有个缺点就是不能传递一个non-pickleableserializableobject(LCTT翻译:意思是只能传递pickleableserializableobjects),所以只能把filename传给下面这个函数:defunzip_member_f3(zip_filepath,filename,dest):withopen(zip_filepath,'rb')asf:zf=zipfile.ZipFile(f)zf.extract(filename,dest)fn=os.path.join(dest,filename)return_count_file(fn)deff3(fn,dest):withopen(fn,'rb')asf:zf=zipfile.ZipFile(f)futures=[]withconcurrent.futures.ProcessPoolExecutor()asexecutor:formemberinzf.infolist():期货。append(executor.submit(unzip_member_f3,fn,member.filename,dest,))total=0forfutureinconcurrent.futures.as_completed(futures):total+=future.result()返回总结果:加速~300%使用处理器池作弊问题是这需要将原始.zip文件存储在磁盘上。因此,为了在我的Web服务器上使用此解决方案,我首先将内存中的zip文件保存到磁盘,然后调用此函数。这样做的成本我不是很清楚,但应该不低。好吧,再看一遍也没什么损失。也许,减压过程的加速足以弥补这样做的损失。但是一定要记住!这种优化取决于使用所有可用的CPU。如果其他CPU需要在gunicorn中做其他事情怎么办?此时,这些其他进程必须等到CPU可用。由于此服务器上正在进行其他事务,我不确定是否要接管进程中的所有其他CPU。结论一步一步做这个任务的过程感觉很好。你被限制在一个CPU上,但仍然表现得非常好。另外,请务必查看f1和f2代码段之间的区别!使用concurrent.futures池类,您可以获得允许使用的CPU数量,但这样做也感觉不太好。如果你在虚拟环境中得到的数字是错误的怎么办?还是可用的数量太少而无法从负载共享中获益,而现在您只是为了转移负载而支付运营支出?我会坚持使用zipfile.ZipFile(file_buffer).extractall(temp_dir)。这项工作做得很好。想尝试一下吗?我使用c5.4xlargeEC2服务器进行基准测试。文件可以从这里下载:wgethttps://www.peterbe.com/unzip-in-parallel/hack.unzip-in-parallel.pywgethttps://www.peterbe.com/unzip-in-parallel/symbols-2017-11-27T14_15_30.zip此处的.zip文件为34MB。它比在服务器上小得多。hack.unzip-in-parallel.py文件一团糟。它包含许多可怕的修复和丑陋的代码,但这仅仅是个开始。
