原文转载自《刘越的技术博客》https://v3u.cn/a_id_221一般情况下,大家可以清楚的解释Python原生的并发/并行工作方式:进程、线程、协程的关系和区别。连具体的对象名和内置方法都可以数出来,这显然是极好的,但我们其实忽略了一个问题,就是具体的应用场景。使用三者的目的是一样的。也就是说,结果是一样的,都可以提高程序运行的效率,但是哪种方式哪种场景更好呢?这就好比,汽车发动机的主流变速箱不外乎三种:双离合、CVT和传统AT。原始设备制造商将它们安装在不同的引擎和模型上。都是变速箱,可以将发动机产生的动力施加到车轮上,但是在不同的使用场景下应该选择哪种变速箱呢?这显然也是一个问题。所谓“没有场景,就没有功能”,本次我们将讨论具体的并发编程场景有哪些,以及对应具体的场景,如何选择并发的手段和方法。什么是并发和并行?在讨论场景之前,我们需要对多任务的执行方式进行分类,即并发模式和并行模式。教科书上告诉我们,并行是指两个或多个事件同时发生;而并发意味着两个或多个事件在同一时间间隔内发生。在多程序环境下,并发是指一段时间内多个程序在宏观上同时运行,但是在单处理器系统中,一次只能执行一个程序,所以这些程序只能分时交替执行。好像有点抽象,好吧,还是务实点,因为GIL全局解释器锁的存在,在Python编程领域,我们可以简单粗暴的通过是否可以使用多线程来区分并发和并行程序coreCPUs,andcanuse多核CPU是并行的,不能使用多核CPU,只能单核处理并发。就这么简单,没错,Python的GIL全局解释器锁帮我们简化了问题,这就是Python的大幸吗?还是倒霉?Python中并发任务的实现方式有:多线程threading和coroutineasyncio。它们的共同点是交替执行。不同的是,多线程threading是抢占式的,而coroutineasyncio是协作式的。原理也很简单。只能使用一个CPU,一个CPU一次只能做一件事,所以并发任务只能通过不断切换来完成。Python中并行任务的实现是多处理。通过multiprocessing库,Python可以在程序的主进程中创建新的子进程。这里的一个进程可以被认为是一个几乎完全不同的程序,尽管从技术上讲它们通常被定义为资源的集合,其中的资源包括内存、文件句柄等。换句话说,每个子进程都有自己的Python解释器。因此,Python中的并行任务可以使用多个CPU,每个CPU可以运行一个进程,这是真正意义上的同时运行。需要一个开关,以便Python可以执行并行任务。什么时候使用并发?IO密集型任务现在明白了,Python中并发运行的方式就是多线程threading和coroutineasyncio,那么它们用在什么场景下呢?一般来说,任务场景,或者更准确的说,任务类型,无非就是两种:CPU密集型任务和IO密集型任务。什么是IO密集型任务?IO是Input-Output的缩写。说白了,就是程序的输入输出。仔细想想,的确如此。你的电脑不就是这两个功能吗?使用键盘、麦克风和摄像头来输入数据,然后使用屏幕和扬声器进行输出操作。但是输入和输出操作比计算机中的CPU慢。换句话说,CPU必须等待这些较慢的输入和输出操作。运算完成后,CPU可以继续运算一段时间,然后等待IO操作,如图:由此可以看出,并发适合这种IO密集且频繁的work,因为即使CPU是苹果最新ARM架构的M2芯片,也没有用。另外,IO密集型任务形象化的话,就是我们经常操作的:硬盘读写(数据库读写),网络请求,文件打印等等。并发模式的选择:多线程threading还是coroutineasyncio?既然涉及硬盘读写(数据库读写)、网络请求、文件打印等任务都属于并发任务,那我们就真正实践一下,看看不同的并发方式能提高多少效率?一个简单的小请求,重复获取本站数据,计算首页数据文本行数:importrequestsimporttimedefdownload_site(url,session):withsession.get(url)asresponse:print(f"已下载{len(response.content)}行数据")defdownload_all_sites(sites):withrequests.Session()assession:forurlinsites:download_site(url,session)if__name__=="__main__":sites=["https://v3u.cn"]*50start_time=time.time()download_all_sites(sites)duration=time.time()-start_timeprint(f"下载{len(sites)}次,执行{duration}seconds")不使用任何并发手段,程序返回:76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据已下载76347行数据已下载76347行数据已下载76347行数据已下载76347行数据d76347行数据已下载76347行数据已下载76347行数据已下载Downloaded76347rowsofdataDownloaded76347rowsofdataDownloaded76347rowsofdata76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载76347行数据下载50次数据,执行for8.781155824661255seconds[Finishedin9.6s]这里程序的每一步都是同步操作,th也就是说第一次抓取网站首页的时候,剩下的49次都在等待然后使用多线程threading改造程序:Session()returnthread_local.sessiondefdownload_site(url):session=get_session()withsession.get(url)作为响应:print(f"Downloaded{len(response.content)}rowsofdata")defdownload_all_sites(sites):withconcurrent.futures.ThreadPoolExecutor(max_workers=8)asexecutor:executor.map(download_site,sites)if__name__=="__main__":sites=["https://v3u.cn"]*50start_time=time.time()download_all_sites(sites)duration=time.time()-start_timeprint(f"下载{len(sites)}次并执行{duration}秒")这里,线程池上下文管理器是通过with启用的关键字,并发8个线程下载,程序返回:76424rowsofdatadownloaded76424rowsofdatadownloaded76424rows下载的数据量76424行下载的数据7642476424行数据已下载76424行数据已下载76424行数据已下载行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76161行数据已下载76424行数据已下载76161行数据已下载76161行数据已下载。下载了76,161行数据。下载了76,161行数据。下载了76,161行数据。下载了76,161行数据。线程实际上是在不断地运行“切换”,这样就节省了单个线程每次等待爬取结果的时间:这又带来了另一个问题:上下文切换的时间开销我们继续改造,使用协程来试试看。首先安装异步web请求库aiohttp:pip3installaiohttp重写逻辑:importasyncioimporttimeimportaiohttpasyncdefdownload_site(session,url):asyncwithsession.get(url)asresponse:print(f"Downloaded{response.content_length}行数据")asyncdefdownload_all_sites(sites):asyncwithaiohttp.ClientSession()assession:tasks=[]forurlinsites:task=asyncio.ensure_future(download_site(session,url))tasks.append(task))awaitasyncio.gather(*tasks,return_exceptions=True)if__name__=="__main__":sites=["https://v3u.cn"]*50start_time=time.time()asyncio.run(download_all_sites(站点))duration=time.time()-start_timeprint(f"下载{len(sites)}次,执行{duration}秒")程序返回:downloaded76424rowsofdatadownloaded76424rowsofdatadownloaded76424rowsofdatadownloaded76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行of数据下载已加载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据已下载76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76424行数据下载了76161行数据76424行数据已下载76161行数据已下载76161行数据已下载76161行数据已下载76,161行数据已下载76,161行数据已下载76,161行数据已下载76,161rowsofdata下载了76161行数据下载了76161行数据下载了50次,执行了6.893810987472534秒效率更上一层楼,同样是使用with关键字来操作contextmanager,协程使用asyncio.ensure\_future()来创建一个任务列表,它也负责开始他们创建的所有任务后,使用asyncio.gather()保持会话上下文的一个实例,直到所有爬取任务完成。与多线程threading不同的是,协程不需要切换上下文,所以每个任务需要的资源和创建时间要少得多,所以创建和运行更多的任务效率更高:综上所述,并发逻辑归结为减少CPU的等待时间,也就是让CPU等一会儿,而协程的工作方式显然让CPU等待的时间最少。并行模式:多进程multiprocessing让我们再次尝试多进程multiprocessing。并行性可以做并发的事情吗?导入请求导入多处理导入时间会话=无defset_global_session():如果不是会话则为全局会话:session=requests.Session()defdownload_site(url):以session.get(url)作为响应:name=multiprocessing.current_process().nameprint(f"read{len(response.content)}line")defdownload_all_sites(sites):withmultiprocessing.Pool(initializer=set_global_session)aspool:pool.map(download_site,sites)if__name__=="__main__":sites=["https://v3u.cn"]*50start_time=time.time()download_all_sites(sites)duration=time.time()-start_timeprint(f"下载{len(sites)}次,executedfor{duration}seconds")这里我们还是使用contextmanager开启进程池,默认进程数与当前电脑的CPU核数匹配,即有多少核就开启多少进程,程序返回:读取76000行读取76241行读取76044行读取75894行读取76290行读取76312行读取76419行读取76753行读取76290行读取读取行76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76290行读取76阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读行阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读76290行阅读7676290linesread读取76290行,读取76290行,读取76290行,读取76290行,下载50次,执行8.195281982421875秒。虽然它比同步程序快,但无疑不如多线程和协程高效。为什么?因为多进程不适合做IO密集型的任务,虽然可以使用多核资源,但是没有意义:不管开多少个进程,CPU都是没用的。大多数情况下,CPU都在等待IO操作。也就是说,多核是拖IO程序的执行。并行性的选择:CPU密集型任务什么是CPU密集型任务?这里我们可以利用逆定理:凡是不涉及硬盘读写(数据库读写)、网络请求、文件打印等的任务,都认为是CPU密集型任务。说白了,它们是计算任务。以平方和为例:importtimedefcpu_bound(number):returnsum(i*iforiinrange(number))deffind_sums(numbers):fornumberinnumbers:cpu_bound(number)if__name__==“__main__”:numbers=[5_000_000+xforxinrange(20)]start_time=time.time()find_sums(numbers)duration=time.time()-start_timeprint(f"{duration}seconds")执行20时间同步,需要多少时间?4.466595888137817秒然后尝试并行模式:map(cpu_bound,numbers)if__name__=="__main__":numbers=[5_000_000+xforxinrange(20)]start_time=time.time()find_sums(numbers)duration=time.time()-start_timeprint(f"{duration}seconds")八核处理器,开启八个进程开始运行:1.1755797863006592秒不用说,并行方式有效提高了计算效率。最后,由于我们之前并行运行过IO密集型任务,让我们尝试并发运行CPU密集型任务:deffind_sums(numbers):withconcurrent.futures.ThreadPoolExecutor(max_workers=8)作为执行者:executor.map(cpu_bound,numbers)if__name__=="__main__":numbers=[5_000_000+xforxinrange(20)]start_time=time.time()find_sums(numbers)duration=time.time()-start_timeprint(f"{duration}seconds")单进程开8个线程,往上走:4.452666759490967秒怎么样?像并行运行IO密集型任务,可以运行,但是没有任何意义。为什么?因为没有IO操作,CPU不需要等待,CPU只需要全力计算就可以了,所以如果用多线程或者协程,无非是画蛇添足。结语有经验的汽修师傅会告诉你,要省油就选CVT和双离合,要质量稳定就选AT,经常高速激烈驾驶就选双离合,经常开高速就选双离合。市区堵车,选CVT;有经验的后台研发,也可以告诉汽车修理工,任何不需要CPU等待的任务都可以选择并行(multiprocessing)处理方式,而需要CPU长时间等待的任务应该选择并发(线程/异步)方法。反之我想在高速公路上用CVT拖车,在市区堵车用双离合好吗?是的,但是没有意义,或者更准确的说,没有额外的好处;在并发执行CPU密集型任务时,并行执行IO密集型任务是否可以?还好,但还是没有额外的收入,没有别的,物欲无定,适口者为贵。原文转载自《刘越的技术博客》https://v3u.cn/a_id_221
