昨天的直播中,有同学问,如果使用asyncio+httpx实现并发请求,如何限制请求频率?如何限制最大请求数同时发送x个请求如何?我们今天给出两种选择。提问假设我们同时发起12个请求,每个请求耗时不同,那么总的请求时间和耗时最长的请求差不多。我们先写个例子来测试一下:importasyncioimporthttpximporttimeasyncdefreq(delay):print(f'请求一个延迟{delay}秒的接口')asyncwithhttpx.AsyncClient(timeout=20)asclient:resp=awaitclient.get(f'http://127.0.0.1:8000/sleep/{delay}')result=resp.json()print(result)asyncdefmain():start=time.time()delay_list=[3,6,1,8,2,4,5,2,7,3,9,8]task_list=[]fordelayindelay_list:task=asyncio.create_task(req(delay))task_list.append(task)awaitasyncio.gather(*task_list)end=time。time()print(f'totaltime-consumed:{end-start}')asyncio.run(main())这段代码,使用for循环创建了12个协程任务,这些任务几乎同时运行,所以request总完成所有接口耗时如下图所示:现在的问题是因为网站有反爬虫机制,最多同时发起3个请求。那么如何保证最多3个协程同时请求网络呢?第一种限制协程任务数的方案和前面限制多线程线程数的方案是一样的。我们创建一个列表,保证列表中最多有3个任务,然后继续循环检查。如果发现一个任务已经完成,我们就移除已完成的任务并添加一个新任务,直到要爬的列表为空。这个任务列表也是空的。代码如下:importasyncioimporthttpximporttimeasyncdefreq(delay):print(f'requestaninterfacewithadelayof{delay}seconds')asyncwithhttpx.AsyncClient(timeout=20)asclient:resp=awaitclient.get(f'http://127.0.0.1:8000/sleep/{delay}')result=resp.json()print(result)asyncdefmain():start=time.time()delay_list=[3,6,1,8,2,4,5,2,7,3,9,8]task_list=[]whileTrue:ifnotdelay_listandnottask_list:breakwhilelen(task_list)<3:ifdelay_list:delay=delay_list.pop()task=asyncio.create_task(req(delay))task_list.append(task)else:breaktask_list=[taskfortaskintask_listifnottask.done()]awaitasyncio.sleep(1)end=time.time()print(f'总耗时:{end-start}')asyncio.run(main())运行效果如下图所示:总共耗时约28秒。这比串行所需的58秒快一半,但比所有并发快两倍。使用Semaphoreaasyncio实际上自带了一个限制协程数量的类,叫做Semaphore。我们只需要初始化它,传入允许的最大协程数,然后它就可以被上下文管理器使用。我们看一下代码:importasyncioimporthttpximporttimeasyncdefreq(delay,sem):print(f'requestaninterfacewithdelay{delay}seconds')asyncwithsem:asyncwithhttpx.AsyncClient(timeout=20)asclient:resp=awaitclient.get(f'http://127.0.0.1:8000/sleep/{delay}')result=resp.json()print(result)asyncdefmain():start=time.time()delay_list=[3,6,1,8,2,4,5,2,7,3,9,8]task_list=[]sem=asyncio.Semaphore(3)fordelayindelay_list:task=asyncio.create_task(req(delay,sem))task_list.append(任务)awaitasyncio.gather(*task_list)end=time.time()print(f'总耗时:{end-start}')asyncio.run(main())运行效果如下图所示:时间-耗时22秒,比第一个A程序快。我们来看看Semaphore的用法。它的格式是:sem=asyncio.Semaphore(同时运行的协程数)asyncdeffunc(sem):asyncwithsem:这里是并发执行的代码task_list=[]for_inrange(需要执行的任务总数):task=asyncio.create_task(func(sem))task_list.append(task)awaitasyncio.gather(*task_list)当我们想限制一个协程的并发数时,我们可以在调用协程Semaphore对象之前初始化一个协程。然后把这个对象传给需要限制并发的协程。在协程中,使用一个异步上下文管理器来包装你的正式代码:asyncwithsem:formalcode这样,如果并发数没有达到限制,那么asyncwithsem就会瞬间执行,进入正式代码里面。如果并发数达到限制,其他协程将被阻塞在sem中async,直到一个正在运行的协程完成退出,并释放一个新的协程来替代已经完成的协程。这种写法其实和多线程加锁很像。只不过锁是为了保证同一时刻只有一个线程在运行,而Semaphore可以人为指定同一时刻可以运行多少个协程。如何限制1分钟内可以运行的协程数可能同学看了上面的例子,只知道如何限制同时运行的协程数。但是如何限制一段时间内同时运行的协程数量呢?其实很简单,在并发协程中加上asyncio.sleep即可。比如上面的例子,我想限制每分钟只有3个协程,那么我可以把代码改成:asyncdefreq(delay,sem):print(f'requestaninterfacewithadelayof{delay}seconds')asyncwithsem:asyncwithhttpx.AsyncClient(timeout=20)asclient:resp=awaitclient.get(f'http://127.0.0.1:8000/sleep/{delay}')result=resp.json()print(result)awaitasyncio。sleep(60)总结如果想限制协程的并发数,最简单的方法就是使用asyncio.Semaphore。但需要注意的是,它只能在启动协程之前进行初始化,然后传递给协程。确保所有并发协程都获得相同的信号量对象。当然,你的程序中可能有很多不同的部分,有的部分限制并发数为a,有的部分限制并发数为b。然后你可以初始化多个Semaphore对象并将它们传递给不同的协程。本文转载自微信公众号“闻所未闻的密码”,可通过以下二维码关注。转载本文请联系Code公众号。
