图片来自PexelsBrokerBroker由于Kafka本身丢失消息。为了获得更高的性能和吞吐量,Kafka将数据分批异步地存储在磁盘上。在消息的flushing过程中,为了提高性能,减少flush的次数,Kafka采用了batchflushing的方式。即按照一定的消息量,按时间间隔刷新磁盘。这种机制也是由Linux操作系统决定的。在Linux操作系统中存储数据,会先存储在页面缓存(Pagecache)中,然后根据时间或其他条件刷入磁盘(从PageCache到文件),或者通过fsync命令。当数据在PageCache中时,如果系统挂了,数据就会丢失。Broker在Linux服务器上高速读写并同步到Replica上图简单描述了Broker写入数据和同步的过程。Broker只将数据写入PageCache,而PageCache驻留在内存中。断电后这部分数据会丢失。PageCache的数据是通过Linuxflusher程序来刷新的。闪烁的触发条件有以下三种:主动调用sync或fsync函数。可用内存低于阈值。脏数据时间达到阈值。Dirty是PageCache的一个标识位。当数据写入页面缓存时,页面缓存被标记为脏。数据刷新后,清除脏标志。Broker通过调用fsync函数接管磁盘刷操作来配置磁盘刷机制。从单个Broker的角度来看,PageCache的数据会丢失。Kafka不提供同步刷新的方法。同步刷机是在RocketMQ中实现的。实现原理是阻塞异步刷机的过程,等待响应,类似于Ajax回调或者JavaFuture。下面是一段RocketMQ的源码:);//Flushing的意思是理论上不可能完全让Kafka保证单个Broker不丢消息。这种情况只能通过调整磁盘复位机制的参数来缓解。比如减少刷机间隔,减小刷机的数据量。时间越短,性能越差,可靠性越好(越可靠越好)??。这是一道多项选择题。为了解决这个问题,Kafka使用Producer和Broker共同处理单个Broker丢失参数的情况。Producer一旦发现Broker消息丢失,可以自动重试。除非重试次数超过阈值(可配置),否则消息将丢失。在这种情况下,生产者客户端需要手动处理这种情况。那么Producer是如何检测数据丢失的呢?就是通过ack机制,类似于http的三次握手。在考虑请求完成之前,生产者要求领导者收到的确认数量。这控制发送的记录的持久性。允许以下设置:acks=0如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下不能保证服务器已经收到记录,并且重试配置不会生效(因为客户端通常不会知道任何失败)。每条记录返回的偏移量将始终设置为-1.acks=1这意味着领导者会将记录写入其本地日志,但会在不等待所有跟随者的完全确认的情况下做出响应。在这种情况下,如果领导者在确认记录后但在追随者复制它之前立即失败,那么记录将丢失。acks=allT这意味着领导者将等待所有同步副本确认记录。这保证只要至少一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。http://kafka.apache.org/20/documentation.html上面参考的是Kafka官方对参数acks的解释(老版本的参数是request.required.acks):①acks=0,Producer不等待Broker的响应,效率最高,但消息容易丢失②acks=1,leaderbroker收到消息后直接返回ack,不等待其他follower的响应。也可以理解为ack的个数为1,此时如果leader在follower收到leader的同步消息之前就挂了,消息就会丢失。根据上图的例子,如果leader收到消息并成功写入PageCache,会返回ack,Producer认为消息发送成功。但是此时根据上图,数据还没有同步到follower。如果此时leader掉电,数据就会丢失。③acks=-1,leaderbroker收到消息后挂掉,等待ISR列表中的所有follower都返回结果后再返回ack。-1相当于全部。这种配置下,只有leader向pagecache写入数据不会返回ack,所有ISR都需要返回“success”才能触发ack。如果此时断电,Producer可以知道消息没有发送成功,会重新发送。如果follower收到数据后成功返回ack,leader断电,数据会存在于原来的follower中。换届后,新的leader将持有这部分数据。从leader到follower的数据同步需要2个步骤:数据从PageCache刷新到磁盘。因为只有磁盘中的数据才能同步到replica。数据同步到replica,replica成功将数据写入PageCache。Producer得到ack后,即使所有机器都断电,数据至少会存在leader的磁盘中。上面第三点提到了ISR列表的follower,还需要一个参数来更好的保证ack的有效性。ISR是Broker维护的一个“可靠的follower列表”,一个in-syncReplicas的列表,Broker的配置包含一个参数:min.insync.replicas。此参数表示ISR中的最小副本数。如果未设置此值,则ISR中的跟随者列表可能为空。这时候相当于acks=1。如上图所示:acks=0,总耗时f(t)=f(1)。acks=1,总耗时f(t)=f(1)+f(2)。acks=-1,总耗时f(t)=f(1)+max(f(A),f(B))+f(2)。性能依次降低,可靠性依次提??高。ProducerProducer丢失消息,发生在producer客户端。Producer为了提高效率,减少IO,可以在发送数据的时候将多个request组合起来发送。合并的请求在第一行发送并缓存在本地缓冲区中。缓存方式和上面提到的刷机类似。Producer可以将请求打包成“块”或者按照时间间隔发送缓冲区中的数据。通过buffer我们可以把producer改造成异步的方式,这样可以提高我们的发送效率。但是,缓冲区中的数据是危险的。一般情况下,客户端的异步调用可以使用回调来处理消息发送失败或者超时。但是一旦Producer被非法停止,buffer中的数据就会丢失,Broker将无法接收到这部分数据。或者,当Producer客户端内存不足时,如果采用的策略是丢弃消息(另一种策略是块阻塞),消息也会丢失。或者,消息生成(异步生成)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。Producer批量发送示意图异步发送消息速度过快示意图根据上图,可以想到几种解决方案:将异步发送消息改为同步发送消息。或者服务产生消息时,使用阻塞线程池,线程数有一定的上限。总体思路是控制消息生成的速度。扩展Buffer的容量配置。这种方法可以缓解这种情况的发生,但不能消除。该服务不直接将消息发送到缓冲区(内存),而是将消息写入本地磁盘(数据库或文件),由另一个(或少数)生产线程发送消息。相当于在buffer和service之间加了一个空间更大的缓冲层。Consumer消费者消费消息有以下几个步骤:接收消息处理消息反馈“提交”(committed)消费者消费主要分为两种:自动偏移提交,AutomaticOffsetCommitting手动偏移提交,ManualOffsetControlConsumer自动提交机制是根据一定的时间间隔,收到的消息被提交。提交过程和消费消息的过程是异步的。也就是说,可能有一个不成功的消费过程(比如抛出异常),已经提交了commit消息。此时消息丢失。Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("group.id","test");//自动提交开关props.put("enable.auto.commit","true");//自动提交的时间间隔,这里是1sprops.put("auto.commit.interval.ms","1000");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer
