当前位置: 首页 > 后端技术 > Python

Python学习笔记-第十八章使用asyncio包处理并发

时间:2023-03-25 22:21:42 Python

第十八章使用asyncio包处理并发第十八章使用asyncio包处理并发主线程和子线程守护线程关键组件callbackasyncio和gevent关系说明asyncio和Flaskaiohttpvs.单进程和多进程主线程和子线程当一个进程启动时,默认会生成一个主线程,因为线程是程序执行流程的最小单位。当设置了多线程时,主线程会创建多个子线程。在python中,默认情况下(实际上是setDaemon(False)),主线程执行完自己的任务后就会退出。这时子线程会继续执行自己的任务,直到自己的任务结束。示例:importtimeimportthreadingdefrun():time.sleep(2)print('当前线程名称是:%s'%threading.currentThread().name)time.sleep(2)if__name__=="__main__":start_time=time.time()print('这是主线程:%s'%threading.current_thread().name)thread_list=[]foriinrange(5):t=threading.Thread(target=run)线程列表。append(t)fortinthread_list:t.start()print('主线程结束:%s'%threading.current_thread().name)print('总共享时间:%f'%float(time.time()-start_time))运行结果:Thisisthemainthread:MainThread结束主线程:MainThread总时间:0.002953当前线程名称为:Thread-2当前线程名称为:Thread-1当前线程名称为:Thread-5当前线程名称为:Thread-3当前线程名称为:Thread-4守护线程当我们使用setDaemon(True)方法将子线程设置为守护线程时,一旦主线程执行结束,所有线程都将终止。即子线程的任务还没有执行完就被强制停止。示例:importtimeimportthreadingdefrun():time.sleep(2)print('当前线程名称是:%s'%threading.currentThread().name)time.sleep(2)if__name__=="__main__":start_time=time.time()print('这是主线程:%s'%threading.current_thread().name)thread_list=[]foriinrange(5):t=threading.Thread(target=run)线程列表。append(t)fortinthread_list:t.setDaemon(True)t.start()print('主线程结束:%s'%threading.current_thread().name)print('总运行时间:%f'%float(time.time()-start_time))运行结果:Thisisthemainthread:MainThread主线程结束:MainThreadsharedtime:0.002954threadsynchronizationjoin所做的工作是线程同步,即main之后thread任务结束,进入阻塞状态,等待其他子线程执行完毕,主线程终止例子:importtimeimportthreadingdefrun():time.sleep(2)print('当前线程的名字是:%s'%threading.currentThread().name)time.sleep(2)if__name__=="__main__":start_time=time.time()print('这是主线程:%s'%threading.current_thread().name)thread_list=[]foriinrange(5):t=threading.Thread(target=run)thread_list.append(t)fortinthread_list:t.start()t.join()print('主线程结束:%s'%threading.current_thread().name)print('总运行时间:%f'%float(time.time()-start_time))运行结果:这是主线程:MainThread当前线程名称为:Thread-1当前线程名称为:Thread-2当前线程名称为:Thread-3当前线程名称为:Thread-4当前线程名称为:Thread-5主线程结束:MainThread共享时间:20.015406asyncio简介asyncio的编程模型是一个消息循环,异步的关键组件而non-blockingcoroutines表明asyncio使用了和之前python完全不同的用法结构:eventloop,coroutineandfuturesasyncio的一些关键字的解释:event_loop事件循环:程序开启一个无限循环,注册一些函数到事件循环,当事件发生时调用对应的协程FunctioncoroutineCoroutine:协程对象,指用async关键字定义的函数,其调用不会立即执行该函数,而是返回一个协程对象。协程对象需要注册到事件循环中,并被事件循环调用。task任务:协程对象是一个可以被挂起的native函数,task是对协程的进一步封装,包含了task的各种状态。future:表示将来执行或不执行的任务的结果。它和任务没有本质区别。async/await关键字:python3.5用于定义协程关键字,async定义协程,await用于挂起阻塞的异步调用接口。示例:importasyncioimportdatetime#Writing1#@asyncio.coroutine#defhello():#print('helloworld')#r=yieldfromasyncio.sleep(1)#print('helloagain')#Writing2asyncdef你好():打印('你好世界')等待异步。睡眠(1)打印('再次问候')如果__name__==“__main__”:loop=asyncio。get_event_loop()任务=循环。create_task(hello())print(datetime.datetime.now())#print(task)loop.run_until_complete(task)#print(task)print(datetime.datetime.now())loop.close()协程对象不能直接运行,在注册后的事件循环中,run_until_complete方法实际上是将协程包装成一个任务对象。所谓任务对象就是Future类的子类。保存协程运行后的状态,用于以后获取协程的结果。当通过loop.create_task(hello())时,任务实际上是pending状态。在hello中,通过asyncio.sleep(1)需要一秒钟。任务执行后状态变为done。asyncio.ensure_future(coroutine)和loop.create_task(coroutine)都可以创建任务,run_until_complete的参数是一个future对象。当传入协程时,会自动封装成任务。Task是Future的子类我们来看一个例子:importasyncioimportrandomasyncdefMyCoroutine(id):process_time=random.randint(1,5)#使用asyncio.sleep模拟一些耗时操作awaitasyncio.sleep(process_time)print("Coroutine:{},finishedexecution.Timespent:{}seconds".format(id,process_time))asyncdefmain():#ensure_future方法接收protocolProcess或future作为参数,作用是调度它们的执行timetasks=[asyncio.ensure_future(MyCoroutine(i))foriinrange(10)]#返回结果awaitasyncio.gather(*tasks)#eventlooploop=asyncio.get_event_loop()try:loop.run_until_complete(main())finally:loop.close()运行结果:coroutine:7,执行完成。耗时:1秒协程:9,执行完成。花费时间:1秒协程:5,执行完成。花费时间:2秒协程:0,执行完成。耗时:4秒协程:6,执行完成。花费时间:4秒协程:2,执行完成。耗时:4秒协程:3,执行完成。花费时间:4秒协程:8个,已完成。花费时间:4秒协程:4,执行完成。耗时:4秒协程:1,执行完成。耗时:4秒从输出结果可以看出两点:1.协程没有按顺序返回结果;2.批量运行任务所用时间与所有任务中时间最长的相同。回调importasyncioimportrequestsasyncdefrequest():url='https://www.baidu.com'status=requests.get(url)returnstatusdefcallback(task):print('Status:',task.result())coroutine=request()task=asyncio.ensure_future(coroutine)task.add_done_callback(callback)print('Task:',task)loop=asyncio.get_event_loop()loop.run_until_complete(task)print('Task:',任务)你也可以不使用回调:importasyncioimportrequestsasyncdefrequest():url='https://www.baidu.com'status=requests.get(url)returnstatuscoroutine=request()task=asyncio.ensure_future(coroutine)print('Task:',task)loop=asyncio.get_event_loop()loop.run_until_complete(task)print('Task:',task)print('TaskResult:',task.result())asyncio和gevent的关系gevent第一个是三方库,通过greenlets实现协程的基本思路是:当一个greenlet遇到IO操作时,自动切换到其他greenlets,等待IO操作完成,一个然后在适当的时候切换回去继续执行。asyncio是Python3.4引入的标准库。它内置了对异步IO的支持,无需第三方支持。asyncio的编程模型是一个消息循环。我们直接从asyncio模块中获取EventLoop的引用,然后将需要执行的协程丢到EventLoop中执行,从而实现了异步IO。这两个库都可以用于很多异步io操作,但是在不同的场景下它们的效率和易用性可能会有所不同。当然,这需要深入的测试和研究,普通场景下差别不大。asyncio和Flask首先使用flask写一个web服务器fromflaskimportFlaskimporttimeapp=Flask(__name__)@app.route('/')defindex():time.sleep(3)return'Hello!'if__name__=='__main__':app.run(threaded=True)这里run()方法增加了一个参数threaded,表示Flask开启了多线程模式,否则默认只有一个线程。如果不开启多线程模式,当同时遇到多个请求时,只能顺序处理,所以即使我们使用协程异步请求服务,也只能一个一个排队等待,瓶颈就会出现在服务器端。所以开启多线程模式是很有必要的。程序运行后会打开一个web,默认端口为5000,使用asyncio模块进行测试。importasyncioimportrequestsimporttimestart=time.time()asyncdefrequest():url='http://127.0.0.1:5000'print('Waitingfor',url)response=requests.get(url)print('获取responsefrom',url,'Result:',response.text)tasks=[asyncio.ensure_future(request())for_inrange(5)]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))end=time.time()print('Costtime:',end-start)运行结果:等待http://127.0.0.1:5000从http://127.0.0.1:5000得到响应结果:您好!Waitingforhttp://127.0.0.1:5000从http://127.0.0.1:5000获取响应结果:Hello!Waitingforhttp://127.0.0.1:5000从http://127.0.0.1:5000获取响应结果:Hello!Waitingforhttp://127.0.0.1:5000从http://127.0.0.1:5000获取响应结果:Hello!Waitingforhttp://127.0.0.1:5000从http://127.0.0.1:5000得到responseResult:Hello!Costtime:15.0814049243927可以发现和普通请求没什么区别,还是顺序执行,耗时15秒,通常平均一个请求需要3秒,那异步处理呢?其实要实现异步处理,首先要有挂起的操作。当一个任务需要等待IO结果时,它可以暂停当前任务并执行其他任务,这样我们就可以充分利用资源。上面的方法都是严重串行下去的,连个hang都没有,怎么可能实现异步呢?实现异步,我们来看看await的用法。使用await可以暂停耗时的等待操作,放弃控制权。当执行协程遇到await时,时间循环会将协程挂起,转而执行其他协程,直到其他协程被挂起或执行完毕。修改后代码:importasyncioimportrequestsimporttimestart=time.time()#这里增加异常get#否则会报错:TypeError:objectResponsecan'tbeusedin'await'expressionasyncdefget(url):returnrequests.get(url)asyncdefrequest():url='http://127.0.0.1:5000'print('Waitingfor',url)response=awaitget(url)#修改这里增加awaitprint('Getresponsefrom',url,'Result:',response.text)tasks=[asyncio.ensure_future(request())for_inrange(5)]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))end=time.time()print('Costtime:',end-start)运行结果:Waitingforhttp://127.0.0.1:5000从http://127.0.0.1:5000得到响应结果:Hello!Waitingforhttp:///127.0.0.1:5000Getresponsefromhttp://127.0.0.1:5000Result:Hello!Waitingforhttp://127.0.0.1:5000Getresponsefromhttp://127.0.0.1:5000结果:Hello!Waitingforhttp://127.0.0.1:5000从http://127.0.0.1:5000得到响应结果:你好!正在等待http://127.0.0.1:5000得到响应sefromhttp://127.0.0.1:5000结果:你好!花费时间:15.083374977111816还是不行,不是异步执行的,也就是说我们把涉及IO操作的代码封装到async修饰的方法中是不可行的!我们必须使用支持异步操作的请求方法才能实现真正的异步,所以这里就需要aiohttp派上用场了。aiohttp首先执行以下命令安装此模块:pipinstallaiohttp示例:importasyncioimportaiohttpimporttimestart=time.time()asyncdefget(url):session=aiohttp.ClientSession()response=awaitsession.get(url)result=awaitresponse.text()session.close()returnresultasyncdefrequest():url='http://127.0.0.1:5000'print('等待',url)result=awaitget(url)print('从',url,'结果:',结果)任务=[asyncio.ensure_future(request())for_inrange(5)]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))end=time.time()print('Costtime:',end-start)运行结果:xxCosttime:3.056924819946289我们发现这个请求的耗时从15秒变成了3秒,并且时间——消耗直接变成了原来的1/5。这就是异步操作的便利之处。当遇到阻塞操作时,任务被挂起,程序再去执行其他任务,而不是傻傻地等待,这样就可以充分利用CPU时间,而不是浪费时间等待IO。在这里,任务数量设置为100,结果发现是3秒。花费时间:3.4409260749816895单进程和多进程对比,单进程代码:importrequestimporttimestart=time.time()defrequest():url='http://127.0.0.1:5000'print('等待',url)result=requests.get(url).textprint('从',url,'Result:',result)for_inrange(100):request()end=time.time()print('Costtime:',end-start)运行结果:Costtime:301.17162680625916Multiprocessingversion:importrequestsimporttimeimportmultiprocessingstart=time.time()defrequest(_):url='http://127.0.0.1:5000'print('Waitingfor',url)result=requests.get(url).textprint('Getresponsefrom',url,'Result:',result)if__name__=="__main__":#必须添加这一行在windows下执行,否则会报错.time()print('花费时间:',end-start)运行结果:Costtime:48.85933017730713如果想查看CPU数量,也可以使用:importpsutilpsutil.cpu_count()