当前位置: 首页 > 后端技术 > Python

Python队列进阶:多生产者&单消费者问题

时间:2023-03-26 16:49:09 Python

producer-consumer模型在并发编程中,比如爬虫,有的线程负责爬取数据,有的线程负责对爬取到的数据进行处理(清洗、排序和排序)贮存)。如果直接交互,那么当两者的速度不匹配时,难免会出现等待的现象,也会造成资源的浪费。抽象是一种非常重要的通用能力,生产者-消费者模型是前人抽象出的一系列同类型具体问题的一致最优解。该模型具有三个重要的角色,容器、生产者和消费者。顾名思义,生产者负责生产数据或任务,消费者负责消费数据或任务(以下统称任务),而容器则是两者沟通的媒介。在这种模型中,生产者和消费者不再直接通信,而是通过引入第三方容器(通常使用阻塞队列)来实现解耦。这样生产者就不用因为消费者的速度太慢而等待,可以直接将任务放入容器中,而消费者也不用因为生产者的生产速度太慢而等待,直接获取从容器中取出任务,从而实现资源利用。最大利用率。使用这种模型可以解决并发编程中的大部分并发问题。简单版本让我们首先编写一个简单版本的生产者-消费者模型,其中包含一个生产者和一个消费者。导入线程导入时间导入队列defconsume(thread_name,q):whileTrue:time.sleep(2)product=q.get()print("%sconsume%s"%(thread_name,product))defproduce(thread_name,q):foriinrange(3):product='product-'+str(i)q.put(product)print("%sproduce%s"%(thread_name,product))time.sleep(1)q=queue.Queue()p=threading.Thread(target=produce,args=("producer",q))c=threading.Thread(target=consume,args=("consumer",q))p.start()c.start()p.join()#输出如下producer生产product-0producer生产product-1consumer消费product-0producer生产product-2consumer消费product-1consumer消费product-2...上面是最简单的producerconsumer模型中,生产者生产三个任务供消费者消费。但是上面的写法有个问题,就是生产者生产完任务后随主线程退出,但是消费者消费完所有任务后还没有停止,一直处于阻塞状态。whileTrue的判断可以改成whilenotq.empty()吗?肯定不行。因为empty()返回False,所以不能保证对get()的后续调用不会阻塞。同时,如果使用empty()函数进行判断,那么需要保证消费者线程启动时生产者至少已经生产了一个任务,否则消费者线程会直接退出程序因为条件不满足;同时,如果生产者的生产速度比较慢。一旦消费者消费完所有任务,下次判断时没有新任务进入队列,消费者线程也会因为不满足条件而直接退出程序。从此,生产者生产的任务将永远不会被消费。那么我们可以做一个约定,当producer生产完task后,打上一个flag,类似q.put(None),consumer一旦收到None的task,就意味着结束,直接退出程序即可。这种做法在上面的程序中是没有问题的。唯一的缺点就是如果有N个消费者线程,需要放置N个None标志,这显然对多消费者程序很不友好。最佳实践我们可以结合队列的内置函数task_done()和join()来实现我们的目标。join()函数正在阻塞。当消费者通过get()从队列中获取任务并完成处理时,需要调用并且只能调用一次task_done()。这个方法会向队列发送一个信号,join()函数正在监听这个信号。可以简单理解为队列内部维护了一个计数器,用来标识未完成任务的数量。每当添加任务时,计数器都会增加,而当调用task_done()时,计数器会减少,直到队列为空。而join()是监听队列是否为空,一旦满足就结束阻塞状态。importthreadingimporttimeimportqueuedefconsume(thread_name,q):whileTrue:time.sleep(2)product=q.get()print("%sconsume%s"%(thread_name,product))q.task_done()def生产(thread_name,q):foriinrange(3):product='product-'+str(i)q.put(product)打印("%sproduce%s"%(thread_name,product))time.sleep(1)q.join()q=queue.Queue()p=threading.Thread(target=produce,args=("producer",q))c=threading.Thread(target=consume,args=("consumer",q))c1=threading.Thread(target=consume,args=("consumer-1",q))c.setDaemon(True)c1.setDaemon(True)p.start()c.start()c1.start()p.join()#输出如下producerproduceproduct-0producerproduceproduct-1consumer-1consumeproduct-0consumerconsumerproduct-1producerproduceproduct-2consumerconsumerproduct-2在上面的例子中,我们设置了consumerthread作为Daemonthread,这样当主线程结束的时候,consumerthread也会结束。那么主线程最后一句p.join()的意思就是主线程必须等待生产者线程结束才能结束。仔细看一下生产者线程的主函数produce(),这里出现了我们上面提到的q.join()函数。而task_done是在消费者线程的main函数中调用的。所以,当生产者线程生产完所有的任务后,就会被阻塞,只有消费者线程处理完所有的任务,生产者才会被阻塞。随着生产者线程的结束,主线程也结束了,守护线程消费者线程也结束了,至此所有线程都安全退出了。队列小结本章介绍了队列的高级应用,从简单的例子到最佳实践,介绍了生产者-消费者模型的基本用法。在这个模型中,队列起到了非常重要的作用,起到了解耦的作用。这个模型有固定的步骤,其中最重要的是通过task_done()和join()相互通信。task_done()只是用来通知队列消费者一个任务已经完成,它并不关心这个任务是什么,它只关心队列中有多少个未完成的任务。注意:task_done()不能在put()之前调用,否则会报ValueError:task_done()calledtoomanytimes。同时这个函数只能在任务处理完后调用一次,否则队列将无法准确计算出未完成任务的数量。最近为初学者整理了数百G的Python学习资料,包括电子书、教程、源码等,免费分享给大家!想上gong~hao“Python编程学习圈”,发“J”免费领取