本文转载自微信公众号《鸣哥的IT随笔》,作者IT鸣哥。转载本文请联系铭哥IT随笔公众号。1前言大家好,我是明哥!KAFKA作为一个开源的分布式事件流平台,在大数据和微服务领域有着广泛的应用场景,是实时流处理场景下消息队列的事实标准。一句话,KAFKA是实时数仓的基石,是事件驱动架构的灵魂。但是,一些技术小伙伴,尤其是很早就开始使用KAFKA的小伙伴,对KAFKA的发展趋势和一些新特性不是很熟悉,在使用过程中踩了很多坑。鉴于此,我们接下来会有KAFKA系列文章专门介绍KAFKA的这些新特性。本文是涵盖KAFAK幂等生产者的系列文章中的一篇。以下是正文。2从历史的角度看KAFKA的发展首先我们从历史的角度看KAFKA的发展:KAFKA在2013年12月推出了一个重要的版本0.8.0,这个版本非常重要,因为它首先引入了KAFKA-50多副本机制为容错打下了坚实的基础;然后在后续版本中逐渐加入很多新特性:比如逐渐摆脱对zookeeper的依赖;比如支持紧凑型清洗策略;比如支持kafka累存储;比如生产者权力平等;例如对交易的支持;比如kafkaconnectapi、kafkastreamapi和kafka大生态的KSQL,以及kafkaschemaregistry;至此(202109),KAFKA最新稳定版已经进化到2.8.0;KAFKA从一开始只是一个高吞吐量的消息中间件,发展成为实时流处理场景下消息队列的事实标准。一句话,KAFKA是实时数仓的基石,是事件驱动架构的灵魂。但是在目前市场上的生产环境中,仍然存在使用0.8.0等较早版本的情况。kafka-timelinekafka-api3什么是幂等生产者?我们知道,当kafka生产者向broker中的topic发送数据时,由于网络抖动等各种原因,producer可能收不到broker的ack确认信息。此时,生产者有两种选择:生产者可以选择忽略ack确认消息,不做任何进一步的处理:此时,消息可能会丢失。(之所以有可能是消息可能没有写入到broker的topic,但是可能已经正确的写入了broker的topic,但是回调ack消息由于producer没有收到)网络抖动;)生产者也可以选择多次尝试重发消息,直到收到ack确认消息或达到最大重试次数:此时可能会造成消息重复写入,即broker端的topic重复存储这些重试发送的消息;producer重发没有收到ack确认的消息,也可能导致broker端topic分区的消息顺序混乱,即因为失败而重发的消息在一些不需要重发而没有失败的消息之后.producer没有收到ack确认就重发消息造成的数据重复问题可以看下面的示意图,其中消息7/8/9/10为重复消息。producer-resend-failureKAFKA的idempotentproducer,即idempotentproducer,解决了以上问题:可以保证消息正确投递到broker端,不会丢失,不会重复,并存储在topic的各个partition中在正确的顺序中间。4如何启用幂等生产者?开启幂等生产者不涉及任何代码层面的改动,只改动以下配置项:enable.idempotence=true;//幂等生产者函数开关message.send.max。retries=xx//发送失败的重试次数可以配置很大,比如10000000,甚至Integer.MAX_VALUE;max.in.flight.requests.per.connection=xx//xx<=5,代表每次连接的in-transitrequest次数,有博文说这个参数必须配置成=1,但是在其实不是,只需要<=5(enable.idempotence为true时max.in.flight必须设置<=5");acks=All//ACK确认参数,可选0/1/-1/ALL,-1相当于ALL,开启幂等生产者功能时,该参数必须配置为ALL/-1,即所有ISR都必须确认收到消息,才认为消息传递成功(当enable.idempotence为真时,acks必须设置为all");在开启幂等生产者即enable.idempotence=true的情况下,可以不配置参数max.in.flight.requests.per.connection和参数Acks,此时会自动配置这两个参数;5幂等生产者的原理是什么?首先需要说明的是,开启幂等生产者后,kafka客户端自动实现的消息失败重发对我们来说是透明的,我们不需要在代码中重试发送.(实际上,在代码中重试消息发送反而会导致消息重复)。它的内部工作原理是这样的:在生产者端,每个生产者被broker自动分配一个ProducerId(PID),producer发送给broker的每条消息内部都附有pid和一个递增的序列号;在broker端,broker为每个topic的每个partition维护一个当前成功写入消息的最大PID-SequenceNumber元组;当broker收到小于当前最大PID-SequenceNumber元组的序号消息时,会丢弃该消息,避免数据重复存储;当broker重新选举新的leader失败时,上述去重机制仍然有效:因为broker的topic中存储的消息体是伴随着PID-sequencenumber信息的,leader的所有消息都会被复制到追随者。当一个原来的follower被选为新的leader时,其内部消息中已经保存了PID-sequencenumber信息,可以进行消息去重。幂等生产者,Broker端去重的工作原理,如下图所示:图6幂等生产者和交易是什么关系?幂等生产者是kafka交易的必要和不充分条件,即:enableidempotentgrowthor,isnotnecessarytostartthetransaction;要启动kafka事务,必须启动幂等生产者;事实上,当kafka事务启动时,kafka会自动启动idempotentproducer。
