什么是生产者消费者模型?这里的模块可能是:函数、线程??、进程等)。生成数据的模块称为生产者,而处理数据的模块称为消费者。生产者和消费者之间的缓冲区称为仓库。生产者负责将货物运送到仓库,消费者负责从仓库中取出货物,这样就构成了生产者-消费者模型。结构图如下:为了便于理解,我们举一个发送信件的例子。假设你要发一封信,一般流程是这样的:你写信——相当于生产者生产数据,你把信放进邮箱——相当于生产者把数据放入缓冲区。邮递员从邮箱中取出信件。做相应的处理——相当于consumer从buffer中取出数据,处理数据producer-consumer模式的优势。解耦假设生产者和消费者分别是两个线程。如果允许生产者直接调用消费者的某个方法,那么生产者就会对消费者产生依赖(即耦合)。如果以后消费者的代码发生变化,可能会影响生产者的代码。而如果两者都依赖于某个buffer,两者之间没有直接依赖,那么耦合度也会相应降低。例如,我们去邮局送信。如果不用邮箱(也就是缓冲区),就得直接把信交给邮递员。有的同学会说,直接把信寄给邮递员不是很简单吗?这其实并不容易。你必须先知道邮递员是谁,然后才能把信递给他。这会在你和postman之间创建依赖关系(相当于生产者和消费者的强耦合)。万一哪天邮递员换了,你还得重新认识(相当于消费者换了导致生产者代码修改)。邮箱比较固定,依赖成本比较低(相当于和buffer弱耦合)。并发性因为生产者和消费者是两个独立的并发体,他们之间通过缓冲区进行通信。生产者只需要将数据扔进缓冲区就可以继续生产下一个数据,而消费者只需要从缓冲区开始取数据就可以了,这样就不会因为彼此的处理速度而阻塞。继续上面的例子,如果我们不用邮箱,就得在邮局等邮递员回来,把信交给他,这期间我们什么也做不了(也就是生产者被封锁)。或者邮递员必须挨家挨户地询问谁在寄信(相当于消费者投票)。支持不均匀忙碌。当生产者快速生产数据时,消费者没有时间处理它。未处理的数据可以暂存在缓冲区中,慢慢处理。不会因为消费者的性能造成数据丢失或影响生产者的生产。我们再举个寄信的例子。假设邮递员一次只能带走1000封信。如果在情人节(或圣诞节)寄贺卡,需要寄1000多封信。这时候,邮箱缓冲区就派上用场了。邮递员把来不及带走的信件暂时存放在信箱里,下次来的时候再带走。通过上面的介绍,大家应该已经了解了生产者消费者模型。Python中的多线程编程在实现生产者消费者模型之前,我们先了解一下Python中的多线程编程。线程是操作系统直接支持的执行单元。高级语言通常都内置了多线程支持,Python也不例外。Python线程是真正的Posix线程,而不是模拟线程。Python的标准库提供了两个模块:_thread和threading。_thread是低级模块,threading是封装了_thread的高级模块。大多数情况下,我们只需要使用高级模块threading。我们先看一段用Python实现多线程的代码。importtime,threading#线程代码getName())foriinrange(6):print('thread%s>>>%s'%(self.getName(),i))time.sleep(1)print('thread%sfinished.'%self.getName())taskthread=TaskThread('TaskThread')taskthread.start()taskthread.join()下面是程序的执行结果:threadTaskThread正在运行...threadTaskThread>>>0threadTaskThread>>>1threadTaskThread>>2threadTaskThread>>>3threadTaskThread>>>4threadTaskThread>>>5threadTaskThreadfinished。TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定了线程的名称,具体的任务是通过重载基类run函数来实现的。在简单了解了Python线程之后,我们来实现一个生产者-消费者模型。fromQueueimportQueueimportrandom,threading,time#生产者类classProducer(threading.Thread):def__init__(self,name,queue):threading.Thread.__init__(self,name=name)self.data=queuedefrun(self):foriinrange(5):print("%siproducing%dtothequeue!"%(self.getName(),i))self.data.put(i)time.sleep(random.randrange(10)/5)print("%sfinished!"%self.getName())#消费者类classConsumer(threading.Thread):def__init__(self,name,queue):threading.Thread.__init__(self,name=name)self.data=queuedefrun(self):foriinrange(5):val=self.data.get()print("%sisconsuming.%dinthequeueisconsumed!"%(self.getName(),val))time.sleep(random.randrange(10))print("%sfinished!"%self.getName())defmain():queue=Queue()producer=Producer('Producer',queue)consumer=Consumer('Consumer',queue)producer.start()consumer.start()producer.join()consumer.join()print'Allthreadsfinished!'if__name__=='__main__':main()执行结果可能如下:Producerisproducing0tothequeue!Consumerisconsuming.0inthequeueisconsumed!Producerisproducing1tothequeue!Producerisproducing2tothequeue!Consumerisconsuming.1inthequeueisconsumed!Consumerisconsuming.2inthequeueisconsumed!Producerisproducing3tothequeue!Producerisproducing4tothequeue!Producerfinished!Consumerisconsuming.3inthequeueisconsumed!Consumerisconsuming.4inthequeueisconsumed!Consumerfinished!Allthreadsfinished!因为多线程是抢占式执行的,所以打印出的运行结果不一定和以上是完全一致的总结本例通过Python实现了一个简单的生产者消费者模型。Python中的Queue模块已经提供了对线程同步的支持,因此本文不涉及锁、同步、死锁等多线程问题。
