当前位置: 首页 > 后端技术 > Python

用Go写一个简单的消息队列(二):客户端处理和通道设计

时间:2023-03-26 01:09:40 Python

上一篇我们定义了消息体和基本工具。在这篇文章中,我们开始研究客户端的处理功能和通道的基本设计。客户端处理函数这里所谓的客户端就是指消费者,处理函数就是处理消费者与我们服务之间的tcp连接。我们定义一个结构体Client,其中包含连接和状态字段,然后编写读写状态和tcp连接的相关函数。client.gopackageserverimport("encoding/binary""io""log")typeClientstruct{connio.ReadWriteClosernamestringstateint}funcNewClient(connio.ReadWriteCloser,namestring)*Client{return&Client{conn,名称,-1}}func(c*Client)String()string{returnc.name}func(c*Client)GetState()int{returnc.state}func(c*Client)SetState(stateint){c.state=state}func(c*Client)Read(data[]byte)(int,error){returnc.conn.Read(data)}func(c*Client)Write(data[]byte)(int,error){varerrerrorerr=binary.Write(c.conn,binary.BigEndian,int32(len(data)))iferr!=nil{返回0,err}n,err:=c.conn.Write(data)iferr!=nil{return0,err}returnn+4,nil}func(c*Client)Close(){log.Printf("CLIENT(%s):closing",c.String())复制代码c.conn.Close()}这里的总编辑比较简单,唯一值得一提的是写方法。在向消费者写入消息之前,我们先将消息体的长度写入连接,固定为4字节,让客户端先读取长度,再根据长度读取消息。channel从上一篇文章我们可以知道,channel是我们消息队列中最核心的数据结构之一,所以它的设计尤为重要。维护消费者信息首先,因为我们的消费者是从通道中读取消息的,所以通道需要维护消费者信息,可以增加消费者,也可以删除消费者。因此,我们先在channel结构体中维护一个consumer数组和两个channel,用于接收增删consumer的消息:]Consumer}func(c*Channel)AddClient(clientConsumer){log.Printf("Channel(%s):addingclient...",c.name)doneChan:=make(chaninterface{})c.addClientChan<-util.ChanReq{变量:客户端,RetChan:doneChan,}<-doneChan}func(c*Channel)RemoveClient(客户端消费者){log.Printf(“Channel(%s):删除客户端...”,c.name)doneChan:=make(chaninterface{})c.removeClientChan<-util.ChanReq{Variable:client,RetChan:doneChan,}<-doneChan}值得注意的是这里我们并没有直接绑定上面的Client结构,而是抽象出一个Consumer接口。这具有反转依赖关系和避免包循环引用的优点。现在我们有了接收消息的管道,我们需要一个goroutine驻留在后台处理这些消息,可以称之为事件处理循环,它是for+select的组合://RouterhandlestheeventsofChannelfunc(c*Channel)Router(){varclientRequtil.ChanReqfor{select{caseclientReq=<-c.addClientChan:client:=clientReq.Variable.(Consumer)c.clients=append(c.clients,client)日志。Printf("CHANNEL(%s)添加了客户端%#v",c.name,client)clientReq.RetChan<-struct{}{}caseclientReq=<-c.removeClientChan:client:=clientReq.Variable.(Consumer)indexToRemove:=-1fork,v:=rangec.clients{ifv==client{indexToRemove=kbreak}}ifindexToRemove==-1{log.Printf("错误:找不到客户端(%#v)inclients(%#v)",client,c.clients)}else{c.clients=append(c.clients[:indexToRemove],c.clients[indexToRemove+1:]...)log.Printf("CHANNEL(%s)删除了客户端%#v",c.name,client)}clientReq.RetChan<-struct{}{}}}}发送和接收消息对于发送和接收消息,这里我们使用三个通道来实现:后面会添加持久化到磁盘的功能)incomingMessageChan:用于接收生产者的消息clientMessageChan:消息会发送到这个通道,然后消费者拉取代码如下:typeChannelstruct{...incomingMessageChanchan*MessagemsgChanchan*MessageclientMessageChanchan*Message}func(c*Channel)PutMessage(msg*Message){c.incomingMessageChan<-msg}func(c*Channel)PullMessage()*Message{返回<-c.clientMessageChan}func(c*Channel)Router(){varclientRequtil.ChanReqgoc.MessagePump()for{select{...casemsg:=<-c.incomingMessageChan://防止msgChan缓冲区满时阻塞,增加一个默认分支直接丢弃消息select{casec.msgChan<-msg:log.Printf("CHANNEL(%s)wrotemessage",c.name)default:}}}}//MessagePump发送消息到ClientMessageChanfunc(c*Channel)MessagePump(){varmsg*Messagefor{select{casemsg=<-c.msgChan:}c.clientMessageChan<-msg}}Close当通道关闭时,我们需要做一些清理工作。首先,我们添加一个接收关闭信号的通道,当MessagePump协程和消费者连接接收到信号时关闭发送消息的通道,代码如下:typeChannelstruct{...exitChanchanutil.ChanReq}func(c*Channel)Router(){var(...closeChan=make(chanstruct{}))goc.MessagePump(closeChan)for{select{...casecloseReq:=<-c.exitChan:log.Printf("CHANNEL(%s)isclosing",c.name)close(closeChan)for_,consumer:=rangec.clients{consumer.Close()}closeReq.RetChan<-nil}}}//MessagePump发信息给ClientMessageChanfunc(c*Channel)MessagePump(closeChanchanstruct{}){varmsg*Messagefor{select{...case<-closeChan:return}...}}func(c*Channel)Close()错误{errChan:=make(chaninterface{})c.exitChan<-util.ChanReq{RetChan:errChan,}err,_:=(<-errChan).(error)returnerr}我们在事件处理循环中初始化一个管道,并将其作为参数传递给MessagePump协程。当收到关闭信号时,关闭管道,然后依次关闭消费者连接。关闭逻辑结束。频道完整代码如下:channel.go项目地址:https://github.com/yhao1206/SMQ相关阅读:用Go写一个简单的消息队列(一):定义消息和基本工具写一个简单的消息queuewithGo(三):添加完成确认和重入队列函数用Go写一个简单的消息队列(四):主题设计用Go写一个简单的消息队列(五):协议和后台队列实现用Go写一个简单的消息队列(6):服务器实现

最新推荐
猜你喜欢