大家好,欢迎来到Tlog4J课堂,我是Jensen。面试官:在MQ的整个消息生产和消费过程中,如何保证100%的消息被消费?考生:MQ有ACK机制保证消息100%被消费。面试官:嗯,你回去等通知吧。。。这个面试题在考MQ组件的时候算是老生常谈了。不知道你是怎么回答的?我们通常使用MQ,但使用技术框架只是第一步。了解其底层原理,深挖技术真相,是每一位IT从业者的基础。这里有一点,如果你想回答好面试官的问题,你最好有金字塔思维——金字塔思维是一种从不同维度思考问题的方式,不重复不遗漏,集体精疲力竭。MQ作为异步通信的消息中间件,除了生产者和消费者的解耦,还可以用于大流量的削峰填谷,解决业务的最终一致性问题,所以消息的“可靠性”是特别重要。重要的是,比如货物出库后的库存数据通过MQ同步到财务系统。如果信息的可靠性得不到保证,财务系统的存货成本分析数据就无法有效支撑财务团队。准确的说,我们要保证MQ消息的可靠性,需要从三个层面/维度去解决:生产者100%投递,MQ持久化,消费者100%消费。这里的100%消费是指消费了很多条消息,并且不要消耗太多。由于MQ是基础网络通信的中间件,网络通信难免会因为丢包、网络抖动等原因造成数据丢失,而MQ组件本身也会因为宕机或软件崩溃而停止服务,导致数据丢失。那么我们就需要从这两个根本原因入手进行补偿。这里科普一下RabbitMQ和Kafka是如何解决的。RabbitMQ这里不得不先提一下RabbitMQ的消息协议——AMQP(AdvancedMessageQueuingProtocol,高级消息队列协议)。在面试的时候,我经常会问应聘者一个问题:RabbitMQ使用的是什么消息协议?大多数考生都无法回答AMQP,更不用说AMQP模型是如何设计的了。在服务器端,三个主要的功能模块连接成一条处理链,完成预期的功能:Exchange:接收发布应用发送的消息,并将这些消息按照一定的规则路由到消息队列中。队列:存储消息,直到它们被消费者安全地处理。Binding:定义exchange和queue的关联,提供路由规则。使用此模型,我们可以轻松地模拟典型的消息传递中间件概念,例如存储转发队列和主题订阅。下面我们就来看看RabbitMQ的消息确认机制是如何保证消息可靠性的。1、生产者通过API设置通道为确认模式,每条消息都会分配一个唯一的ID。如果消息投递成功,即消息到达了broker,channel会向producer发送ack,回调ConfirmCallback接口,并带上唯一的ID。如果发生错误导致消息丢失,例如RoutingKey无法路由到Queue,则会向producer发送nack,并回调ReturnCallback接口并返回唯一ID和异常信息。ack和nack只有一个被触发,只有一次,而且是异步执行的,也就是说producer不需要等待,可以继续发送新的消息。2、consumer端声明queue时,指定noack=false,即consumer不会自动提交ack,broker会等待consumer手动返回ack后再删除消息,否则立即删除。broker的ack没有超时机制,只判断链接是否断开。如果断线(比如consumer在处理消息时crash),消息会被重发,所以consumer必须幂等地处理消息。3、MQ本身一般而言,消息都是存储在内存中进行通信的,基于内存的会存在数据丢失的问题。一旦服务重启,数据将被销毁。RabbitMQ中的数据持久化分为三个方面:交换机持久化、队列持久化和消息持久化。Exchange持久化:exchange_declare在创建exchange时指定参数durable=true,如:channel.exchangeDeclare(exchangeName,“direct/topic/header/fanout”,true);第三个参数是设置持久值。队列持久化:queue_declare在创建队列时指定参数durable=true,如:channel.queueDeclare("queue.persistent.name",true,false,false,null),第二个参数是设置持久值。消息持久化:newAMPQMessage在创建消息时通过参数指定,如:channel.basicPublish("exchange.persistent","persistent",MessageProperties.PERSISTENT_TEXT_PLAIN,"persistent_test_message".getBytes()),或者设置参数deliveryMode=2指定:AMQP.BasicProperties.Builderbuilder=newAMQP.BasicProperties.Builder();builder.deliveryMode(2)。以上只是API层的实现,那么RabbitMQ底层是如何持久化消息的呢?如果指定了持久化参数,它们会以append的形式写入文件,会根据文件大小(默认16M)自动裁剪,生成一个新文件,RabbitMQ启动时会创建两个进程,一个负责持久化消息的存储,一个负责非持久化消息的存储(内存不够用的时候使用)。存储消息时,消息在文件中的映射以及相关信息(包括ID、偏移量、有效数据、左文件、右文件)都会被记录在一个名为ets的表中。当消息被读取时,会根据读取的信息存储到文件中,同时更新信息。当消息被删除时,它只是从ets中删除,成为垃圾数据。当垃圾数据超过比例(默认50%),文件数达到3时,会触发垃圾回收:锁定左右文件,整理左边文件的有效数据,和左边文件的有效数据向左写入文件,更新文件信息,向右删除,合并完成;当一个文件的有效数据等于0时,该文件被删除。在写入文件之前先写入缓冲区buffer。如果缓冲区已满,则写入文件。注意,此时还只是操作系统的页面存储,还没有入盘。每25ms刷新一次磁盘(如linux中的fsync命令),不管buffer(fd的读写缓冲区)是否已满,buffer和pagememory中的数据都会落到磁盘中。还有一种写机制:每条消息写完后,如果没有后续的写请求,直接刷盘。另外,除了消息确认机制,RabbitMQ还有另一种方法——使用事务消息:消息生产者发送commit命令,MQ同步返回commitok命令。该方法只能执行,因为需要同步阻塞等待MQ返回是否发送成功,其他操作性能较差,不推荐使用。KafkaKafka在MQ领域以其高性能、强吞吐、强消息堆积能力着称。常用于日志采集、消息系统、用户活动跟踪、运营指标、流处理等场景。说之前,先简单说一下。说说Kafka的架构设计:ConsumerGroup:消费者组,消费者组中的每个消费者负责消费不同分区的数据,提高消费能力。这是一个逻辑订阅者。Topic:可以理解为队列。主题对消息进行分类。生产者和消费者面对的是同一个Topic。Partition:为了实现可扩展性和提高并发性,将一个Topic以多个Partition的形式分发给多个Broker。每个Partition都是一个有序队列,一个Topic的每个Partition都有若干个副本(Replicas),一个Leader和若干个Follower;生产者发送数据的对象,消费者消费数据的对象都是通过Leader,Follower负责实时从Leader同步数据,并保持数据与Leader同步;当Leader失效时,AFollower也会成为新的Leader。1、生产者端的Kafka消息发送者有ACK机制。设置ack参数:ack=0,表示不重试,Kafka不需要返回ack,极有可能因为各种原因丢失;ack=1,表示Leader写入成功后会返回ack,Follower不一定同步成功;ack=all或ack=-1,表示ISR列表中的所有Follower都同步完毕,然后返回ack。设置参数unclean.leader.election.enable:false,禁止选举ISR以外的Follower为Leader,只从ISR列表中的节点选举Leader;可能会牺牲Kafka的可用性,但是可以提高消息的可靠性。重试机制,设置tries>1,表示消息重发次数。设置最小同步副本数min.insync.replicas>1。在满足这个值之前,Kafka不提供读写服务,写操作会异常。通过设置最小同步副本数和ACK机制,MQ可以在性能和可靠性之间取得平衡。2、消费者端手动提交offset:Kafka消费者拉取消息后,默认会自动提交offset。由于consumer每次都是根据offset来消费消息,如果consumer处理业务失败,实际上面我们要重新消费,所以我们需要在消息处理成功后手动提交offset来确认消息可以被消费成功。3、MQ本身很简单,通过减少broker刷刷的间隔来实现高可靠性。深究其原理,还得看一下Kafka的持久化机制。磁盘的顺序读写:与RabbitMQ不同,Kafka是基于磁盘读写的,那么为什么Kafka的吞吐量还是那么大呢?原因是kafka的读写都是顺序读写,不需要寻址随机读写。而且因为是用磁盘写数据,所以消息的堆积能力肯定比内存型的RabbitMQ要强。操作系统的零拷贝技术用于防止CPU将数据从一个存储器拷贝到另一个存储器。这里不详细介绍零拷贝。与Java应用不同,Kafka消息不需要在用户缓冲区处理磁盘数据再返回,因此可以使用零拷贝技术。Partitionandsegmentation+index:Kafka的消息实际上是分片存储的。每次读写一个文件,也是直接操作段。为了进一步优化查询,Kafka还默认为切分的数据文件创建索引。文件(即文件系统上的.index文件),这种分区和段+索引的设计,不仅提高了数据读取的效率,还提高了数据操作的并行性(类似于ConcurrentHashMap的段锁机制)。批量压缩&批量读写:多条消息压缩在一起传输(如gzip格式)和读写,节省带宽。直接操作pagecache:虽然Kafka是用Java写的,运行在JVM上,但是Kafka的消息读写是直接在操作系统page上操作的,而不是在JVM的堆内存中,避免了耗时和JVM的object-consumingGC创建需要时间,读写速度更高,并且JVM进程重启时缓存不会丢失。在了解了Kafka的持久化机制是一种直接读写页存储+定时刷盘的方式后,我们只需要设置刷盘策略即可兼顾性能和可靠性。Kafka提供了3个参数来优化磁盘刷新机制:log.flush.interval.messages//有多少条消息被刷新一次。log.flush.interval.ms//多久刷新一次磁盘。log.flush.scheduler.interval.ms//周期性刷新。总结框架的面试题,最重要的是掌握技术框架的底层实现原理和适用场景。基本上回答这两个方面就可以了。如果你不能回答其他奇怪和详细的问题,我们会指导你。面试官表达自己对框架的理解就够了,毕竟细节太多了。那么如何才能掌握呢?至少通过框架的特性,可以根据需要实现一个简单的版本,比如实现一个Spring框架,实现一个MQ组件等等。
