前言首先,企业中最常用的其实既不是RocketMQ也不是Kafka,而是RabbitMQ。RocketMQ很强大,但主要是阿里开源的一个消息队列,用来推广自己的云产品。事实上,使用RocketMQ的中小企业并没有想象中那么多。深层次的原因在于,土宝在中小企业普及的时间较早,经过的考验时间较长。很容易产生“回头客”。与RabbitMQ一起成长起来的人才,如今大多已经成为企业的中坚力量。支持RabbitMQ的可能性更高。至于Kafka,主要用于大数据和日志采集。除了一些有特殊需求的公司,对消息收发准确率要求高的公司,仍然将RabbitMQ作为企业级消息队列的首选。自己工作这么多年的感受是RabbitMQ是经久不衰的。除非未来其他消息中间件有不同的体验,否则RabbitMQ仍将拥有更高的市场份额。所以,对于准备进入软件行业的朋友,我建议还是先系统的学习一下RabbitMQ,然后再去学习其他的消息中间件,开阔眼界。它们的原理相似,可以类比理解。两个概念RabbitMQ避免消息丢失的方法主要是利用消息确认机制和手动签到机制,所以有必要弄清楚这两个概念。1、消息确认机制,主要是生产者用来确认消息是否被消费成功。配置如下:spring:rabbitmq:address:192.168.x.x:xxxxvirtual-host:/username:guestpassword:guestconnection-timeout:5000publisher-confirms:true#消息确认成功publisher-returns:true#消息失败confirmedtemplate:mandatory:true#手动签到机制这样当你实现了RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback这两个接口的方法时,就可以有针对性的进行消息确认的日志记录,进而进行进一步的消息发送补偿有目的地实现接近100%的交付。伪代码如下:@Component@Slf4jpublicclassRabbitMQSenderimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{/***发送消息*/publicvoidsendOrder(Orderorder){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(这个);//发送消息rabbitTemplate.convertAndSend(xx,xx,order,xx);}/***接收成功后的回调*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Strings){//如果接收成功,这里可以更新日志表的消息收发状态。//....}/***失败后回调*/@OverridepublicvoidreturnedMessage(Messagemessage,inti,Strings,Strings1,Strings2){//如果失败,这里可以更新日志table更新消息发送和接收状态,然后通过任务调度补偿发送。//....}}2。消息登录机制RabbitMQ消息是自动签名的。您可以理解为快递员已签收。那么快递员的状态就会从已发送变成签收。唯一不同的是,快递公司会记录物流轨迹,签收后MQ从队列中删除。在企业级开发中,RabbitMQ我们基本开启了手动登录的方式,可以有效避免消息的丢失。在上一篇文章中,生产者已经开启了手动登录机制,那么作为消费者,也必须设置手动登录。配置如下:spring:rabbitmq:address:192.168.x.x:xxxxvirtual-host:/username:guestpassword:guestconnection-timeout:5000listener:simple:concurrency:5#并发数max-concurrency:10#最大concurrentnumberacknowledge-mode:manual#启用手动登录prefetch:1#限制一次只能消费一个(一个线程),上面配置5,即一次能收到5个消费监听时,你一行代码即可手动登录。伪代码如下:@RabbitListener(xxx)publicvoidonOrderMessage(@PayloadOrderorder,Channelchannel,@Header(AmqpHeaders.DELIVERY_TAG)longtag)throwsException{//....//手动签名通道。basicAck(标签,假);}Messagelost这两个概念弄清楚后,就可以了解消息丢失的问题和解决方法了。一、原因消息丢失的原因有以下三种:1)消息发送后,中途网络故障,服务器没有收到;2)消息发送后,服务端收到,但没有持久化,服务端宕机;3)、消息发送后,服务端收到,但消费者还没有处理业务逻辑,服务就挂了,消息自动签收,表示什么都没做。这三种情况,(1)和(2)是生产者没有开启消息确认机制造成的,(3)是消费者没有开启手动签到机制造成的。2、方案1),当生产者发送消息时,必须try...catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中。日志表必须有消息发送状态。如果发送失败,定时任务会周期性扫描重发并更新状态;2)、生产者发布者必须加入确认回调机制,确认消息发送成功并签名。如果进入失败回调方法,会修改数据库消息状态,等待定时任务Resend;3).消费者需要开启手动ACK机制。只有消费成功才会移除消息。若失败或异常未处理,则重新入队。其实这也是我在解释这两个概念的时候已经提到的。也是目前接近100%消息传递的企业级解决方案之一。主要目的是解决消息丢失的问题。消息重复1.消息重复的原因一般可能出现两种情况:1)消息消费成功,交易已提交,收到消息时服务器宕机或网络原因导致登录失败,消息的状态将从unack变为ready。重新发送给其他消费者;2)消息消费失败,由于retry重试机制,重新入队发送消息。2、解决方法网上一般可以找到三种方法:1)消费者的业务接口是幂等的;2)消息日志表保存了MQ发送时的唯一消息ID,消费者可以使用这个唯一ID。避免消息重复的判断;3)消费者的Message对象有getRedelivered()方法,返回Boolean,TRUE表示重复发送。这里只推荐第一个。业务方法是幂等的,这是最直接有效的方法。(2)还需要和数据库进行交互。(3)可能导致第一次消费失败,第二次消费成功被拒绝。砍掉。消息积压1.原因消息积压一般有两种场景:1)消费者服务挂了,无法消费消息;2)消费者的服务节点太少,导致消费能力不足。积压,这种情况极有可能是生产者流量过大造成的。2、方案1),由于消费能力不足,扩大更多的消费节点,提升消费能力;2)、建立专门的队列消费服务,批量取消息并持久化,然后慢慢消费。(1)是最直接的方式,也是消息积压最常用的解决方案。但部分企业考虑到服务器成本压力,会选择(2)绕道而行。首先,存储要通过独立服务消费的消息,比如保存到数据库中,然后慢慢处理这些消息。这里单独说说我在工作中使用RabbitMQ的一些心得,希望能有所帮助。1)在消息丢失、消息重复、消息积压这三个问题中,其实主要解决的是消息丢失,因为大部分公司不会遇到消息积压的场景,稍微有资质的公司核心业务会解决幂等性问题,所以几乎没有消息重复的可能性;2)、最常见的企业级消息丢失解决方案之一就是定时任务补偿,因为不管是SOA还是微服务架构,都要有分布式任务调度。成为MQ最直接的补偿方式。如果MQ一定要做到100%交付,这是最常见的方案。但是我其实不建议中小企业使用这个方案,因为维护成本凭空增加,而且没有必要做一定规模的项目。每个人都低估了RabbitMQ本身的性能。8核16G服务器集群上线3年无压力;3)不要迷信网上和培训机构讲解的producer消息确认机制,即前面两个概念中提到的ConfirmCallback和ReturnCallback。MQ性能非常低。我们团队曾经遇到过一个情况,在流量高峰期,MQ传输消费性能大幅下降。后来发现是消息确认机制导致的。关闭后立即恢复正常。机制,MQ运行的很流畅。同时我们会建立后台管理实现人工补偿,通过识别业务状态来判断消费者是否处理了业务逻辑。毕竟这种情况很少见,性能和运维成本,这方面我们选择性能;4)、我工作这些年没见过用RabbitMQ后自动签到的方法,必须手动签到;5)、手动登录方式网上看到的教程几乎都是处理完业务逻辑再手动登录,但其实这种用法并不科学,在分布式架构中,对于MQ来说是很常见的用于解耦和转发。如果是支付业务,往往在回调通知中通过MQ转发给其他服务。如果其他服务的业务处理不成功,则手动签名,如果不执行,则消息会被排队发送给其他消费者,在流量高峰阶段可能会因为意外的业务处理失败而造成拥塞,甚至三者题目中提到的问题同时出现,会得不偿失。不科学的用法:处理完业务逻辑后手动签收,否则你不会签收,就像顾客进店你必须买东西,否则你不让离开。@RabbitListener(xxx)publicvoidonOrderMessage(@PayloadOrderorder,Channelchannel,@Header(AmqpHeaders.DELIVERY_TAG)longtag)throwsException{//处理业务doBusiness(order);//手动签名channel.basicAck(tag,false);}科学用法:不管业务逻辑是否处理成功,最后都要对消息进行人工签名。MQ的使命不是保证顾客进店必须消费,不消费不准离开,而是顾客可以进来,哪怕是看一下也是它的使命完成。@RabbitListener(xxx)publicvoidonOrderMessage(@PayloadOrderorder,Channelchannel,@Header(AmqpHeaders.DELIVERY_TAG)longtag)throwsException{try{//处理业务doBusiness(order);}catch(Exceptionex){//记录日志,通过后台管理或其他方式手动处理失败的业务。}finally{//手动签名channel.basicAck(tag,false);}}可能有人会问你这跟自动登录有什么区别吗,NO,你要知道,如果你自动登录,消息可能会丢失,甚至你的日志什么都不会记录。另外,为什么一定要这样做,因为MQ是一个中间件,本身就是一个辅助工具,它是一个滴滴司机。保证送到你手上,顺便说声再见。无需下车为您搬运东西。如果对MQ施加过大的压力,只会造成自身业务的畸形。我们使用MQ的目的是解耦和转发,不再做多余的事情,保证MQ本身是顺畅的,职责单一的。小结本文主要讲一下RabbitMQ的三个常见问题及解决方法。同时分享笔者在工作中的一些心得体会。我觉得网上很难找到。如果哪天用到,不妨再打开看看。也许可以避免一些在生产环境中可能出现的问题。我总结了三点:1)消息100%下发会增加运维成本,中小企业根据情况使用,非必要不用;2)、消息确认机制影响性能,非必要不要使用;3)、消费者首先保证消息可以被签收,业务处理失败可以人工补偿。工作中怕的从来不是不会用技术,而是遇到问题不知道怎么解决。分享多年的工作和学习,在云笔记中记录了很多内容。我在业余时间组织了它。这篇文章也是其中之一。有兴趣的朋友可以在评论中获取。使用时,打开它。它可能会节省很多时间。原创文章纯手写。如果觉得有一点帮助,请点个赞~继续分享工作中的真实心得和体会。喜欢就关注吧~更多最新技术文章请关注GZH:【Java分享客栈】
