在阅读本文之前,你应该已经了解了RabbitMQ的一些概念,比如队列和交换机。延迟队列介绍队列中的消息在延迟一段时间后被消费者消费。这样的队列可以称为延迟队列。延迟队列的应用场景非常广泛,比如:下单后30分钟内未付款取消订单;定时发送通知等。通过死信实现延迟队列通过在Golang中引入RabbitMQ的死信队列,我们??可以很方便的实现一个延迟队列。取消普通队列的消费者;发送消息时设置TTL;通过以上两点,普通队列的消息永远不会被消费,而是等待消息TTL过期,进入死信队列,让死信消费者消费。从而达到延迟队列的效果。上面的看起来没什么问题,但是实际测试过后,你会发现消息并不会“如期而亡”。当先产生一条TTL为60s的消息,再产生一条TTL为5s的消息时,第二条消息在5s后不会过期,进入死信队列。消息一起进入死信队列。这是因为RabbitMQ只判断队列中的第一条消息是否过期。通过插件实现延迟队列架构针对以上问题,自然有解决方案,通过RabbitMQ的rabbitmq_delayed_message_exchange插件解决。RabbitMQ及插件的安装本文不做赘述,可参考本文安装或使用Docker安装。这个插件的原理是将消息暂时存放在交换机的mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息过期再投递到队列中。简单了解一下插件的原理后,我们就可以这样设计延迟队列了。实现producer的实现要点:1.在声明switch的时候,不再是direct类型,而是x-delayed-message类型,这是插件提供的类型;2.交换机要增加“x-delayed-type”:“direct”参数设置;3、发布消息时,在Headers中设置x-delay参数,控制消息从switch发出的过期时间;err=mqCh.Publish(constant.Exchange1,constant.RoutingKey1,false,false,amqp.Publishing{ContentType:"text/plain",Body:[]byte(message),//过期:"10000",//消息过期时间(消息级别),毫秒Headers:map[string]interface{}{"x-delay":"5000",//消息从交易所过期,单位毫秒(由x-dead-message插件提供)},})producer的完整代码://producter.gopackagemainimport("fmt""github.com/streadway/amqp""learn_gin/go/rabbitmq/delayletter/constant""learn_gin/go/rabbitmq/util""strconv""time")funcmain(){//#==========1.创建连接==========mq:=util.NewRabbitMQ()defermq。Close()mqCh:=mq.Channel//#==========2.设置队列(Queue,switch,binding)===========//声明队列变量错误错误_,err=mqCh.QueueDeclare(constant.Queue1,true,false,false,false,amqp.Table{//"x-message-ttl":60000,//消息过期时间(队列级别),毫秒})util.FailOnError(err,"创建队列失败")//声明交换//err=mqCh.ExchangeDeclare(Exchange1,amqp.ExchangeDirect,true,false,false,false,nil)err=mqCh.ExchangeDeclare(constant.Exchange1,"x-delayed-message",true,false,false,false,amqp.Table{"x-delayed-type":"direct",})util.FailOnError(err,"创建交换失败")//队列绑定(将队列、路由键和交换机绑定在一起)err=mqCh.QueueBind(constant.Queue1,constant.RoutingKey1,constant.Exchange1,false,nil)util.FailOnError(err,"queue,switch,routing-key绑定失败")//#===========4.发布消息===========message:="msg"+strconv.Itoa(int(time.Now().Unix()))fmt.Println(message)//发布消息err=mqCh.Publish(constant.Exchange1,constant.RoutingKey1,false,false,amqp.Publishing{ContentType:"text/plain",Body:[]byte(message),//Expiration:"10000",//消息过期时间(消息级别),毫秒Headers:map[string]interface{}{"x-delay":"5000",//来自交易所的消息过期时间,单位毫秒(由x-dead-message插件提供)},})util.FailOnError(err,"发布消息失败")}由于在生产者端建立了队列和开关,所以消费者不需要特别设置,直接附上代码即可完整的消费者代码://consumer.gopackagemainimport("learn_gin/go/rabbitmq/delayletter/constant""learn_gin/go/rabbitmq/util""log")funcmain(){//#==========1.创建连接==========mq:=util.NewRabbitMQ()defermq.Close()mqCh:=mq.Channel//#==========2.消费消息==========msgsCh,err:=mqCh.Consume(constant.Queue1,"",false,false,false,false,nil)util.FailOnError(err,"消费队列失败")forever:=make(chanbool)gofunc(){ford:=rangemsgsCh{//要实现的逻辑log.Printf("receivedmessage:%s",d.Body)//手动回答d.Ack(false)//d.Reject(true)}}()log.Printf("[*]等待消息,要退出按CTRL+C")<-永远}结束!源代码猴子先生/go-demo
