当前位置: 首页 > 科技观察

网络人的Python之路:Concurrent.Futures

时间:2023-03-18 10:53:18 科技观察

我在去年写的两篇专栏中介绍了多线程(threading)和异步IO(asyncio),并给大家举了一个例子来说明网络人如何使用它们应用在我们平时的网络运维中,提高Python脚本的工作效率。本文介绍另一个可以实现并发编程的Python标准库:concurrent.futures。基本概念网工一定都或多或少听说过同步(Synchronous)、异步(Asynchronous)、单线程(SingleThreaded)、多线程(MultiThreaded)、多处理(Multiprocessing)、多任务(Multitasking)、并发(Concurrent)、并行并行(Parallesim)、协程(Coroutine)、I/O密集型(I/O-bound)、CPU密集型(CPU-bound)等名词,如何区分它们对于学习Python来说是网络工作者的难点。在开始讲concurrent.futures之前,我先给大家大致介绍一下以上几个名词的关系和区别:1.同步(Synchronous)VS异步(Asynchronous)所谓同步可以理解为每当After系统执行一段代码或函数,系统会等待代码或函数返回的值或消息,直到系统接收到返回值或消息后,才继续执行下一段代码或函数。在消息期间,程序被阻塞,系统不会做任何事情。异步正好相反。执行完一段代码或函数后,系统不会阻塞地等待返回值或消息,而是继续执行下一段代码或函数,并在同一时间段内执行多个任务(而不是傻傻地等待做完一件事,等结果出来再做下一件事),并发多个任务(注意不是并行),从而提高程序的执行效率。如果你读过数学家华罗庚的《统筹方法》,你一定不会陌生里面提到的例子:也是泡茶的步骤,因为水烧开需要一段时间,你不必等水烧开再洗茶杯,倒茶叶(类似“同步”),但要等水烧开的时候洗茶杯,倒茶叶,水烧开后直接泡茶。茶的三个任务在同一时间段内同时完成,是典型的“异步”。对于我们网络工作者来说,paramiko、netmiko、telnetlib、pexpect、ciscolib等第三方模块默认都是基于同步的,异步模块有asyncio、asyncping、netdev等(pexpect也支持异步,但是必须手动已调整,默认为同步)。2、线程(Thread)VS进程(Process)所谓线程是指操作系统能够进行操作调度的最小单位。线程依赖于进程,是进程中的实际运行单元。一个进程可以有多个线程,每个线程可以并发执行不同的任务。3.单线程(SingleThreaded)VS多线程(MultiThreaded)我们也可以引用同一个例子来说明单线程和多线程的区别。上述华罗庚泡茶的例子《统筹方法》,如果只有一个人完成烧水、洗茶杯、倒茶叶这三个任务,因为此时只有一个劳动力time,我们可以认为它是单线程的(同步和异步IO都是基于单线程的)。假设我们可以找三个人分别负责烧水、洗茶杯、倒茶,那么我们可以把它看成是多线程的,每个劳动力代表一个线程,但是由于多线程的GlobalInterpreterLock机制(俗称GIL(全局锁的存在),其实这三种劳动并不是同时进行的。从并发性能和效率的角度来看,多线程其实是弱于单线程异步IO的,这一点我们之前已经在两篇文章中进行了实验验证。说到单线程和多线程,还要说一下异步IO和多线程的区别:异步IO是单线程,而多线程,顾名思义就是多线程。异步IO和多线程的区别在于它们的机制不同。多线程使用抢占式多任务处理(Pre-emptiveMultitasking)。在这种抢占式环境下,操作系统本身就具有控制所有任务(即程序)的能力,可以随意剥夺每个任务的时间片来提供给其他任务,也就是背后有一个大老板控制一切。异步IO的机制是协作多任务(CooperativeMultitasking)。这个机制没有幕后大佬。在协作环境下,每个任务被调度的前提是当前任务主动让出时间片。异步IO的核心是Coroutine,这是多线程所没有的。协程是一个轻量级的线程,是一种特殊的生成器函数,可以在return语句执行之前停止该函数当前正在执行的任务,并可以间接将一段时间的执行权交给它。另一个协程函数。协程强调协作,而不是多线程强调的抢占。asyncio是Python中唯一支持协程的标准库。4.并发(Concurrent)VS并行(Parallesim)并发是一个笼统的概念。在Python中,逻辑上同时发生的任务有很多名称:多线程、异步IO(多任务处理)和多处理。一种并发。深入一点,只有调用多核CPU的多处理(Multiprocessing)来处理物理上同时发生的任务。这称为并行性。基于单核CPU的多线程和异步IO(多任务处理)一次只能处理一个事件(但它们有自己独特的机制来加速处理不同事件的能力),称为并发。借用知乎网友举的一个例子来说明同步、并发、并行的区别。正在吃饭的时候,突然有人叫你。如果此时你:不接电话,继续吃饭,吃完饭再打回来,这叫同步。接完电话后放下筷子停止进食,等电话结束后再继续进食。这称为并发。边接电话边继续吃饭,这叫并行。综上所述,并行是一种并发,但并发不等于并行。5.I/O-intensive(I/Obound)VSCPU-intensive(CPUbound)I/O-intensive(I/Obound)是指不会特别消耗CPU资源,但I/O比较频繁的任务和操作,如文件读写、网络通信、数据库访问等。CPU密集型(CPUbound)是指需要大量CPU资源的任务和操作,如计算、解压、加密解密等在。异步和多线程适用于I/O密集型场景,多进程适用于CPU密集型场景。以上内容可归纳为下表:并发类型切换机制CPU数量适用场景代表Python库多线程(抢占式多任务)操作系统决定何时切换任务1I/O密集型_thread(已废弃),threading,cocurrent.futures,nornir异步(cooperativemultitasking)任务自己决定什么时候切换1I/Ointensiveasyncio,netdev,aiohttp,aioping,gevent,tornado,twistedmultiprocess(parallel)所有任务同时运行MultipleCPU-intensivemultiprocessing就可以了,说完说了这么多,让我们输入这段文字:concurrent.futures。什么是Concurrent.futuresConcurrent.futures是Python中的标准库。顾名思义,它是一种并发编程。按照Python官方的定义,concurrent.futures是一个结合了多线程和多处理特性的高层接口,并简化两者。Concurrent.futures是从Python3.2引入的。它诞生晚于threading和multiprocessing这两个标准库,但早于Python3.4诞生的asyncio标准库。Future对象在concurrent.futures中引入了future对象。至今听说过future的中文翻译,如future、future等,但是没有统一的说法(Python官方中文文档没有解释),所以这里我们还是用future来说话。主线程(或进程)可以通过future对象获取某个线程(进程)的执行状态或某个任务的执行状态和返回值。执行器对象Concurrent.futures中还有一个重要的对象叫做执行器(Executor),分为两种:ThreadPoolExecutor和ProcessPoolExecutor。基本上可以把它们看成是multiprocessing库中的线程池和进程池(支持多进程的multiprocessing标准库之前没有讲过,下一篇再讲),前面说了,并发的优势.futures与multiprocessing和threading库相比,语法更简单,学习成本更低。先说理论,再做实验说明如何使用concurrent.futures。为了对比,我将使用单线程同步、threading、concurrent.futures来举三个例子。先看最原始的单线程同步:1、单线程同步实验:importtimedefdo_something():print('休眠1秒')time.sleep(1)start_time=time.perf_counter()do_something()do_something()end_time=time.perf_counter()-start_timeprint(f'takes{round(end_time,2)}secondsintotal')这里我们自定义了一个函数叫do_something(),它的任务很简单,就是把内容打印出来“休眠1秒”,然后使用time.sleep(1)让程序休眠1秒。然后我们调用两次do_something()函数,打印出耗时,因为是单线程同步,所以执行两次do_something()的总耗时为2.01秒。2.线程实验importthreadingimporttimedefdo_something():time.sleep(1)start_time=time.perf_counter()threads=[]foriinrange(1,11):t=threading.Thread(target=do_something,name=f'thread{str(i)}')print(f'{t.name}开始运行')print('休眠1秒')t.start()threads.append(t)forthreadinthreads:thread.join()end_time=time.perf_counter()-start_timeprint(f'totallytakes{round(end_time,2)}seconds')这里我们使用线程来执行do_something()总共10次。如果使用单线程同步方式,一共需要10秒+才能完成,而通过threading模块,我们使用多线程让这10个do_something()并发执行,所以只需要1.05秒宣布完成。三、Concurrent.futures实验(分三种代码)因为涉及到不同的知识点,所以我将Concurrent.futures实验的代码分三种来写。首先我们看第一段代码:fromconcurrent.futuresimportThreadPoolExecutorimporttimedefdo_something(seconds):print(f'sleep{seconds}seconds')time.sleep(seconds)return'sleepcomplete'start_time=time.perf_counter()executor=ThreadPoolExecutor()f1=executor.submit(do_something,1)f2=executor.submit(do_something,1)print(f1.result())print(f2.result())print(f'task1completed:{f1.done()}')print(f'task2completed:{f1.done()}')end_time=time.perf_counter()-start_timeprint(f'totallytakes{round(end_time,2)}seconds')代码解释(仅知concurrent.futures相关的要点):这里我们使用fromconcurrent.futuresimportThreadPoolExecutor来调用concurrent.futures的线程池处理器对象fromconcurrent.futuresimportThreadPoolExecutor这里注意我们在do_something()函数后面添加了参数seconds,并且添加了一个最后返回“睡眠完成”。它们的函数会等待我稍后再说:defdo_something(seconds):print(f'sleep{seconds}seconds')time.sleep(seconds)return'sleepcompleted'在concurent.futures中,ThreadPoolExecutor是两个子类之一的Executor(另外一个是ProcessPoolExecutor),它使用线程池进行异步调用,这里我们将ThreadPoolExecutor()赋值给一个调用的函数执行器变量executor=ThreadPoolExecutor()然后我们使用ThreadPoolExecutor下的submit()函数来创建一个线程。submit()函数中包含要调用的任务,即do_something(),以及函数要调用的参数(即秒内的dosmeting()),这里我们放1,表示休眠一秒,所以写成submit(do_something,1),因为submit()函数返回的值是future类型的对象,所以这里我们将future简写为f,分别给两个变量f1和f2赋值,表示do_something()函数被并发执行了两次。f1=executor.submit(do_something,1)f2=executor.submit(do_something,1)前面提到future对象的作用是帮助主线程(或进程)获取某个线程(进程)或者某个任务的执行状态和返回值,为了给大家演示,这里我分别在f1和f2这两个future对象上调用result()和done()这两个函数,并打印出它们的结果。print(f1.result())print(f2.result())print(f'task1completed:{f1.done()}')print(f'task2completed:{f1.done()}')在future,result()的作用就是告诉你任务进行到哪一步了,有没有异常。如果任务正常完成,没有异常,那么result()会返回自定义函数下return的内容(也就是我们的do_someting()底层return'sleepcomplete'),如果任务执行过程中遇到异常,那么result()会返回异常的具体内容。done()返回一个布尔值,告诉你任务是否完成,如果完成则返回True,否则返回False。接下来看脚本的运行效果:可以看到同步耗时2秒+完成的两个任务被concurrent.futures缩短为1.02秒(这个时间不确定,多次运行脚本会看1.01秒、1.02秒、1.03秒、1.04秒等,这跟当前电脑的性能有关)。注意这里的两个“sleepcompleted”是通过print(f1.result())和print(f2.result())打印出来的,打印的是“task1是否完成:True”和“task2是否完成:True”(f'task1是否完成:{f1.done()}')和print(f'task2iscompleted:{f1.done()}')打印出来。接下来看concurrent.futures的第二个实验代码:fromconcurrent.futuresimportThreadPoolExecutor,as_completedimporttimedefdo_something(seconds):print(f'sleep{seconds}seconds')time.sleep(seconds)return'sleepcompleted'start_time=time。perf_counter()executor=ThreadPoolExecutor()results=[executor.submit(do_something,1)foriinrange(10)]forfinas_completed(results):print(f.result())end_time=time.perf_counter()-start_timeprint(f'total耗时{round(end_time,2)}秒')代码解释(仅涉及concurrent.futures相关的知识点):这里我们从concurrent.futures中引入了一个新的函数as_completed,其作用后面会提到。fromconcurrent.futuresimportThreadPoolExecutor,as_completed第一段代码不灵活,因为我们手动创建了f1和f2线程,如果我们要并发运行do_something()任务100次,我们手动创建f1,f2,f3显然是不可能的...f100这100个变量。这里我们可以通过listcomprehension的形式创建一个list,让do_something()函数并发运行10次。results=[executor.submit(do_something,1)foriinrange(10)]在concurrent.futures中,as_completed(fs)函数是针对给定的future迭代器fs,完成后返回完成的迭代器(type还是future).这里的fs就是我们创建的列表结果。因为concurrent.futures.as_completed(results)返回的值是一个迭代器,我们可以用for循环遍历它,然后在元素(都是future类型)上调用上面提到的result()函数,打印forfinas_completed(results):print(f.result())执行代码看效果,可以看到10个do_something()任务在1.06秒内完成。concurrent.futures的第三个实验代码:fromconcurrent.futuresimportThreadPoolExecutorimporttimedefdo_something(seconds):print(f'sleep{seconds}seconds')time.sleep(seconds)return'sleepcompleted'start_time=time.perf_counter()executor=ThreadPoolExecutor()sec=[5,4,3,2,1]results=executor.map(do_something,sec)forresultinresults:print(result)end_time=time.perf_counter()-start_timeprint(f'总耗时{round(end_time,2)}seconds')代码解释(仅concurrent.futures相关的知识点):除了通过listcomprehension指定N次并发运行do_something(seconds)外,我们还可以通过concurrent.futures.ThreadPoolExecutor()下面的map()函数用于实现目标。map()函数类似于submit()函数。两者都可用于创建线程,然后并发执行任务并返回未来的对象,但它比submit()函数更灵活。它们的区别在于:map()函数传入的第二个参数是一个可遍历对象,这个可遍历对象中的元素可以作为函数的参数。例如这里我们定义了列表sec=[5,4,3,2,1],作为map()函数的第二个参数传入(executor.map(do_something,sec)),因为list一共有5个元素,所以我们创建并并发创建了5个线程来执行do_something(seconds)5次。第一次将列表中的元素5作为参数传递给do_something(seconds),也就是第一个线程执行完后会休眠5秒,第二次将列表中的元素4作为参数传递给do_something(seconds),即第二个线程执行完会休眠4秒,以此类推。executor=ThreadPoolExecutor()sec=[5,4,3,2,1]results=executor.map(do_something,sec)接下来看脚本的运行效果:因为并发执行了5个任务,所以程序消耗了5秒,4秒,3秒,2秒??,最大值1秒,总共耗时5.03秒完成。