生产者消费者模型介绍1.为什么要使用生产者消费者模型Producer是指生产数据的任务,consumer是指消费数据的任务。当生产者的生产能力远大于消费者的消费能力时,生产者需要等待消费者消费完毕,才能继续生产新的数据。同理,如果消费者的消费能力远大于生产者的生产能力,那么消费者会不断产生新的数据。需要等待生产者生产完数据才能继续消费。这种等待会导致效率低下。为了解决这个问题,引入了生产者消费者模型。2.如何实现生产者消费者模型。生产者消费者模型可以通过在进程之间引入队列来实现。使用队列,不需要考虑锁的概念,因为进程之间的通信是通过队列实现的;生产者产生的数据被发送到队列。write,consumer消费数据直接从队列中取出,实现了生产者和消费者的解耦。Producer-->Queue<--ConsumerQueue实现生产者-消费者模型1,consumer-producer模型代码frommultiprocessingimportProcess,Queueimporttime#consumermethoddefconsumer(q,name):whileTrue:res=q.get()#如果res为None:breakprint("%sate%s"%(name,res))#producer方法defproducer(q,name,food):foriinrange(3):time.sleep(1)#模拟西瓜生产的时延res="%s%s"%(food,i)print("%sproduced%s"%(name,res))#puttheproducedvegetableEnterthequeueq.put(res)if__name__=="__main__":#创建队列q=Queue()#创建生产者p1=Process(target=producer,args=(q,"kelly","watermelon"))c1=Process(target=consumer,args=(q,"peter",))p1.start()c1.start()#p1.join()#q.put(None)print("主进程")2.执行结果2.1直接执行上面代码的结果会出现一个问题,生产者已经生产完了,还没有给消费者发出停止信号,所以消费者会一直阻塞在q.get()中,导致程序无法退出。为了解决上述问题,让消费者在消费完生产者的数据后自动退出,需要在引入生产者进程的时候,在队列中放入一个结束信号,消费者收到这个信号后就会退出消费进程。主要修改了两个地方。打开下面代码的注释,实现消费者在收到生产者的结束信号后,会退出消费进程。defconsumer():ifresisNone:breakif__name__=="__main__":p1.join()q.put(None)2.2开启注释后,消费者收到生产者发送的消息结束信号,即可退出程序正常。但是如果有n个消费者,就需要发送n个结束信号。这个方法就没那么简洁了,像下面的代码:frommultiprocessingimportProcess,Queueimporttime#Consumermethoddefconsumer(q,name):whileTrue:res=q.get()ifresisNone:breakprint("%sate%s"%(name,res))#生产者方法defproducer(q,name,food):foriinrange(3):time.sleep(1)#模拟西瓜生产的延时res="%s%s"%(food,i)print("%sproduced%s"%(name,res))#Puttheproduction将蔬菜放入队列q.put(res)if__name__=="__main__":#创建队列q=Queue()#创建生产者p1=Process(target=producer,args=(q,"kelly","Watermelon"))p2=Process(target=producer,args=(q,"kelly2","banana"))c1=Process(target=consumer,args=(q,"peter",))c2=Process(target=consumer,args=(q,"peter2",))c3=Process(target=consumer,args=(q,"peter3",))p1.start()p2.start()c1.start()c2.start()c3.start()p1.join()p2.join()q.put(None)q.put(None)q.put(None)print("mainprocess")其实我们现在就是producers了产生了,我想向队列发送一个结束信号。python语言提供了另外一个队列JoinableQueue([maxsize])来解决这个问题。JoinableQueue实现了生产者-消费者模型1.JoinableQueue方法介绍JoinableQueue([maxsize]):一种同时支持join()和task_done()方法的队列类型q.task_done():消费者使用该方法发送信号,表示q.get()返回已经处理过的项目。q.join():生产者调用该方法阻塞,直到处理完队列中的所有项;阻塞将继续,直到为队列中的每个项目调用q.task_done()方法。2、JoinableQueue实现生产者消费者模型源码frommultiprocessingimportProcess,JoinableQueueimporttime#consumermethoddefconsumer(q,name):whileTrue:res=q.get()ifresisNone:breakprint("%seat%s"%(name,res))q.task_done()#向q.join()发送一个信号,表示已经从队列中取出一个值并进行处理#Producer方法defproducer(q,name,food):foriinrange(3):time.sleep(1)#模拟西瓜生产的时间延迟res="%s%s"%(food,i)print("%sproduced%s"%(name,res))#将生产的蔬菜放入队列q.put(res)q.join()#等待消费者获取到他放入队列的所有元素后结束if__name__=="__main__":#q=Queue()q=JoinableQueue()#创建生产者p1=Process(target=producer,args=(q,"kelly","watermelon"))p2=Process(target=producer,args=(q,"kelly2","blueberry"))#创建消费者c1=Process(target=consumer,args=(q,"peter",))c2=Process(target=consumer,args=(q,"peter2",))c3=Process(target=consumer,args=(q,"peter3",))c1.守护进程=Truec2。守护进程=Truec3。daemon=Truep_l=[p1,p2,c1,c2,c3]forpinp_l:p.start()p1.join()p2.join()#1.主进程等待p1,p2进程结束在继续执行之前#2.由于q.join()的存在,生产者只能等待队列中的元素被消费后才会结束#3.当生产者结束时,它会表示消费者已经消费完毕,可以结束了,所以可以将消费者设置为守护进程(随着主进程的退出而退出)print("主进程")3.可以看到运行结果从运行结果来看,production作者并没有手动给consumer发送结束信号,而是通过JoinableQueue队列实现了producer-consumer模型,仅供学习,入侵删除。学习Python的路上肯定会遇到困难,不要慌张,我这里有一套学习资料,包括40+电子书,800+教学视频,涉及Python基础、爬虫、框架、数据分析、机学习等等,别怕你学不会!https://shimo.im/docs/JWCghr8...《Python学习资料》关注公众号【蟒圈】,每日优质文章推送。
