在阅读本文之前,你应该已经了解了RabbitMQ的一些概念,比如队列和交换机。死信的概念一般来说,不能正常消费的消息都可以称为死信。我们将其放入死信队列中,单独处理这部分“异常”消息。当一条消息满足以下条件之一时,就称为死信。消息被拒绝,不会放回队列(使用basic.reject/basic.nack方法拒绝消息,这两个方法的参数requeue=false)消息TTL过期,队列达到最大值length应用场景:当消费者无法正常消费消息或消息异常时,为了保证数据不丢失,将异常消息设置为死信,放入死信队列。死信队列中的消息会启动单独的消费者程序进行特殊处理。架构图:下面按照架构图来实现代码。Producer生产者一般只需要做两件事,一是创建链接,二是发送消息。RabbitMQ中涉及的队列、交换机和路由键都需要在代码中创建。这些操作可以由生产者和消费者创建。有关谁创建它们的讨论,请参阅RabbitMq:谁创建队列和交换?本文。在本文中,队列、交换机和路由键是在生产者端实现的。所以生产者需要一共做这些事情。创建连接建立队列(queue,switch,binding)建立死信队列(queue,switch,binding)发布消息创建连接使用streadway/amqp包与RabbitMQ建立连接。funcmain(){mq:=util.NewRabbitMQ()defermq.Close()mqCh:=mq.Channel...}...//util.NewRabbitMQ()funcNewRabbitMQ()*RabbitMQ{conn,err:=amqp.Dial(constant.MqUrl)FailOnError(err,"无法连接到RabbitMQ")ch,err:=conn.Channel()FailOnError(err,"无法打开通道")return&RabbitMQ{Conn:conn,Channel:ch,}}设置队列(queue,switch,binding)核心操作是设置队列阶段。声明一个普通队列,指定一个死信开关,并指定一个死信路由键。后续死信队列创建完成后,会绑定到死信交换机和指定的死信routing-key。varerrerror_,err=mqCh.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{"x-message-ttl":5000,//消息过期时间,毫秒"x-dead-letter-exchange":constant.DeadExchange,//指定死信交换"x-dead-letter-routing-key":constant.DeadRoutingKey,//指定死信routing-key})util.FailOnError(err,"创建法线失败queue")Declareexchangeerr=mqCh.ExchangeDeclare(constant.NormalExchange,amqp.ExchangeDirect,true,false,false,false,nil)util.FailOnError(err,"创建正常exchange失败")目前正常队列和exchange已经创建,但它们都是独立存在的,没有关联。通过QueueBind将queue、routing-key、switch绑定在一起。err=mqCh.QueueBind(constant.NormalQueue,constant.NormalRoutingKey,constant.NormalExchange,false,nil)util.FailOnError(err,"normal:Queue,exchange,routing-keybindingfailed")设置死信队列(queue,exchange,binding)同样的死信队列也需要创建队列,创建exchange,bind。//声明一个死信队列//args为nil。死信队列切记不要设置消息过期时间,否则无效消息进入死信队列后会再次过期。_,err=mqCh.QueueDeclare(constant.DeadQueue,true,false,false,false,nil)util.FailOnError(err,"创建死队列失败")//声明开关err=mqCh.ExchangeDeclare(constant.DeadExchange,amqp.ExchangeDirect,true,false,false,false,nil)util.FailOnError(err,"Failedtocreateadeadqueue")//队列绑定(将队列、routing-key、switch绑定在一起)err=mqCh.QueueBind(constant.DeadQueue,constant.DeadRoutingKey,constant.DeadExchange,false,nil)util.FailOnError(err,"dead:Queue,switch,routing-keybindingfailed")当死信队列建立后,正常队列通过x-dead-letter-exchange和x-dead-letter-routing-key参数的指定将生效,死信队列将与普通队列连接。发布消息message:="msg"+strconv.Itoa(int(time.Now().Unix()))fmt.Println(message)//发布消息err=mqCh.Publish(constant.NormalExchange,constant.NormalRoutingKey,false,false,amqp.Publishing{ContentType:"text/plain",Body:[]byte(message),})util.FailOnError(err,"发布消息失败")Producer完整代码包mainimport("fmt""github.com/streadway/amqp""learn_gin/go/rabbitmq/deadletter/constant""learn_gin/go/rabbitmq/deadletter/util""strconv""time")funcmain(){//#==========1.创建连接==========mq:=util.NewRabbitMQ()defermq.Close()mqCh:=mq.Channel//#==========2.设置队列(queue,switch,binding)==========//声明队列varerrerror_,err=mqCh.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{"x-message-ttl":5000,//消息过期时间,毫秒"x-dead-letter-exchange":constant.DeadExchange,//指定死信交换"x-dead-letter-routing-key”:常量。DeadRoutingKey,//指定死信路由键})util.FailOnError(err,"创建普通队列失败")//声明交换err=mqCh.ExchangeDeclare(constant.NormalExchange,amqp.ExchangeDirect,true,false,false,false,nil)util.FailOnError(err,"创建失败normalexchange")//队列绑定(将队列、路由键和交换机绑定在一起)err=mqCh.QueueBind(constant.NormalQueue,constant.NormalRoutingKey,constant.NormalExchange,false,nil)util.FailOnError(err,"normal:Queue,switch,routing-keybindingfailed")//#==========3.设置死信队列(queue,switch,binding)==========//声明死信队列//args为nil切记不要给死信队列设置消息过期时间,否则无效消息进入死信队列后会再次过期。_,err=mqCh.QueueDeclare(constant.DeadQueue,true,false,false,false,nil)util.FailOnError(err,"创建死队列失败")//声明交换err=mqCh.ExchangeDeclare(constant.DeadExchange,amqp.ExchangeDirect,true,false,false,false,nil)util.FailOnError(err,"Failedtocreateadeadqueue")//队列绑定(将队列、路由键和开关绑定在一起)err=mqCh.QueueBind(constant.DeadQueue,constant.DeadRoutingKey,constant.DeadExchange,false,nil)util.FailOnError(err,"dead:Queue,exchange,routing-key绑定失败")//#==========4.发布消息==========message:="msg"+strconv.Itoa(int(time.Now().Unix()))fmt.Println(message)//发布消息err=mqCh.Publish(constant.NormalExchange,constant.NormalRoutingKey,false,false,amqp.Publishing{ContentType:"text/plain",Body:[]byte(message),})util.FailOnError(err,"消息发布失败")}Consumer由于队列和交换机是由生产者创建的,消费者只需要做两件事,一个是建立连接,一个是消费消息。也是因为这个原因,消费者比生产者晚启动,可以保证消费时队列是存在的。packagemainimport("learn_gin/go/rabbitmq/deadletter/constant""learn_gin/go/rabbitmq/deadletter/util""log")funcmain(){//#==========1.创建Connection==========mq:=util.NewRabbitMQ()defermq.Close()mqCh:=mq.Channel//#==========2.消费消息==========msgsCh,err:=mqCh.Consume(constant.NormalQueue,"",false,false,false,false,nil)util.FailOnError(err,"Failedtoconsumenormalqueue")永远:=make(chanbool)gofunc(){ford:=rangemsgsCh{//要实现的逻辑log.Printf("Receivedmessage:%s",d.Body)//手动响应d.Ack(false)//d.Reject(true)}}()log.Printf("[*]Waitingformessage,ToexitpressCTRL+C")<-forever}死信消费者死信队列和开关也交给生产用户创建后,死信消费者只需要做两件事,建立连接和消费消息。packagemainimport("learn_gin/go/rabbitmq/deadletter/constant""learn_gin/go/rabbitmq/deadletter/util""log")funcmain(){//#==========1.创建Connection==========mq:=util.NewRabbitMQ()defermq.Close()mqCh:=mq.Channel//#==========2.消费死信消息==========msgsCh,err:=mqCh.Consume(constant.DeadQueue,"",false,false,false,false,nil)util.FailOnError(err,"消耗死队列失败")forever:=make(chanbool)gofunc(){ford:=rangemsgsCh{//要实现的逻辑log.Printf("Receivedmessage:%s",d.Body)//手动响应d.Ack(false)//d.Reject(true)}}()log.Printf("[*]Waitingformessage,ToexitpressCTRL+C")<-forever}源代码Mr-houzi/go-demoend!个人博客同步文章Golang实现RabbitMQ的死信队列
