概述本文通过运行一个Python小程序来模拟一个真实的任务。比较多线程和协程环境中的编程实现。发现并解释一些有趣的现象。为了给大家带来一些对协程的直观感受,加深大家对协程这个新生事物的理解。Python中的协程是一个历史悠久的概念。它是计算机程序的一类组件,用于推广用于协作多任务处理的子程序。其详细的概念和历史可以参考维基百科中的词条:https://en.wikipedia.org/wiki...Python天然支持的生成器(generator)其实就是coroutines的一种实现。生成器允许暂停和恢复执行。但是,由于缺乏更多的语法支持,缺乏使用生成器实现异步编程的成熟模型,使用生成器作为协程参与协作多任务编程受到了限制。然而,现在情况发生了变化。Python从3.6版本开始加入了async/await语法,直接支持协程的异步编程。同时在asyncio库中提供了协程编程的接口和必要的基础实现。社区也在不断努力,提供现有IO库的异步版本,用于协程开发环境。比如httpclient目前提供了一个异步版本,至少可以用于aiohttp和tornado中的协程。我们知道IO操作本质上是异步的。为了适应广泛使用的传统同步编程模型,许多IO库使用阻塞调用者来实现同步。这样虽然简化了编程,但是也带来了并行度低的问题。在一些有大量耗时IO操作的环境中,高层应用不得不忍受串行操作带来的长时间等待,或者转向多进程(Multi-Process)多线程编程来提高并行度.而多进程、多线程编程会引入竞争、通信、同步、保护等棘手的问题。而我们知道,即使是一个轻量级的线程,也会对应一个独立的运行栈。线程的调度和切换必然包括运行栈的切换和加载。如果一个进程中有成百上千个线程,相应的调度开销会急剧上升到难以承受的程度。而线程之间的同步和互锁也会成为噩梦。除了boss级别的死锁问题,其他任何bug或缺陷在多线程环境下都很难重现和跟踪,因为线程的调度是非常随机的。Python小程序下面是一个在Python3.8或更高版本上运行的Python小程序。importthreadingimporttimeimportasynciodefgen():s=0whiles<1000:yieldss+=1defunsafe_thread_worker(g):t=0try:whileTrue:v=next(g)time.sleep(0.01)t+=v除了StopIteration:print(f"{t}")asyncdefwrong_coroutine_worker(g):t=0try:whileTrue:v=next(g)time.sleep(0.01)t+=v除了StopIteration:print(f"{t}")asyncdefstarter_with_wrong_workers():tasks=[]for_inrange(10):task=asyncio.create_task(wrong_coroutine_worker(g))tasks.append(task)awaitasyncio.gather(*tasks)asyncdefright_coroutine_worker(g):t=0try:whileTrue:v=next(g)awaitasyncio.sleep(0.01)t+=v除了StopIteration:print(f"{t}")asyncdefstarter_with_right_workers():tasks=[]为了_在范围(10)内:task=asyncio.create_task(right_coroutine_worker(g))tasks.append(task)awaitasyncio.gather(*tasks)if__name__=='__main__':print('------------------序列--------------')g=gen()started_at=time.monotonic()t=0forving:time.sleep(0.01)t+=vprint(t)total_time=time.monotonic()-started_atprint(f'总消耗时间:{total_time:.2f}秒')print('----------------不安全的线程----------------')g=gen()started_at=time.monotonic()threads=[]for_inrange(10):w=threading.Thread(target=unsafe_thread_worker,args=[g])w.start()threads.append(w)forwinthreads:w.join()total_time=time.monotonic()-started_atprint(f'totaltimeconsumed:{total_time:.2f}seconds')print('----------------Asyncwithwrongcoroutine--------------')g=gen()started_at=time.monotonic()loop=asyncio.get_event_loop()loop.run_until_complete(starter_with_wrong_workers())total_time=time.monotonic()-started_atprint(f'消耗的总时间:{total_time:.2f}秒')print('----------------与右协程异步--------------')g=gen()started_at=time.monotonic()loop=asyncio.get_event_loop()loop.run_until_complete(starter_with_right_workers())total_time=time.monotonic()-started_atprint(f'totaltimeconsumed:{total_time:.2f}seconds')一个典型的运行输出看起来像是这个样子的:----------------Sequence----------------499500totaltimeconsumed:10.53seconds---------------不安全线程----------------49804496095003349682495745000550143500695021950362总消耗时间:1.09秒-----------------与错误的协程异步--------------499500000000000消耗的总时间:10.55seconds----------------异步机智h右协程----------------49500499674997350100499655000049968499634996450100总耗时:1.16秒这个小程序其实模拟了一个常见的真实任务假设我们得到了一个相对通过http数据API逐页处理大型数据集。每页数据通过带有页码或起始位置的URL标识,然后向API服务器发送http请求,并解析返回的http响应中包含的数据。http访问显然是一个耗时的IO操作。返回数据的解析和处理是一个计算密集型操作,其消耗的时间与IO等待相比微不足道。生成器gen可以看作是一个数据页URL生成器,即任务生成器。然后我们用sleep模拟一个耗时的IO操作,用addition模拟数据合并分析。你也可以把这个小程序想象成一个网络爬虫。我们将所有目标网站的地址保存在一个全局列表中,然后串行或并行访问所有目标,检索我们感兴趣的数据并将它们存储在一起进行分析。总共有1000个相对独立的小任务。由于任务之间没有依赖关系,多个任务可以并行执行。每个任务分为接收和指定任务、获取数据(这是一个耗时0.01秒的IO操作)、返回数据存储和处理。在一个任务中,每一步都有很强的依赖性,不能并行执行。下面我们来看一下main函数,其代码分为4段,分别对应4种不同的实现方式。方法一是最传统的串行方法。通过一个简单的循环,一个一个的获取并完成任务,一个任务完成后接收下一个任务。不出所料,由于IO操作是主要的耗时操作,因此串行执行时间等于每个任务所花费时间的总和,0.01*1000=10秒。方法二采用多线程,模拟一个有10个线程的线程池,池中每个线程和方法一一样独立工作,由于所有线程都是并行运行,所以总耗时几乎是串行方法的1/10。方法三使用协程,也是模拟一个协程池,有10个协程。但是由于使用了错误的IO操作,实际上并不能并行执行多个协程,总的耗时和方法1差不多,后面我们会详细分析对比。方法四修正了方法三的错误,使得协程可以并行运行,其总耗时与方法二相当。现在看输出结果。方法一输出499500,即0到999的一千个数之和(sum(range(1000))==499500)。方法2的输出比较乱,10个数并没有分别输出到10行。这是因为标准输出是线程间共享的资源,而print()方法不是线程安全的。从输出来看,当打印出一个数字,再打印一个换行符时,线程很可能会切换,导致输出混乱。实际上,分发任务的生成器gen并不是线程安全的。如果它的调用比较耗时,并且线程之间发生竞争,则会抛出ValueError:generatoralreadyexecution。只是在这个例子中,gen调用速度很快,不太可能遇到争用。如果我们在yield之前加上一行打印语句,那么无论打印什么,都会立即看到上面的异常,读者可以自行测试。方法三的输出很明显只有第一个协程在做工作,它完成了所有的任务,完全串行运行。当其他协程最终能够运行时,就没有剩余任务了。这是因为IO语句time.sleep()是一个同步IO操作,不会导致当前协程挂起让其他协程获得运行的机会,相反,同步IO直接阻塞(Block)当前线程,使得当前线程中的所有协程都无法获得运行的机会。方法4只是将同步IO换成异步asyncio.sleep()(也是用协程实现的),解决了协程并行的问题。让我们关注代码中与协程相关的部分,以更深入地了解协程本身。首先,代码中asyncdef定义的所有函数都是协程函数,是一个语法糖,其本质是生成器函数。和生成器函数一样,直接调用函数不会导致其代码运行,而是生成一个协程实例对象(即生成器实例)。要运行代码,您必须使用与生成器相同的方法,调用实例的send(msg)或next()(next等同于send(None))方法。我们知道generator通过yield退出并挂起,等待下一次send或者nextcall激活,继续运行。asyncdef定义的协程中的return或await可以理解为yield。return返回的值将作为本次send或下一次调用的返回值,生成一条消息放入消息循环队列(事件队列)中。当消息被消息循环(eventloop)处理后,会激活另一个等待(await)消息的协程。之后,返回协程会将控制权交还给消息循环,不再有关于返回协程的消息添加到消息循环队列中,因此协程实例将不再有运行的机会,将被系统回收在适当的时候。await表示async-wait,后面必须调用协程方法,即生成一个协程实例(见前面的描述)。这实际上是在消息队列中放入一条等待协程的启动消息,然后通过yield将控制权交还给消息循环,并声明将使用等待协程的结束消息来重新激活协程的等待。还有一点值得注意:消息队列和消息循环是协程调度的核心。它们属于一个线程。一个线程不能有多个消息循环。协程的进入和退出是由消息循环驱动的,而传统的方法调用是在运行栈的顶部添加一个调用帧。由于堆栈的性质,传统的方法调用只能同步运行。协程适用于异步场景。协程的控制权不会被剥夺,必须由当前运行的协程交还给消息循环。方法是通过等待其他协程或者运行后返回。建议在协程中,如果需要做耗时计算,最好在进程中放一些awaitasyncio.sleep(0)。这会立即将控制权返回给消息循环,sleep会将重新激活的消息放在消息队列的末尾。从而给其他协程一个运行的机会,然后返回激活这个协程继续运行。比如方法三,如果time.sleep()被认为是一个计算密集型的操作,在其前后添加awaitasyncio.sleep(0)会导致其他协程加入工作,虽然在本例中不会保存运行时。一旦协程运行了一个阻塞的操作,整个线程就会被阻塞,线程中的所有其他协程都没有机会运行。因为此时消息循环无法获得运行的机会。这表明使用以协程方式工作的异步IO库的重要性。由于一个线程中的多个协程是轮流调度的,不会有多个协程同时运行,因此协程之间的通信和共享不会造成争用,协程相关的代码不需要考虑同步或保护。这样,在很大程度上可以获得类似多线程、多进程并行的性能提升,同时也可以避免很多并行带来的问题。协程的应用,本质上是为了避免空闲的CPU在等待IO时造成的资源和时间的浪费。如果没有耗时的IO操作,CPU忙于计算,那么使用协程是不会带来性能提升的。相反,由于消息循环的开销,性能会略有下降。协程没有专门的运行栈,其调度切换比线程轻很多,因此可以大规模部署和运行协程。但这并不意味着没有限制。我们知道协程其实就是一个对象实例,大规模无限制的部署可能会耗尽内存等资源。像本例中的协程池这样的概念可以解决这个问题。或者可以使用asyncio中提供的同步原语(SynchronizationPrimitives)来限制协程生成和激活的次数。
