本文将对KafkaConsumer做一个简单的介绍,为深入学习KafkaConsumer打开一个窗口。主要从以下三个方面:CoreParametersCoreComponentsCoreAPI1、KafkaConsumer核心参数概述个人认为如果想了解KafkaConsumer的核心工作机制,可以从它的核心参数入手,从而了解它的队列加载机制为以后深入的消息拉取模型、消费模型、站点提交等机制打下基础。kafkaConsumer的核心属性定义在ConsumerConfig中。1.1基本函数参数group.id消费组名称。client.id客户端标识id,默认为consumer-serialnumber,实际中建议包含客户端IP,不能在一个consumergroup中重复。bootstrap.serversbroker服务器地址列表。客户端查找引导地址的client.dns.lookup方法支持以下两种方法:resolve_canonical_bootstrap_servers_only该方法根据bootstrap.servers提供的主机名(hostname)和主机上的名称服务返回一个IP地址数组(InetAddress.getAllByName),然后依次获取inetAddress.getCanonicalHostName(),然后建立tcp连接。一个主机可以配置多个网卡。如果启用该功能,应该有效利用多网卡的优势,减轻经纪商网络侧的负载压力。use_all_dns_ips方法将直接使用bootstrap.servers中提供的主机名和端口来创建tcp连接,默认选项。enable.auto.commit是否启用站点自动提交,默认为true。auto.commit.interval.ms如果开启自动提交站点,则站点提交频率默认为5s。partition.assignment.strategy消费者队列负载算法,默认为按区间平均分配(RangeAssignor),可选值:轮询(RoundRobinAssignor)消息被删除时采用的策略,默认是latest,可选:earliest,none(会抛出异常)。key.deserializervalue.deserializer消息体序列化类interceptor.classes使用的key序列化类,consumer拦截器可以有多个。check.crcs消费者端是否校验CRC,默认为true。1.2网络相关参数send.buffer.bytes网络通道(TCP)的发送缓冲区大小,默认为128K。receive.buffer.bytes网络通道(TCP)的接收缓冲区大小,默认为32K。reconnect.backoff.ms重新建立连接的等待时间,默认50ms,属于底层网络参数,基本不用关心。reconnect.backoff.max.ms重新建立链接的最大等待时间,默认为1s,如果同一个连接连续重连两次,等待时间将从reconnect.backoff.ms的初始值呈指数增长,但如果超过了max之后,就不再呈指数增长了。retry.backoff.ms重试间隔,默认为100ms。connections.max.idle.ms连接的最大空闲时间,默认为9s。request.timeout.ms请求超时时间,与Broker端网络通信的请求超时时间。1.3核心工作参数max.poll.records每次poll方法调用拉取的最大消息数,默认500条。max.poll.interval.ms两次poll方法调用的最大间隔时间,单位毫秒,默认5分钟。如果消费者在这个时间间隔内没有发起轮询操作,则该消费者将被移除,触发再平衡,并将该消费者分配的队列分配给其他消费者。session.timeout.msconsumer和broker之间的心跳超时时间,默认10s,如果broker在指定时间内没有收到心跳请求,broker会移除consumer并触发rebalancing。heartbeat.interval.ms心跳间隔,consumer会以此频率向broker发送心跳,默认3s,主要是保证session不会失效。fetch.min.bytes拉取消息返回的最小字节数,默认为1字节。fetch.max.bytes一次拉取消息返回的最大字节数,默认1M,如果一个partition的第一批消息大小大于这个值,也会返回。max.partition.fetch.bytes每个分区一次最大取字节数,默认1M。fetch.max.wait.msfetch等待取数据的最大等待时间满足fetch.min.bytes。metadata.max.age.ms元数据在客户端的过期时间。过期后,客户端会重新从broker拉取最新的元数据。默认值为5分钟。internal.leave.group.on.close消费者关闭后是否立即离开订阅组。默认为true,即客户端断开后立即触发再平衡。如果设置为false,rebalance不会立即触发,而是在session过期后触发。2、KafkaConsumer核心组件及API通过KafkaConsumer的核心参数,我们基本可以窥探到Kafka的核心点。下面介绍KafkaConsumer的核心组件,为进一步研究Kafka消费者的消费模型打下基础。2.1核心组件KafkaConsumer由以下核心组件组成:ConsumerNetworkClient消费者网络客户端,服务于底层网络通信,负责客户端与服务端的RPC通信。ConsumerCoordinator消费者协调器,在Kafka的设计中,每个消费者组都会选举集群中的一个broker节点成为该消费者组的协调器,负责消费者组状态的状态管理,特别是消费者组的重平衡(消费者的加入和退出),这个类是消费者和代理协调器之间的交互。Fetcher消息拉取。提醒:本文不打算详细解释每个组件。在这里,我建议大家按照本文第一部分中每个参数的含义,然后比较这些参数最终传递给简历的是哪些组件,并做一个相关的思考。2.2核心API概述最后,我们来看一下消费者的核心API。Set
