当前位置: 首页 > 科技观察

关于Kafka消费者的这些参数,你应该知道什么?

时间:2023-03-14 23:05:26 科技观察

本文将对KafkaConsumer做一个简单的介绍,为深入学习KafkaConsumer打开一个窗口。主要从以下三个方面:CoreParametersCoreComponentsCoreAPI1、KafkaConsumer核心参数概述个人认为如果想了解KafkaConsumer的核心工作机制,可以从它的核心参数入手,从而了解它的队列加载机制为以后深入的消息拉取模型、消费模型、站点提交等机制打下基础。kafkaConsumer的核心属性定义在ConsumerConfig中。1.1基本函数参数group.id消费组名称。client.id客户端标识id,默认为consumer-serialnumber,实际中建议包含客户端IP,不能在一个consumergroup中重复。bootstrap.serversbroker服务器地址列表。客户端查找引导地址的client.dns.lookup方法支持以下两种方法:resolve_canonical_bootstrap_servers_only该方法根据bootstrap.servers提供的主机名(hostname)和主机上的名称服务返回一个IP地址数组(InetAddress.getAllByName),然后依次获取inetAddress.getCanonicalHostName(),然后建立tcp连接。一个主机可以配置多个网卡。如果启用该功能,应该有效利用多网卡的优势,减轻经纪商网络侧的负载压力。use_all_dns_ips方法将直接使用bootstrap.servers中提供的主机名和端口来创建tcp连接,默认选项。enable.auto.commit是否启用站点自动提交,默认为true。auto.commit.interval.ms如果开启自动提交站点,则站点提交频率默认为5s。partition.assignment.strategy消费者队列负载算法,默认为按区间平均分配(RangeAssignor),可选值:轮询(RoundRobinAssignor)消息被删除时采用的策略,默认是latest,可选:earliest,none(会抛出异常)。key.deserializervalue.deserializer消息体序列化类interceptor.classes使用的key序列化类,consumer拦截器可以有多个。check.crcs消费者端是否校验CRC,默认为true。1.2网络相关参数send.buffer.bytes网络通道(TCP)的发送缓冲区大小,默认为128K。receive.buffer.bytes网络通道(TCP)的接收缓冲区大小,默认为32K。reconnect.backoff.ms重新建立连接的等待时间,默认50ms,属于底层网络参数,基本不用关心。reconnect.backoff.max.ms重新建立链接的最大等待时间,默认为1s,如果同一个连接连续重连两次,等待时间将从reconnect.backoff.ms的初始值呈指数增长,但如果超过了max之后,就不再呈指数增长了。retry.backoff.ms重试间隔,默认为100ms。connections.max.idle.ms连接的最大空闲时间,默认为9s。request.timeout.ms请求超时时间,与Broker端网络通信的请求超时时间。1.3核心工作参数max.poll.records每次poll方法调用拉取的最大消息数,默认500条。max.poll.interval.ms两次poll方法调用的最大间隔时间,单位毫秒,默认5分钟。如果消费者在这个时间间隔内没有发起轮询操作,则该消费者将被移除,触发再平衡,并将该消费者分配的队列分配给其他消费者。session.timeout.msconsumer和broker之间的心跳超时时间,默认10s,如果broker在指定时间内没有收到心跳请求,broker会移除consumer并触发rebalancing。heartbeat.interval.ms心跳间隔,consumer会以此频率向broker发送心跳,默认3s,主要是保证session不会失效。fetch.min.bytes拉取消息返回的最小字节数,默认为1字节。fetch.max.bytes一次拉取消息返回的最大字节数,默认1M,如果一个partition的第一批消息大小大于这个值,也会返回。max.partition.fetch.bytes每个分区一次最大取字节数,默认1M。fetch.max.wait.msfetch等待取数据的最大等待时间满足fetch.min.bytes。metadata.max.age.ms元数据在客户端的过期时间。过期后,客户端会重新从broker拉取最新的元数据。默认值为5分钟。internal.leave.group.on.close消费者关闭后是否立即离开订阅组。默认为true,即客户端断开后立即触发再平衡。如果设置为false,rebalance不会立即触发,而是在session过期后触发。2、KafkaConsumer核心组件及API通过KafkaConsumer的核心参数,我们基本可以窥探到Kafka的核心点。下面介绍KafkaConsumer的核心组件,为进一步研究Kafka消费者的消费模型打下基础。2.1核心组件KafkaConsumer由以下核心组件组成:ConsumerNetworkClient消费者网络客户端,服务于底层网络通信,负责客户端与服务端的RPC通信。ConsumerCoordinator消费者协调器,在Kafka的设计中,每个消费者组都会选举集群中的一个broker节点成为该消费者组的协调器,负责消费者组状态的状态管理,特别是消费者组的重平衡(消费者的加入和退出),这个类是消费者和代理协调器之间的交互。Fetcher消息拉取。提醒:本文不打算详细解释每个组件。在这里,我建议大家按照本文第一部分中每个参数的含义,然后比较这些参数最终传递给简历的是哪些组件,并做一个相关的思考。2.2核心API概述最后,我们来看一下消费者的核心API。Setassignment()获取该消费者的队列分配列表。Setsubscription()获取消费者的订阅信息。voidsubscribe(Collectiontopics)订阅主题。voidsubscribe(Collectiontopics,ConsumerRebalanceListenercallback)订阅主题并指定队列再平衡的监听器。voidassign(Collectionpartitions)替换订阅,并手动指定要使用的队列。voidunsubscribe()取消订阅关系。ConsumerRecordpoll(Duration超时)拉取消息是KafkaConsumer的核心方法,下面会详细介绍。voidcommitSync()同步提交消费进度,为本批消费而提交,后续文章会详细介绍。voidcommitSync(Durationtimeout)同步提交消费进度,可以设置超时时间。voidcommitSync(Mapoffsets)显示同步提交的消费进度,offsets表示消费进度需要提交的信息。voidcommitSync(finalMapoffsets,finalDurationtimeout)显示同步提交消费进度,有超时。voidseek(TopicPartitionpartition,longoffset)重置由consumer#poll方法拉取的下一条消息的偏移量。voidseek(TopicPartitionpartition,OffsetAndMetadataoffsetAndMetadata)寻找方法重载方法。voidseekToBeginning(Collectionpartitions)将poll方法的下一次拉取偏移量设置为队列的初始偏移量。voidseekToEnd(Collectionpartitions)将poll方法的下一次拉取偏移量设置为队列的最大偏移量。longposition(TopicPartitionpartition)获取将被拉取的偏移量。longposition(TopicPartitionpartition,finalDurationtimeout)同上。OffsetAndMetadatacommitted(TopicPartitionpartition)获取指定分区的提交偏移量。OffsetAndMetadatacommitted(TopicPartitionpartition,finalDurationtimeout)同上。地图metrics()统计指标。ListpartitionsFor(Stringtopic)获取主题的路由信息??。ListpartitionsFor(Stringtopic,Durationtimeout)同上。MaplistTopics()获取所有主题的路由信息??。MaplistTopics(Durationtimeout)同上。Setpaused()获取挂起的分区信息。voidpause(Collectionpartitions)暂停分区,下一个poll方法将不会从这些分区返回消息。voidresume(Collectionpartitions)恢复挂起的分区。MapoffsetsForTimes(MaptimestampsToSearch)根据时间戳查找最新消息的偏移量。MapoffsetsForTimes(MaptimestampsToSearch,Durationtimeout)同上。MapbeginningOffsets(Collectionpartitions)查询指定分区的当前最小偏移量。MapbeginningOffsets(Collectionpartitions,Durationtimeout)同上。MapendOffsets(Collectionpartitions)查询指定分区的当前最大偏移量。MapendOffsets(Collectionpartitions,Durationtimeout)同上。voidclose()关闭消费者。voidclose(Durationtimeout)关闭消费者。voidwakeup()唤醒消费者。Kafka提供的消费者不像RocketMQ那样提供Push方式自动拉取消息,应用需要自动组织这些API进行消息拉取。值得注意的是,kafka消费者还支持站点自动提交机制,kafka消费者(KafkaConsumer)对象不是线程安全的。基于KafkaConsumer的pause(暂停部分分区的消费)和resume(恢复部分分区的消费),可以轻松实现消费端的限流机制。本文主要是让消费者有个大概的了解。后续文章将继续一一揭开消费者的核心运行机制。请继续关注。本文转载自微信公众号“中间件兴趣圈”,可通过以下二维码关注。转载本文请联系感兴趣的中间件圈公众号。