Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,由于复制机制的存在,消息一旦提交,就不会丢失。但是,如果producer向broker发送数据后遇到网络问题导致通信中断,那么producer无法判断消息是否已经提交(commit)。虽然Kafka无法确定网络故障时发生了什么,但是生产者可以多次重试以确保消息已经正确传输到broker,所以Kafka目前实现了至少一次。1.幂等性1.场景中所谓的幂等性是指多次调用接口的结果与一次调用的结果是一致的。生产者在重试时可能会重复写入消息,而Kafka的幂等性功能可以避免这种情况。幂等性是有条件的:它只能保证Producer在单个会话中不会丢失或重载。如果Producer意外挂掉重启,则无法保证(在幂等的情况下,无法获取之前的状态信息,所以是不可能做到跨会话级别的既不损失也不失重)。幂等性不能跨越多个Topic-Partitions,只能保证单个partition内的幂等性。当涉及多个Topic-Partition时,中间状态不同步。Producer使用幂等性的例子非常简单。与正常使用Producer相比,变化不大。只需要将Producer配置的enable.idempotence设置为true即可,如下所示://当enable.idempotence为true时,默认为allprops.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducerproducer=newKafkaProducer(props);producer.send(newProducerRecord(topic,"test");2.事务1.场景幂等性不能跨多个分区操作,而事务可以弥补这个缺点,事务可以保证多个分区写操作的原子性,操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功和部分失败的可能性。为了执行交易,网络故障must提供一个唯一的transactionalId,这个参数通过客户端程序设置。参见代码库:com.heima.kafka.chapter7.ProducerTransactionSendproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionId);2.预准备交易需要producer开启幂等特性,所以将transactional.id参数设置为非null以开启交易特性的同时需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为真)。如果将显示设置为false,则会抛出异常。KafkaProducer提供了5个事务相关的方法,具体如下://初始化事务,前提是配置了transactionalIdpublicvoidinitTransactions()。,OffsetAndMetadata>offsets,StringconsumerGroupId)//提交事务publicvoidcommitTransaction()//终止事务,类似回滚publicvoidabortTransaction()3.案例分析见代码库:com.heima.kafka.chapter7.ProducerTransactionSendmessagesender/***KafkaProducer事务的使用*/publicclassProducerTransactionSend{publicstaticfinalStringtopic="topic-transaction";publicstaticfinalStringbrokerList="localhost:9092";publicstaticfinalStringtransactionId="transactionId";publicstaticvoidmain(String[]args){Propertiesproperties=newProperties();properties.erput(KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionId);KafkaProducerproducer=newKafkaProducer<>(properties);producer.initTransactions();producer.beginTransaction();try{//处理业务逻辑,创建ProducerRecordProducerRecordrecord1=newProducerRecord<>(topic,"msg1");producer.send(record1);ProducerRecordrecord2=newProducerRecord<>(topic,"msg2");producer.send(record2);ProducerRecordrecord3=newProducerRecord<>(topic,"msg3");producer.send(record3);//处理一些其他逻辑producer.commitTransaction();}catch(ProducerFencedExceptione){producer.abortTransaction();}producer.close();}}模拟事务回滚案例try{//处理业务逻辑并创建ProducerRecordProducerRecordrecord1=newProducerRecord<>(topic,"msg1");producer.send(record1);//模拟事务回滚案例System.out.println(1/0);ProducerRecordrecord2=newProducerRecord<>(topic,"msg2");producer.send(record2);ProducerRecordrecord3=newProducerRecord<>(topic,"msg3");producer.send(record3);//处理一些其他逻辑producer.commitTransaction();}catch(ProducerFencedExceptione){producer.abortTransaction();}从上面的案例来看,msg1发送成功后,发生异常事务,被回滚,导致msg1的消费者收不到消息。3、controller在Kafka集群中会有一个或多个broker,其中一个会被选举为controller(KafkaController),负责管理整个集群中所有partition和replicas的状态。当分区的领导副本发生故障时,控制器负责为分区选举新的领导副本。当检测到一个partition的ISR集发生变化时,controller负责通知所有broker更新自己的元数据信息。当使用kafka-topics.sh脚本增加主题的分区数时,控制器还负责分区的重新分配。Kafka中的控制器选举依赖于Zookeeper。成功竞选为控制器的代理将在Zookeeper中创建一个临时(EPHEMERAL)节点/控制器。本次临时节点内容如下:1、ZooInspector管理使用zookeeper图形化客户端工具(ZooInspector)提供的jar进行管理,启动如下:找到jar所在目录,运行jar文件java-jarzookeeper-dev-ZooInspector.jar连接Zookeeper{"version":1,"brokerid":0,"timestamp":"1529210278988"}其中version在当前版本固定为1,brokerid表示被称为controller的broker的id号,timestamp表示选举被称为controller的时间戳。在任何时候,集群中只有一个控制器。每个broker在启动时都会尝试读取/controller节点的brokerid的值。如果读取的brokerid的值不为-1,则说明其他broker节点已经成功选举为controller,则当前broker将放弃选举;如果/controller节点在Zookeeper中不存在,或者该节点中的数据异常,那么它会尝试创建/controller节点。当当前broker创建一个节点时,其他broker也可能同时尝试创建这个节点,只有创建成功的broker才会成为controller,创建失败的broker代表选举失败。每个broker都会在内存中保存当前controller的brokerid值,可以标识为activeControllerId。在Zookeeper中还有一个/controller_epoch节点与controller相关。该节点是一个持久化(PERSISTENT)节点,节点中存储了一个整型的controller_epoch值。controller_epoch用于记录controller发生变化的次数,即记录当前controller是第几代。我们也可以称之为“控制器时代”。controller_epoch的初始值为1,即集群中第一个controller的epoch为1。当controller发生变化时,如果没有选择新的controller,则该字段的值加1。每个与控制器交互的请求都会携带controller_epoch字段。如果请求的controller_epoch值小于内存中的controller_epoch值,则认为该请求是发送给过期控制器的请求,该请求将被识别为Invalidrequest。如果请求的controller_epoch值大于内存中的controller_epoch值,则意味着新的controller已经被选举出来。可以看出,Kafka使用controller_epoch来保证controller的唯一性,从而保证相关操作的一致性。具有控制人身份的经纪人比其他普通经纪人需要多承担一份责任。具体如下:监控分区相关的变化。监视与主题相关的更改。监控经纪人相关的变化。从Zookeeper中读取以获取与主题、分区和代理相关的所有当前信息并进行相应管理。4.ReliabilityGuarantee可靠性保证:保证系统在各种环境下都能有一致的行为。Kafka的保证保证了分区消息的顺序。如果使用同一个生产者向同一个分区写入消息,并且消息B在消息A之后写入,那么Kafka可以保证消息B的偏移量大于消息A的偏移量,消费者会读取消息A先读再读消息B只有当消息被同步写入分区的所有副本(文件系统缓存)时,才被认为是提交的。生产者可以选择接收不同类型的确认。控制参数acks只要有一个副本是active的,那么committed的消息就不会丢失。消费者只能阅读已提交的确认。提交消息1.invalidreplicas如何判断分区是否有处于同步invalid状态的replica?从Kafka0.9.x开始,唯一的参数是replica.lag.time.max.ms(默认大小为10,000)来控制,当ISR中的follower副本落后于leader副本超过参数指定的值时replica.lag.time.max.ms,确定副本无效,需要从ISR中移除follower副本。具体实现原理很简单。当follower副本同步完leader副本的LEO(LogEndOffset,每个分区最后一条消息的位置)之前的所有日志,就认为follower副本已经追上了leader副本。此时,更新副本的lastCaughtUpTimeMs标识符。Kafka的副本管理器(ReplicaManager)启动时,会启动一个副本过期检测的定时任务,这个定时任务会定时检查副本当前时间与lastCaughtUpTimeMs的差值是否大于参数replica指定的值.lag.time.max.ms。不要误以为follower副本只要拉取leader副本的数据就会更新lastCaughtUpTimeMs。试想一下,当leader副本的消息流入速度大于follower副本的拉取速度时,follower副本会继续拉取leader副本。与leader副本同步,如果follower副本也放在ISR中,那么当leader副本失效,follower副本被选为新的leader副本时,会出现严重的消息丢失。2.Replica复制Kafka中的每个主题分区被复制n次,其中n是主题的复制因子。这允许Kafka在集群服务器发生故障时自动切换到这些副本,以便在发生故障时消息仍然可用。Kafka的复制是分区粒度的,分区的write-aheadlog复制到n台服务器。在n个副本中,一个副本是leader,其他的是follower。顾名思义,producer只能向leader分区写入数据(读取只能从leader分区进行),follower只能从leader按顺序复制日志。副本无法与领导者同步的原因有多种。慢副本:follower在一定时间内追不上leader。最常见的原因之一是I/O瓶颈,它导致跟随者附加复制消息的速度比从领导者那里拉取消息的速度慢。Stuckcopy:follower在一定时间内停止向leader拉取请求。跟随者副本由于GC暂停或跟随者失败或死亡而卡住。新启动的副本:当用户增加主题的副本因子时,新的追随者在完全赶上领导者日志之前不会出现在同步副本列表中。如何判断replica滞后:replica.lag.max.messages=4server端配置replica.lag.time.max.ms只有一个参数。该参数解释了副本响应分区领导者的最长等待时间。探测卡住或失败的副本-如果副本失败,则发送拉取请求之间的时间超过replica.lag.time.max.ms。Kafka会认为这个副本已经死亡,并将其从同步副本列表中移除。检测慢速副本的机制已经改变——如果一个副本开始落后于领导者超过replica.lag.time.max.ms。Kafka会认为它太慢并且会从同步副本列表中删除。除非replica要求leader时间间隔大于replica.lag.time.max.ms,否则即使leader使流量激增,大批量写入消息。Kafka也不会从同步副本列表中删除副本。LeaderEpoch参考了Kafka0.11.0.0的数据丢失场景和数据不一致场景。版本解决方案造成以上两个问题的根本原因在于HW值是用来衡量副本备份是否成功,以及在发生故障时作为日志截断的依据。但是HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理过程来更新,所以中间任何崩溃都可能导致HW值过期。由于这些原因,Kafka0.11引入了leaderepoch来取代HW值。leader额外开辟一块内存区域来存放leader的epoch信息,这样即使出现以上两种情况,也可以很好的避免这些问题。所谓leaderepoch其实就是一对值:(epoch,offset)。Epoch代表leader的版本号,从0开始,leader改变一次,epoch就会+1,offset对应epoch版本的leader写入的第一条消息的位移。所以假设有两对值:(0,0)(1,120)表示第一个leader从偏移量0开始写消息;一共写了120个[0,119];第二个leader的版本号是1。从offset120开始写消息。这样的缓存保存在leaderbroker中,并定期写入checkpoint文件。Avoiddataloss:Avoiddatainconsistency六、消息重复的场景及解决方案1.生产者重复生产发送的消息没有收到正确的断响应,导致生产者重试。producer发出消息,broker入盘后,由于网络等各种原因,sender得到发送失败或网络中断的响应,producer收到可恢复的Exceptionretry消息,导致要重复的消息。解决方案:开启Kafka的幂等性开启Kafka的幂等性,不需要修改代码,默认是关闭的,需要修改配置文件:enable.idempotence=true和requireack=allandretries>1。ack=0,不重试。消息可能会丢失,适用于比数据丢失更重要的吞吐量指标,比如日志采集。消费者端重复消费数据的根本原因是offset没有及时提交给broker。解决办法是取消自动自动提交,每次消费结束或者程序退出时手动提交。这可能无法保证重复。下游做幂等的一般解决方案是让下游做幂等或者尽量记录消费的每条消息的offset。对于少数严格的场景,可能需要将offset或uniqueID,比如订单ID和下游状态更新放在同一个数据库中。做一个事务保证准确更新或者同时在下游数据表中记录消费偏移量,然后在更新下游数据时,将消费站点作为乐观锁拒绝旧站点的数据更新。7.__??consumer_offsets_consumer_offsets是内部话题,对用户是透明的。除了它的数据文件和日志中偶尔出现的两点,用户一般感觉不到这个话题。但是我们知道它保存的是新版Kafkaconsumer的位移信息。1.何时创建一般情况下,集群中第一个消费者消费消息时,会自动创建主题__consumer_offsets。分区的数量可以通过offsets.topic.num.partitions参数设置。默认值为50,如下:2.解析分区参考代码库:com.heima.kafka.chapter7.ConsumerOffsetsAnalysis获取所有分区:Summary做了解释,解释了消息重复和解决方法。