之前我们介绍了Kafka的整体架构,Kafka生产者,Kafka生产的消息最终流向哪里?当然,它需要消耗。如果把卡夫卡比作一家餐厅,那么生产者就是厨师的角色,消费者就是客人。如果只有厨师,那油炸的菜就没人吃。这也说不通,这家餐厅只有顾客没有厨师,谁来吃饭?!所以如果你看完上一篇还是不满意,可以继续让你耳目一新。如果你还没有读过上一篇文章,它会让你从现在开始开心。Kafka消费者概念应用程序使用KafkaConsumer从Kafka订阅主题并接收来自这些主题的消息,然后存储它们。应用程序首先需要创建一个KafkaConsumer对象,订阅主题并开始接受消息,验证消息并保存结果。一段时间后,生产者写入主题的速度快于应用程序验证数据的速度。这个时候怎么办?如果只使用单个消费者,应用将跟不上消息生成的速度,就像多个生产者向同一个主题写入消息一样,那么需要多个消费者参与主题中消息的消费。简化消息。Kafka消费者属于消费者群体。一个组中的消费者订阅同一个主题,每个消费者接收来自该主题一部分的消息。下面是Kafka分区消费示意图。上图中的主题T1有四个分区,分别是分区0、分区1、分区2、分区3。我们创建一个消费者组1,这个消费者组中只有一个消费者,它订阅了主题T1,并接收到T1中的所有消息。由于一个consumer处理四个producers发送给partition的消息,压力有点大,需要一个helper来帮忙分担任务,所以演变成下图。这样一来,消费者的消费能力就大大提高了,但是在一些环境下,比如用户产生大量消息的时候,生产者产生的消息还是让消费者吃不消,所以继续增加消费者。如上图所示,每个partition产生的消息都可以被每个consumergroup中的consumer消费。如果在消费组中加入更多的消费者,多余的消费者就会闲置,如下图所示,向组中加入消费者是横向扩展消费能力的主要方式。总而言之,我们可以通过增加消费群体的消费者数量来横向扩张,提升消费能力。这也是为什么建议在创建topic时使用更大数量的partition,这样在消费负载高的时候可以添加consumer来提升性能。此外,消费者的数量不应超过分区的数量,因为额外的消费者是空闲的并且没有帮助。Kafka一个很重要的特性就是只需要写一次消息,并且可以支持任意数量的应用读取这条消息。换句话说,每个应用程序都可以读取全部消息。为了让每个应用程序都能读取全部消息,应用程序需要有不同的消费者组。对于上面的例子,如果我们新增一个消费者组G2,而这个消费者组有两个消费者,那么就会演化成下图。在这种场景下,消费者组G1和消费者组G2都可以收到T1主题中的全量消息,逻辑上属于不同的应用。综上所述,如果应用需要读取全量消息,请为应用设置消费组;如果应用的消费能力不足,那么可以考虑将消费者添加到这个消费组中。消费者组和分区重新平衡什么是消费者组?消费者组是由一个或多个消费者实例(ConsumerInstance)组成的组,是一种具有可扩展性和容错性的机制。一个消费组中的消费者共享一个消费组ID,也称为GroupID。一个群组中的消费者共同订阅并消费一个主题。同一组的消费者只能消费一个分区的消息。过多的消费者将变得闲置无用。上面我们提到了两种消费方式。消费者组消费主题中的消息。这种消费模式也称为点对点消费模式,点对点消费模式也称为消息队列。主题中的消息成倍增加。消费者群体一起消费,这种消费模式也叫发布-订阅模式消费者再平衡我们从上面的消费者进化图可以知道这样一个过程:最初一个消费者订阅一个topic,消费掉所有的Partitioned消息,后来一个消费者加入了组,然后更多的消费者加入组,新加入的消费者实例共享原消费者消息的一部分,通过一个消费者转移分区的所有权,将用户转移给其他消费者的行为称为再平衡,英文名称也称为Rebalance。如下图所示,再平衡非常重要。它为消费者群体带来了高可用性和可扩展性。我们可以安全地添加消费者或移除消费者,但在正常情况下我们不希望出现这种行为。.在重平衡期间,消费者无法读取消息,导致整个消费者组在重平衡期间不可用。此外,当分区被重新分配给另一个消费者时,消息的当前读取状态将丢失,并且它可能还需要刷新缓存,这会在恢复状态之前减慢应用程序的速度。消费者将自己维护为消费者组的成员,并通过向组织协调器(KafkaBroker)发送心跳来确认其拥有的分区。对于不同的消费群体,其组织协调员可以不同。只要消费者定期发送心跳,消费者就被认为是活跃的并且正在处理来自其分区的消息。当消费者检索记录或提交它消费的记录时发送心跳。如果Kafka在一段时间后停止发送心跳,会话(Session)就会过期,组织协调器就会认为消费者死了,就会触发rebalance。如果消费者宕机并停止发送消息,组织协调器会等待几秒钟等它死掉,然后再触发重新平衡。在此期间,死消费者不会处理任何消息。清理消费者时,消费者会通知协调器它要离开组,组织协调器会触发重新平衡以尽量减少处理暂停。再平衡是一把双刃剑。在为消费者群体带来高可用性和可扩展性的同时,它也存在一些明显的缺点(bug),而这些bug直到现在也没有得到社区的修改。再平衡的过程对消费群体有着巨大的影响。因为每一次rebalancing过程都会导致一切停滞不前,参考JVM中的垃圾回收机制,即StopTheWorld,STW,(引自《深入理解 Java 虚拟机》中p76对Serial收集器的描述):更重要的是,它在进行垃圾收集时必须暂停所有其他工作线程。直到收集完毕。StopTheWorld这个名字听上去很酷,但这个工作实际上是由虚拟机在后台自动发起和完成的,在用户不可见的情况下停止所有用户正在处理的线程,这对很多应用程序都是有利的。说是不能接受。也就是说,在rebalancing期间,consumergroup中的consumer实例会停止消费,等待rebalancing完成。而且rebalancing的过程很慢。。。上面创建消费者的理论有点过分了。下面通过代码来解释一下消费者是如何消费的。在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——将需要传递给消费者的属性放在属性对象中。后面我们会重点介绍Kafka的一些配置。这里我们先简单的创建一下,使用3个属性就够了,分别是bootstrap.server,key.deserializer,value.deserializer。我们已经多次使用这三个属性。如果不是很清楚,可以参考带你认识KafkaProducer。另一个属性是group.id。这个属性不是必须的,它指定了这个KafkaConsumer是你属于哪个消费组。也可以创建不属于任何组的消费者Propertiesproperties=newProperties();properties.put("bootstrap.server","192.168.1.9:9092");properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaConsumerconsumer=newKafkaConsumer<>(特性);主题订阅消费者创建完成后,下一步就是开始订阅主题了。subscribe()方法接受一个主题列表作为参数,使用起来比较简单consumer.subscribe(Collections.singletonList("customerTopic"));为了简单起见,我们只订阅一个主题customerTopic,参数传入一个正则表达式,正则表达式可以匹配多个主题。如果有人创建了一个新主题,并且主题名称与正则表达式匹配,则会立即触发重新平衡,消费者可以阅读新主题。要订阅所有与测试相关的主题,您可以这样做consumer.subscribe("test.*");polling我们知道Kafka支持订阅/发布模式,生产者向KafkaBroker发送数据,那么消费者如何知道生产者发送了数据呢?其实生产者产生的数据消费者是不知道的。KafkaConsumer使用轮询定期从KafkaBroker检索数据。有数据就用来消费。如果没有,它将继续轮询并等待。下面是轮询等待的具体实现=1;if(map.containsKey(record.value())){updateCount=(int)map.get(record.value()+1);}map.put(record.value(),updateCount);}}}finally{consumer.close();}这是一个无限循环。消费者实际上是一个长期运行的应用程序,它轮询Kafka以获取数据。第三行代码很重要。Kafka必须周期性的请求数据,否则会认为这个Consumer挂了,会触发rebalancing,将自己的partition交给group中的其他consumer。传递给poll()方法的时间是超市时间,由java.time.Duration类表示。如果这个参数设置为0,poll()方法会立即返回,否则会在指定的毫秒数内等待broker返回数据。poll()方法返回记录列表。每条记录包含记录所属主题的信息、记录所在分区的信息、记录在分区中的偏移量以及记录的键值对。我们通常遍历这个列表,一条一条地处理每条记录。在退出应用程序之前使用close()方法关闭消费者。网络连接和socket也关闭了,马上触发rebalance,而不是等groupcoordinator发现自己不再发送心跳就认定它死了。线程安全在同一个组中,我们不能允许一个线程运行多个消费者,也不能允许多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。如果一个消费者组中有多个消费者要运行,则每个消费者都必须在自己的线程中运行。Java中的ExecutorService可以用来启动多个Consumers做处理。Consumer配置至此,我们已经学习了如何使用consumerAPI,但是只介绍了几个最基本的属性,Kafka文档列出了所有consumer相关的配置说明。大部分参数都有合理的默认值,一般不需要修改,下面就来介绍一下这些参数。fetch.min.bytes此属性指定消费者从服务器获取记录的最小字节数。当broker收到consumer的数据请求时,如果可用数据量小于fetch.min.bytes指定的大小,它会等到有足够的可用数据再返回给consumer。这减少了消费者和代理的工作量,因为当主题不经常使用时,他们不需要来回处理消息。如果可用的数据不是很多,但是消费者的CPU使用率很高,那么这个属性需要设置得比默认值高。如果消费者数量较多,增加该属性的值可以减少broker的工作量。fetch.max.wait.ms我们通过上面的fetch.min.bytes告诉Kafka,只有数据足够了才会返回给消费者。而fetch.max.wait.ms用于指定broker的等待时间,默认为500毫秒。如果没有足够的数据流入Kafka,就达不到消费者请求的最小数据量,导致500ms的延迟。如果要减少潜在的延迟,可以将参数值设置得更小。如果fetch.max.wait.ms设置为延迟100毫秒,fetch.min.bytes的值设置为1MB,那么Kafka要么在收到消费者请求后返回1MB数据,要么在100毫秒后返回所有可用数据。这取决于先满足哪个条件。max.partition.fetch.bytes此属性指定服务器从每个分区返回给消费者的最大字节数。它的默认值为1MB,即KafkaConsumer.poll()方法从每个分区返回的记录不超过max.partition.fetch.bytes指定的字节数。如果一个主题有20个分区和5个消费者,那么每个消费者至少需要4MB的空闲内存来接收记录。在给消费者分配内存的时候,可以多给他们分配一些,因为如果组里有一个消费者崩溃了,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes的值必须大于broker可以接收的最大消息大小(通过max.message.size属性配置),否则消费者可能无法读取这些消息,导致消费者一直挂断试试。设置此属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要经常调用poll()方法来避免会话过期和分区重新平衡。如果单次调用poll()返回的数据过多,消费者需要更多时间处理,可能无法及时进行下一轮。查询以避免会话过期。如果发生这种情况,您可以减小max.partition.fetch.bytes值,或者延长会话过期时间。session.timeout.ms属性指定消费者在被认为已死之前可以与服务器断开连接的时间。默认为3秒。如果消费者在session.timeout.ms指定的时间内没有向组协调器发送心跳,则认为它已经死亡,协调器将触发重新平衡。将其分区分配给消费组中的其他消费者,该属性与heartbeat.interval.ms密切相关。heartbeat.interval.ms指定poll()方法向组协调器发送心跳的频率,而session.timeout.ms指定消费者在多长时间内不能发送心跳。所以这两个属性一般需要同时修改,heartbeat.interval.ms一定要小于session.timeout.ms,一般是session.timeout.ms的三分之一。如果session.timeout.ms是3s,那么heartbeat.interval.ms应该是1s。将session.timeout.ms值设置为小于默认值可以更快地检测到崩溃的节点并从中恢复,但是长轮询或垃圾收集可能会导致意外的重新平衡。将此属性设置为较大的值可以减少意外的重新平衡,但检测节点崩溃的时间会更长。auto.offset.reset此属性指定消费者在读取没有偏移量或偏移量无效的分区时应执行的操作。它的默认值为latest,也就是说如果offset无效,消费者将从最新的记录开始读取数据。另一个值为earliest,表示如果offset无效,consumer会从头开始读取partition的记录。enable.auto.commit稍后我们将介绍几种不同的提交偏移量的方法。该属性指定消费者是否自动提交偏移量。默认值是true。为了避免数据重复和数据丢失,可以设置为false来控制什么时候提交offset。如果设置为true,还可以通过auto.commit.interval.ms属性partition.assignment.strategy来控制提交的频率,我们知道分区会分配给组内的消费者。PartitionAssignor将根据给定的消费者和主题来确定应该将哪些分区分配给哪个消费者。Kafka默认有Range和RoundRobin两种分配策略client.id这个属性可以是任意字符串,broker用它来标识slaveclient发送的消息,通常用在logs、metrics和quotas中。max.poll.records这个属性用来控制单次调用call()方法可以返回的记录数,可以帮助你控制轮询需要处理的数据量。还可以设置receive.buffer.bytes和send.buffer.bytessocket读写数据时使用的TCPbuffer的大小。如果将它们设置为-1,则使用操作系统默认值。如果生产者或消费者与broker在不同的数据中心,这些值可以适当增加,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。commit和offset的概念特殊的offset我们在上文中提到,消费者每次调用poll()方法进行定时轮询时,都会将生产者写入到Kafka中但尚未被消费者消费的记录返回。因此我们可以跟踪组中的哪个消费者读取了哪些记录。消费者可以使用Kafka来跟踪消息在分区中的位置(偏移量)。消费者会将消息发送到一个名为_consumer_offset的特殊主题,该主题会在每条发送的消息中保存分区偏移量。这个topic的主要作用是记录消费者触发rebalance后的offset。消费者每次向这个topic发送消息,正常情况下不会触发rebalance。这个话题不行。当rebalance被触发,消费或停止工作时,每个消费者可能会被分配到相应的partition,这个topic被设置为让消费者继续处理消息。如果提交的偏移量小于客户端上次处理的偏移量,则两个偏移量之间的消息将被重复处理如果提交的偏移量大于上次消费的偏移量,则两个偏移量之间的消息将丢失既然_consumer_offset如此重要,那么如何它犯了?接下来说一下提交方式。KafkaConsumerAPI提供了多种自动提交偏移量的方式。最简单的方式就是让消费者自动提交offsets。如果enable.auto.commit设置为true,那么每隔5s,消费者会自动提交poll()方法轮询的最大offset。提交间隔由auto.commit.interval.ms控制,默认为5s。与消费者中的其他所有内容一样,自动提交发生在轮询中。在每次轮询中,消费者检查偏移量是否已提交,如果已提交,则提交从上一次轮询返回的偏移量。提交当前偏移量将auto.commit.offset设置为false让应用程序决定何时提交偏移量。使用commitSync()提交偏移量。该接口会提交poll()方法返回的最新偏移量,提交成功后立即返回,提交失败则抛出异常。commitSync()将提交poll()返回的最新偏移量。如果所有记录都处理完了,一定要调用commitSync(),否则还是会有丢消息的风险。如果发生在balance,则从最新的batch开始,所有在balance消息和balance之间发生的消息都会被重新处理。异步提交异步提交commitAsync()和同步提交commitSync()最大的区别就是异步提交不会重试,同步提交会一致重试。同步和异步提交的结合一般情况下,对于偶尔的提交失败,不重试问题不大,因为如果提交失败是暂时性的问题,后续的提交总会成功的。但如果它是关闭消费者或重新平衡之前的最后一次提交,请确保提交成功。所以一般会结合使用commitAsync和commitSync,在consumer关闭前commitoffset。提交特定的偏移量consumerAPI允许在调用commitSync()和commitAsync()方法时传入要提交的分区和偏移量映射,即提交特定的偏移量。