当前位置: 首页 > 科技观察

生产者消费者模型和Golang实现

时间:2023-03-18 12:30:29 科技观察

本文介绍了生产者消费者模型和go实现的简单demo。1.生产者消费者模型生产者消费者模型:某个模块(函数等)负责产生数据,由另一个模块处理(这里的模块是广义的,可以是类,函数,也可以是协程,线程、过程等)。产生数据的模块称为生产者;处理数据的模块称为消费者。简单地将生产者和消费者抽象出来,不足以成为生产者消费者模型。这种模式还需要生产者和消费者之间有一个缓冲作为中介。生产者将数据放入缓冲区,消费者从缓冲区中取出数据。大致结构如下图所示。图片假设你要发快递,一般流程如下:1.封好快递——相当于厂家的生产资料。2.将快递送到快递中心——相当于生产者将数据放入缓冲区。3.邮递员发送的快递是从快递中心取出来的——相当于消费者从缓冲区中取出数据。这样,拥有缓冲区有以下好处:解耦:减少消费者和生产者之间的耦合。有了快递中心,就不用直接把快递交给邮递员了。寄快递的人对邮递员没有任何依赖。如果某天邮递员有变动,不会影响寄快递的人。假设生产者和消费者是两个类。如果允许生产者直接调用消费者的某个方法,那么生产者就会对消费者产生依赖(即耦合)。如果以后消费者的代码发生变化,可能会直接影响到生产者。而如果两者都依赖于某个缓冲区,并且两者之间没有直接依赖,那么耦合度就会相应降低。并发:生产者和消费者的数量不相等,仍能保持正常通信。由于函数调用是同步的(或者叫阻塞的),生产者不得不在那里等到消费者的方法没有返回。万一消费者处理数据慢了,生产者就只能等着浪费时间了。使用生产者消费者模式后,生产者和消费者可以是两个独立的并发主体。一旦生产者将制造的数据扔进缓冲区,它就可以生产下一个数据。基本上不需要依赖消费者的处理速度。人们只是把快递扔在快递中心,并不在意。缓存:生产者和消费者速度不匹配,暂存数据。如果发快递的人想一次发多个快递,那么邮递员发不了,就可以发其他快递,快递暂时存放在快递中心。即生产者在短时间内生产数据的速度太快,消费者来不及消费,可以将未处理的数据暂时存放在缓冲区中。2、Go语言实现单向通道最典型的应用就是“生产者消费者模型”。通道分为缓冲通道和非缓冲通道。传递通道中的参数时,它们作为引用传递。1.Unbufferedchannel示例代码1实现如下,data)out<-data//将数据写入缓冲区}close(out)//写入后关闭管道}funcconsumer(in<-chanint){//同时读取管道//for{//val,ok:=<-in//ifok{//fmt.Println("Consumergotdata:",data)//}else{//fmt.Println("Nodata")//break//}//}//不需要同步机制,先做后做//如果没有数据,会阻塞等待像隐式类型转换,双向管道到单向管道的转换ch:=make(chanint)//unbufferedchannelgoproducer(ch)//subgoprocessasproducerconsumer(ch)//maingoprocessasconsumer}这里使用无缓冲的channel,producer产生一次数据放入channel,然后consumer从中读取数据这个频道。如果没有数据,就只能等待,也就是阻塞,直到通道关闭。所以宏就是生产者和消费者同步执行。另外:这里是开辟一个go进程执行producer,maingo进程执行consumer。如果使用新的go进程执行consumer,需要在main函数中阻塞go进程,否则不会等待consumer和producer执行完毕,maingoroutine退出,程序直接结束,如示例代码3所示,producer每次生产,consumer只能获取一次数据,buffer作用不大。结果如下:2.缓冲通道示例代码2如下:packagemainimport"fmt"funcproducer(outchan<-int){fori:=0;i<10;i++{data:=i*ifmt.Println("producer生产数据:",data)out<-data//将数据写入缓冲区}close(out)//写入后关闭管道}funcconsumer(in<-chanint){//不需要同步机制,先做再做//没有数据阻塞等fordata:=rangein{fmt.Println("Consumergetsdata:",data)}}funcmain(){//传参时显式类型类似于隐式类型转换,将双向流水线转换为单向流水线ch:=make(chanint,5)//addbuffer,5goproducer(ch)//subgoprocessasproducerconsumer(ch)//maingoprocessasconsumer}有bufferchannel,只修改ch:=make(chanint,5)//加入buffer语句,只要buffer没有满,producer可以继续往bufferchannel中放入数据,只要缓冲区不为空,消费者就可以继续读取来自通道的数据。有异步和并发的特点。结果如下:这里图中的终端生产者之所以不断打印大于缓冲区容量的数据,是因为终端打印是系统调用,存在延时。IO操作过程中,producer同时向pipeline写入,请求打印,pipelineWriteread与终端输出打印速度不匹配。3、实际应用在实际应用中,同一个公共区域被同时访问,同时进行不同的操作。两者都可以分为生产者-消费者模型,比如订单系统。许多用户的订单下单后,会被放入缓冲区或队列中,然后系统从缓冲区中读取,进行真正的处理。系统不需要开辟多个线程来处理多个订单,减轻了系统并发的负担。通过生产者-消费者模型,订单系统与仓库管理系统隔离,用户可以随时下单(生产数据)。如果订单系统直接调用仓库系统,那么用户点击按钮下单后,要等到仓库系统返回结果。这将非常缓慢。即:用户成为生产者,加工订单管理系统成为消费者。代码示例三如下packagemainimport("fmt""time")//模拟订单对象typeOrderInfostruct{idint}//生产订单--生产者funcproducerOrder(outchan<-OrderInfo){//业务生成订单fori:=0;i<10;i++{order:=OrderInfo{id:i+1}fmt.Println("生成订单,订单ID为:",order.id)out<-order//写入通道}//如果没有关闭,消费reader会一直阻塞,等待读取close(out)//订单生成,关闭通道}//处理订单--consumerfuncconsumerOrder(in<-chanOrderInfo){//从通道,并处理fororder:=rangein{fmt.Println("读取订单,订单ID为:",order.id)}}funcmain(){ch:=make(chanOrderInfo,5)goproducerOrder(ch)goconsumerOrder(ch)time.Sleep(time.Second*2)}这里的逻辑和上面类似,不同的是使用了一个OrderInfo结构体来模拟订单作为业务处理对象。主线程用time.Sleep(time.Second*2)阻塞,否则程序立即停止。结果如下: