当前位置: 首页 > 后端技术 > Java

面试官:为什么卡夫卡会丢失消息?

时间:2023-04-01 14:33:25 Java

1。您如何知道消息是否丢失?2.哪些链接可能会丢失消息?3、如何保证消息不丢失?*引入MQ消息中间件最直接的目的:系统解耦与流量控制(削峰填谷)系统解耦:上下游系统之间的通信相互依赖,使用MQ消息队列可以隔离带来的变化大约受上下游环境不稳定因素影响。流量控制:在超高并发场景下,引入MQ可以实现流量“削峰填谷”的作用,实现服务的异步处理,不至于把服务搞垮。MQ的引入也带来了另外一个问题:数据的一致性。在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。消息生产者将消息发送给MQ,再发送给消息消费者,保证消息不丢失。所以在使用MQ消息队列的时候,需要考虑这三个问题:如何知道一条消息丢失了?哪个链接可能会丢失消息?如何保证消息不丢失?图1.如何知道有消息丢失?如何感知消息是否丢失?总结如下:他人反馈:操作和PM反馈信息丢失。监控报警:监控指定指标,实时报警,手动调节。Kafka集群异常、Broker宕机、Broker磁盘挂载问题、消费者异常导致的消息积压,都会给用户带来消息丢失的直接感受。案例:在舆情分析中,数据采集同步PM可以自行下发采集调度指令,采集特定数据。PM可以通过ES近实时的查询到对应的数据,如果没有对应的数据,可以重新下发指令。当感知到的消息丢失时,需要一种机制来检查消息是否丢失。检索消息运维工具包括:查看Kafka消费位置:>后台管理系统+基于SpringBoot的用户小程序+MyBatisPlus+Vue&Element,支持RBAC动态权限,多租户,数据权限,工作流,三方登录、支付、短信、商城等功能>>*项目地址:>*视频教程:#查看某个主题的消息数$./kafka-run-class.shkafka.tools.GetOffsetShell--broker-listlocalhost:9092--topictest_topic>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ实现+Vue&Element后台管理系统+用户小程序,支持RBAC动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能>>*项目地址:>*视频教程:#查看consumerGroup列表$./kafka-consumer-groups.sh--list--bootstrap-server192.168.88.108:9092#查看偏移消费$./kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--groupconsole-consumer-1152--describeGROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSET滞后消费者ID主机客户端-IDconsole-consumer-1152test_topic0-4-consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942/127.0.0.1consumer-console-consumer-1152-12。工具:KafkaTools3。其他可视化界面工具2.哪些链接可能会丢失消息?一条消息从生产到消费要经过三个环节:消息生产者、消息中间件、消息消费者,可能会出现消息丢失的问题。1)生产端首先要实现从Kafka生产端发送消息的过程:调用send()方法时,消息不会立即发送出去,而是缓存起来,缓存中的消息会被分割在适当的时候分批成一批数据,通过SenderThreads分批发送给服务器Broker。该环节消息丢失的场景有:即Producer消息没有发送成功。网络波动:生产者与服务端链路不可达,发送超时。现象是:各端状态正常,但是consumer端没有消费消息,就像丢了消息一样。解决方法:重试props.put("retries","10");配置不当:不确认就发送消息;发送消息失败无回调,无日志。producer.send(newProducerRecord<>(topic,messageKey,messageStr),newCallBack(){...});解决办法:设置acks=1或者acks=all。发送消息设置回调。查看重要参数:acksacks=0:无需等待服务器确认。这意味着重试设置无效。响应中与服务器的偏移量始终为-1,生产者不管发送成功与否,直接发送即可。低延迟,容易丢失数据。acks=1:表示写成功后leader响应producer(但不刷盘)。延迟是中等的。一旦leader副本挂了,数据就会丢失。acks=all:等待数据拷贝完成,相当于-1。如果需要保证消息不丢失,就需要使用这个设置。同时需要设置unclean.leader.election.enable为true,保证当ISR列表为空时,选择Othersurvivingcopiesserveasthenewleader。2)服务器首先了解KafkaBroker写数据的过程:Broker收到一批数据,先写入内存PageCache(OSCache)。操作系统会每隔一段时间刷新OSCache中的数据,这个过程就是“异步批量刷新”。这里有一个隐患。如果数据写入PageCache后KafkaBroker宕机了会怎样?机器停机/断电?KafkaBrokerdown:消息不会丢失。因为数据已经写入PageCache,所以只需要等待操作系统刷盘即可。机器崩溃/断电:消息将丢失。因为数据还在内存中,当内存RAM断电时数据会丢失。解决方案:使用带电池备用电源的缓存,以防止系统异常掉电。相比于学习MySQL的“双1”策略,这个策略基本不用,因为“双1”会导致频繁的I/O操作,也是最慢的。对比学习Redis的AOF策略,默认和推荐策略:Everysec(AOF_FSYNC_EVERYSEC)每秒保存一次(默认):。每次执行写命令后,首先将日志写入AOF文件的内存缓冲区,缓冲区的内容每秒写入磁盘。扩展:Kafkalogdiskflushingmechanism#建议使用默认值,即不配置该配置,让操作系统决定何时刷盘以提高性能。#对于broker配置:log.flush.interval.messages=10000#logflushingmessages的间隔时间,即每收到一定数量的消息,就会进行logflushing。log.flush.interval.ms=1000#日志刷新时间间隔,单位ms,即每隔一定时间刷新一次日志。#对于topic配置:flush.messages.flush.ms=1000#flushevery1sundertopicflush.messages=1#topic下的每条消息都放在磁盘上#查看Linux后台线程执行配置$sysctl-a|grep脏虚拟机。dirty_background_bytes=0vm.dirty_background_ratio=10#表示当脏页占总内存的百分比超过这个值时,后台线程开始刷新脏页。vm.dirty_bytes=0vm.dirty_expire_centisecs=3000#表示多长时间脏数据会被刷新到磁盘(30秒)。vm.dirty_ratio=20vm.dirty_writeback_centisecs=500#表示多久唤醒一次刷新脏页的后台线程(5秒)。vm.dirtytime_expire_seconds=43200Broker的可靠性取决于其多副本机制:一般副本数为3(配置参数:replication.factor=3)LeaderPartition副本:提供外部读写机制。FollowerPartition副本:同步Leader数据。副本之间的数据同步也可能存在问题:数据丢失问题和数据不一致问题。解决方案:ISR和Epoch机制ISR(In-SyncReplicas):当Le``ader宕机时,可以从ISR中选出一个Follower作为Leader。Epoch机制:解决Leader副本高水位更新和Follower副本高水位更新时间不匹配的问题。Tips:Kafka0.11.x版本引入了leaderepoch机制,解决了高水位机制的弊??端。对应的配置参数如下:acks=-1oracks=all:所有replicas都必须同步到消息,表示消息发送成功。replication.factor>=3:必须至少有3个副本。min.insync.replicas>1:表示消息至少写入了2个replicas才算发送成功。前提要求acks=-1。例如:如果leader宕机了,那么在ISR中至少要保证有一个follower,这样follower才能在数据不丢失的情况下被选举为leader。公式:replication.factor=min.insync.replicas+1unclean.leader.election.enable=false:防止不在ISR中的Follower被选举为Leader。从Kafka0.11.0.0版本开始默认默认unclean.leader.election.enable=false。3)消费端消息丢失的场景包括:消息堆积:来自多个分区的消息没有被消费,就像丢失消息一样。解决办法:一般情况下,问题出在消费者这边。尽量提高客户端的消费速度,为消费逻辑开启一个新的线程。自动提交:消费者拉取下一批数据,处理时自动提交offset。这时候,消费者倒下了;重启后拉取了新的一批数据,但是上一批数据还没有处理完。解决方法:取消auto.commit=false,改为手动ack。心跳超时触发Rebalance:客户端心跳超时触发Rebalance,被踢出消费者组。如果只有这一个client,则消息不会被消费。同时,避免两次轮询的间隔超过阈值:max.poll.records:减小参数值,建议远小于<单线程每秒消耗的记录数><数量消费线程数>的产品。max.poll.interval.ms:该值应大于/(<每秒单个线程消费记录数>*<消费线程数>)的值。解决方法:客户端版本升级到0.10.2或更高版本。案例:每当遇到数据同步的时候,消息中的文本在同步到ES之前需要经过NLP的NER解析。这个过程的主要流程是:数据同步程序从Kafka中拉取消息。数据同步程序分析消息中文本发送的NER,得到一个特征数组。数据同步程序将消息同步到ES。现象:在线数据同步程序运行一段时间后,消息不再被消费。排查日志:找了一个Rebalance日志,怀疑是客户端消费太慢被踢出了消费组。本地测试:发现运行一段时间后也会出现Rebalance,NLPNER服务访问HTTP500错误。结论:由于NER服务异常导致数据同步程序消费超时。而且当时客户端版本是v0.10.1。Consumer没有独立的线程维护心跳,而是将心跳维护与poll接口耦合在一起,同样导致心跳超时。当时的解决办法是:session.timeout.ms:设置为25s,而且当时没有升级客户端版本,怕引起其他问题。熔断机制:加入Hystrix,如果超过3个服务调用异常,则熔断,保护客户端正常消费数据。3、如何保证消息不丢失?掌握这些技能:熟悉消息从发送到消费各个阶段的监控告警。熟悉Kafka集群方案。“MQ可靠消息传递”是如何保证消息100%不丢失的?至此,总结一下:生产端:设置重试次数:props.put("retries","10");setacks=all设置回调:producer.send(msg,newCallBack(){...});2.Broker:内存:使用带电池备用电源的缓存。Kafka0.11.x及以后版本:支持Epoch机制。replication.factor>=3:必须至少有3个副本。min.insync.replicas>1:表示消息至少写入了2个replicas才算发送成功。前提要求acks=-1。unclean.leader.election.enable=false:防止不在ISR中的追随者被选为领导者。3、消费者客户端版本升级到0.10.2或以上版本。取消自动提交auto.commit=false,改为手动ack。尽量提高客户端的消费速度,消费逻辑开启新的线程进行处理。