当前位置: 首页 > 后端技术 > Python

运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真协程异步消费任务调度实践

时间:2023-03-25 22:45:50 Python

我们一直信奉这样一句话:协程是比多线程更高效的并发工作方式,它是完全由程序自身控制,即在用户态执行,协程避免了线程切换那样的上下文切换,性能有了很大的提升。毫无疑问,这是业界牢不可破的共识,也是放之四海而皆准的真理。但实际上,协程远比大多数人想象的要复杂。因为协程的“用户态”特性,任务调度掌握在编写协程任务的人手中,仅仅依靠async和await关键字是遥不可及的。如果达不到“调度”的程度,有时会拖累任务效率,使其在任务执行效率上不如“系统状态”的多线程、多进程。本次我们将讨论Python3原生协程任务管理的调度。Python3.10协程库async.io的基本操作事件循环(Eventloop)是原生协程库asyncio的核心,可以理解为统帅。Eventloop实例提供注册、取消和执行任务和回调的方法。Eventloop可以给事件循环绑定一些异步方法,事件循环会循环执行这些方法,但是和多线程一样,同一时间只能执行一个方法,因为协程也是单线程的。当一个方法执行时,如果遇到阻塞,事件循环会暂停它的执行去执行其他方法,同时为这个方法注册一个回调事件。当一个方法从阻塞中恢复时,下一轮被轮询时会继续执行,或者,当没有被轮询时,它会提前从阻塞中恢复,也可以通过回调事件来切换,等等,这就是事件循环的简单逻辑。而上面的核心动作就是切换其他方法,怎么切换呢?使用await关键字:importasyncioasyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')asyncdefjob2():print('job2starts')asyncdefmain():awaitjob1()awaitjob2()if__name__=='__main__':asyncio.run(main())系统返回:job1startsjob1endsjob2starts是的,如果切了就切了吧?其实这两个协程任务并没有实现“协同”,因为它们是同步执行的,所以在方法中不等待,可以实现协程的工作方式。我们需要同时启动这两个协程任务。:importasyncioasyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')asyncdefjob2():print('job2starts')asyncdefmain():#awaitjob1()#awaitjob2()awaitasyncio.gather(job1(),job2())if__name__=='__main__':asyncio.run(main())系统返回:job1startsjob2startsjob1endsifthere没有asyncio.gather的参与,协程方法就是一个普通的同步方法,即使用async声明async也于事无补。asyncio.gather的基本功能是并发执行协程任务,从而实现“协作”。但其实Python3.10也支持“同步写入”的协程方法:asyncdefcreate_task():task1=asyncio.create_task(job1())task2=asyncio.create_task(job2())awaittask1awaittask2这里我们通过asyncio.create\_task封装job1和job2,然后通过await调用返回的对象,这样两个独立的异步方法绑定到同一个Eventloop,所以虽然写的是同步的,但实际上是异步执行的:importasyncioasyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')asyncdefjob2():print('job2starts')asyncdefcreate_task():task1=asyncio.create_task(job1())task2=asyncio.create_task(job2())awaittask1awaittask2asyncdefmain():#awaitjob1()#awaitjob2()awaitasyncio.gather(job1(),job2())如果__name__=='__main__':asyncio.run(create_task())系统返回:job1startsjob2startsjob1ends协程任务上下游监控解决问题m并发执行。现在假设每个异步任务都会返回一个操作结果:asyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')return"job1taskresult"asyncdefjob2():print('job2started')return"job2taskresult"通过asyncio.gather方法,我们可以收集任务执行结果:asyncdefmain():res=awaitasyncio.gather(job1(),job2())print(res)同时执行任务:importasyncioasyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')return"job1taskresult"asyncdefjob2():print('job2started')return"job2taskresult"asyncdefmain():res=awaitasyncio.gather(job1(),job2())print(res)if__name__=='__main__':asyncio.run(main())系统返回:job1startsjob2startsjob1ends['job1','job2']但任务结果只是方法的返回值,除此之外没有其他有价值的信息,执行细节协程任务保密现在我们换成asyncio.wait方法:asyncdefmain():res=awaitasyncio.wait([job1(),job2()])print(res)仍然并发执行:importasyncioasyncdefjob1():print('job1start')awaitasyncio.sleep(1)print('job1end')return"job1taskresult"asyncdefjob2():print('job2start')return"job2taskresult"asyncdefmain():res=awaitasyncio.wait([job1(),job2()])print(res)if__name__=='__main__':asyncio.run(main())系统返回:job1开始job2开始job1结束({result='job1taskresult'>,result='job2taskresult'>},set())可以看到asyncio.wait返回任务对象,它存储了大部分任务信息,包括执行状态。默认情况下,asyncio.wait会等待所有任务完成(return\_when='ALL\_COMPLETED'),同时也支持return\_when='FIRST\_COMPLETED'(第一个协程完成时返回)和return\_when='FIRST\_EXCEPTION'(返回第一个异常)。这是非常令人兴奋的,因为如果异步消费任务是发送短信等需要统计到达率的任务,利用asyncio.wait特性,我们可以在第一时间记录任务完成或异常的具体时间地方。协程任务守卫假定由于某种原因,我们手动终止任务消费:结果“asyncdefjob2():print('job2started')return“job2taskresult”asyncdefmain():task1=asyncio.create_task(job1())task2=asyncio.create_task(job2())task1.cancel()res=awaitasyncio.gather(task1,task2)print(res)if__name__=='__main__':asyncio.run(main())系统错误:文件“/Users/liuyue/Downloads/upload/test/test_async.py",line23,inmainres=awaitasyncio.gather(task1,task2)asyncio.exceptions.CancelledError这里手动取消了job1,但是会影响job2的执行,违反了“相互支持”的特性协程。其实asyncio.gather方法可以捕获协程任务的异常:importasyncioasyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')return"job1task结果"asyncdefjob2():print('job2started')return"job2taskresult"asyncdefmain():task1=asyncio.create_task(job1())task2=asyncio.create_task(job2())task1.cancel()res=awaitasyncio.gather(task1,task2,return_exceptions=True)print(res)if__name__=='__main__':asyncio.run(main())系统返回:job2started[CancelledError(''),'job2taskResult']可以看到job1没有执行,异常替换了taskresult作为返回值。但是如果启动了协程任务,需要保证任务不会被取消。这时候可以使用asyncio.shield方法守护协程任务:importasyncioasyncdefjob1():print('job1starts')awaitasyncio。sleep(1)print('job1end')return"job1taskresult"asyncdefjob2():print('job2start')return"job2taskresult"asyncdefmain():task1=asyncio.shield(job1())task2=asyncio.create_task(job2())res=awaitasyncio.gather(task1,task2,return_exceptions=True)task1.cancel()print(res)if__name__=='__main__':asyncio.run(main())系统返回:job1startsjob2startsjob1ends['job1taskresult','job2taskresult']协程任务回调假设协程任务执行完成后,需要立即执行回调操作,比如推送任务结果到其他接口服务上:importasyncioasyncdefjob1():print('job1starts')awaitasyncio.sleep(1)print('job1ends')return"job1taskresult"asyncdefjob2():print('job2starts')return"job2taskresult"defcallback(future):print(f'callbacktask:{future.result()}')asyncdefmain():任务1=异步。屏蔽(作业1())任务2=异步。创建_任务(job2())任务1。add_done_callback(回调)res=等待异步。gather(task1,task2,return_exceptions=True)print(res)if__name__=='__main__':asyncio.run(main())这里我们通过add\_done\_callback方法指定job1的回调方法。任务执行时会调用回调,系统返回:job1startsjob2startsjob1ends回调任务:job1taskresult['job1taskresult','job2taskresult']同时,add\_done\_callback方法不仅可以获取协程任务的返回值,还支持参数传参:importasynciofromfunctoolsimportpartialasyncdefjob1():print('job1started')awaitasyncio.sleep(1)print('job1ended')return"job1taskresult"asyncdefjob2():print('job2started')return"job2taskresult"defcallback(future,num):print(f"回调参数{num}")print(f'callbacktask:{future.result()}')asyncdefmain():task1=asyncio.shield(job1())task2=asyncio.create_task(job2())task1.add_done_callback(partial(callback,num=1))res=awaitasyncio.gather(task1,task2,return_exceptions=True)print(res)if__name__=='__main__':asyncio.run(main())系统返回:job1启动job2启动job1结束回调参数1Callbacktask:job1taskresult['job1taskresult','job2taskresult']结论成功是用户态,失败也是用户态。它比多线程系统级调度更复杂。一不小心,就会造成业务中的“同步”阻塞,弄巧成拙,适得其反。这也解释了为什么类似场景下多线程的出镜率远高于协程,因为多线程不需要考虑启动后的“切换”问题,什么都不做简单粗暴。