什么是生产者消费者模型?这里的模块可能是:函数、线程??、进程等)。生成数据的模块称为生产者,而处理数据的模块称为消费者。生产者和消费者之间的缓冲区称为仓库。生产者负责将货物运送到仓库,消费者负责从仓库中取出货物,这样就构成了生产者-消费者模型。结构图如下:为了便于理解,我们举一个发送信件的例子。假设你要寄一封信,一般流程如下: 1。你写信——相当于生产者生产数据 2。你把信放入邮箱——相当于生产者把数据放入缓冲区 3。邮递员从邮箱中取出信件并进行相应处理——相当于消费者从缓冲区中取出数据并处理数据。生产者-消费者模型的优点是解耦。假设生产者和消费者是两个线程。如果允许生产者直接调用消费者的某个方法,那么生产者就会对消费者产生依赖(即耦合)。如果以后消费者的代码发生变化,可能会影响生产者的代码。而如果两者都依赖于某个buffer,两者之间没有直接依赖,那么耦合度也会相应降低。例如,我们去邮局送信。如果不用邮箱(也就是缓冲区),就得直接把信交给邮递员。有的同学会说,直接给邮递员不是很简单吗?其实不简单,你要先知道邮递员是谁,才能把信投递给他。这会在你和postman之间创建依赖关系(相当于生产者和消费者的强耦合)。万一哪天邮递员换了,还得重新认识(相当于换消费者代码,修改生产者代码)。邮箱比较固定,依赖成本比较低(相当于和buffer弱耦合)。并发性因为生产者和消费者是两个独立的并发体,他们之间通过缓冲区进行通信。生产者只需要将数据扔进缓冲区就可以继续生产下一个数据,而消费者只需要从缓冲区开始取数据就可以了,这样就不会因为彼此的处理速度而阻塞。继续上面的例子,如果我们不用邮箱,就得在邮局等邮递员回来,把信交给他,这期间我们什么也做不了(也就是,生产者被封锁)。或者邮递员必须挨家挨户地询问谁在寄信(相当于消费者投票)。支持不均匀忙碌。当生产者快速生产数据时,消费者没有时间处理它。未处理的数据可以暂存在缓冲区中,慢慢处理。不会因为消费者的性能造成数据丢失或影响生产者的生产。我们再举个寄信的例子。假设邮递员一次只能带走1000封信。如果在情人节(或圣诞节)寄贺卡,需要寄1000多封信。这时候,邮箱缓冲区就派上用场了。邮递员把来不及带走的信件暂时存放在信箱里,下次来的时候再带走。通过上面的介绍,大家应该已经了解了生产者消费者模型。Python中的多线程编程在实现生产者消费者模型之前,我们先了解一下Python中的多线程编程。线程是操作系统直接支持的执行单元。高级语言通常都内置了多线程支持,Python也不例外。Python线程是真正的Posix线程,而不是模拟线程。Python的标准库提供了两个模块:_thread和threading。_thread是低级模块,threading是封装了_thread的高级模块。大多数情况下,我们只需要使用高级模块threading。我们先看一段用Python实现多线程的代码。importtime,threading#threadcodeclassTaskThread(threading.Thread):def__init__(self,name):threading.Thread.__init__(self,name=name)defrun(self):print('线程%s正在运行。..'%self.getName())foriinrange(6):print('thread%s>>>%s'%(self.getName(),i))time.sleep(1)print('thread%sfinished.'%self.getName())taskthread=TaskThread('TaskThread')taskthread.start()taskthread.join()下面是程序的执行结果:threadTaskThreadisrunning...threadTaskThread>>>0threadTaskThread>>>1threadTaskThread>>>2threadTaskThread>>>3threadTaskThread>>>4threadTaskThread>>>5threadTaskThread完成。TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定了线程的名称,具体的任务是通过重载基类run函数来实现的。简单了解了Python的线程之后,我们来实现一个生产者消费者模型。fromQueueimportQueueimportrandom,threading,time#生产者类classProducer(threading.Thread):def__init__(self,name,queue):threading.Thread.__init__(self,name=name)self.data=queuedefrun(self):foriinrange(5):print("%sisproducing%dtothequeue!"%(self.getName(),i))self.data.put(i)time.sleep(random.randrange(10)/5)print("%sfinished!"%self.getName())#消费者类Consumer(threading.Thread):def__init__(self,name,queue):threading.Thread.__init__(self,name=name)self.data=queuedefrun(self):foriinrange(5):val=self.data.get()print("%sisconsuming.%dinthequeueisconsumed!"%(self.getName(),val))time.sleep(random.randrange(10))print("%s完成了!"%self.getName())defmain():queue=Queue()producer=生产者('生产者',队列)消费者=消费者('消费者',队列)生产者.start()consumer.start()producer.join()consumer.join()print'Allthreadsfinished!'if__name__=='__main__':main()执行结果可能如下:Producerisproducing0tothe队列!消费者在消费。队列中0被消耗!生产者正在生产1到队列中!生产者正在生产2到队列中!消费者在消费。队列中的1已被消耗!消费者在消费。队列中的2个被消耗了!正在向队列中生产3个!生产者正在向队列中生产4个!生产者完成了!消费者正在消费。队列中的3个被消费了!Consumer正在消费。队列中的4被消费了!消费完成!所有线程都完成了!由于多线程是抢占式执行的,所以打印出来的运行结果可能和上面不完全一样。小结本例通过Python实现了一个简单的生产者消费者模型。Python中的Queue模块已经提供了对线程同步的支持,因此本文不涉及锁、同步、死锁等多线程问题。
