【.com原创文章】ApacheKafka是一个流行的分布式数据流平台,已经被NewRelic(数据智能平台)、Uber、Square(移动支付公司)等大公司广泛使用。可扩展、高吞吐量和高度可靠的实时数据流系统。例如,在NewRelic的生产环境中,Kafka集群每秒能够处理超过1500万条消息,其数据聚合率接近1Tbps。可见Kafka大大简化了数据流的处理,因此也赢得了众多应用开发者和数据管理专家的青睐。但是Kafka在大型系统中的应用会比较复杂。如果您的消费者跟不上数据流,消息通常会在查看之前消失。同时,它在自动数据保留、高流量发布-订阅(pub/sub)模型等方面的局限性,都可能会影响您系统的性能。毫不夸张的说,如果存储数据流的系统不能按需扩展,或者稳定性不靠谱,估计经常会吃不饱睡不着觉。为了降低上面的复杂度,这里分享NewRelic针对Kafka集群应对高吞吐量的20条最佳实践。我将从以下四个方面展开:Partitions(分区)Consumer(消费者)Producers(生产者)Brokers(代理)快速理解Kafka的概念和架构Kafka是一个高效的分布式消息系统。在性能方面,它具有内置的数据冗余和弹性,以及高吞吐量和可扩展性。在功能上,它支持自动化的数据存储约束,能够以“流”的方式为应用程序提供数据转换,并按照“键值(key-value)”建模关系对数据流进行“压缩”。要了解各种最佳实践,首先需要熟悉以下关键术语:消息(message)Kafka中的一条记录或数据单元。每条消息都有一个键、一个对应的值,有时还有可选的消息头。生产者(producer)Producer向Kafka的topic发布消息。Producer决定如何发布主题分区,例如:roundrobin的随机方法,或者基于消息键(key)的分区算法。Broker(代理)Kafka作为分布式系统或集群运行。那么集群中的每个节点就称为一个Broker。主题(topic)主题是一类被发布的数据记录或消息。消费者通过订阅主题来读取写给他们的数据。TopicPartition(主题分区)不同的Topic分为不同的分区,每条消息都会分配一个Offset,通常每个分区至少会被复制一到两次。每个分区都有一个Leader和一个或多个副本(即数据的副本)存储在每个Follower上。这种方法可以防止Broker失败。集群中的所有Broker都可以作为Leader和Follower,但是一个Broker最多只能拥有一份TopicPartition。Leader可用于所有读写操作。偏移量(offset)单个分区中的每条消息都被分配了一个偏移量,它是一个单调递增的整数,可以用作分区中消息的唯一标识符。Consumer(消费者)Consumer通过订阅Topic分区来读取Kafka的各种Topic消息。消费应用程序然后处理消息以完成指定的工作。Consumergroup(消费组)Consumer在逻辑上可以按照Consumergroup进行划分。TopicPartition平均分配给组内的所有Consumer。因此,在同一个Consumer组中,所有的Consumer都是以负载均衡的方式运行的。换句话说,同一组中的每个Consumer都可以看到每条消息。如果一个Consumer处于“离线”状态,则该分区将被分配给同一组中的另一个Consumer。这称为“重新平衡”。当然,如果组内的消费者数量多于分区数量,就会有部分消费者闲置。相反,如果组中的消费者数量少于分区数量,则某些消费者会从多个分区中获取消息。Lag(延迟)当消费者的速度跟不上消息生成的速度时,消费者会因为无法从分区中读取消息而被延迟。延迟表示为分区标头后的偏移量。从延迟状态恢复(“赶上”)所需的时间取决于消费者每秒可以处理的消息速率。公式如下:time=messages/(consumerratepersecond-produceratepersecond)Partitions的最佳实践①了解分区的数据速率,以确保提供合适的数据存储空间。这里所谓的“分区数据率”是指产生数据的速率。换句话说,就是用“平均消息大小”乘以“每秒消息数”得到的数据速率来确定在给定时间可以保证的数据存储空间的大小(以字节为单位)。如果不知道数据速率,就无法正确计算给定时间跨度的数据需要存储多少空间。同时,datarate也可以无延迟地识别单个Consumer需要支持的最大性能值。②除非您有其他架构需求,否则写题目时请使用随机分区。当您执行大规模操作时,每个分区的不均匀数据速率非常难以管理。原因来自以下三个方面:首先,“热”(更高吞吐量)分区上的Consumer必然会比同组的其他Consumer处理更多的消息,因此很可能造成网络上的处理和Bottleneck。其次,为数据速率最高的分区配置的最大预留空间会导致主题中其他分区的磁盘使用率相应增加。第三,基于分区的Leader关系实现的最优平衡方案比简单地将Leader关系分散到所有Broker复杂。在同一主题中,“热”分区将“承载”其他分区的10倍权重。TopicPartition的使用请参考《Kafka Topic Partition的各种有效策略》https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/。Consumers最佳实践③如果Consumers运行的是比Kafka0.10更旧的版本,请立即升级在0.8.x版本中,Consumer使用ApacheZooKeeper来协调Consumergroups,许多已知的bug会导致它长时间处于重新平衡状态,或者直接导致再平衡算法失败(我们称之为“再平衡风暴”)。因此在重新平衡期间,一个或多个分区被分配给同一组中的每个消费者。在再平衡风暴中,分区的所有权将继续在消费者之间流动,这反过来又会阻止任何消费者实际获得分区所有权。④调优Consumersocketbuffers(套接字缓冲区)以应对数据的高速流入在Kafka0.10.x版本中,参数receive.buffer.bytes的默认值为64KB。在0.8.x版本的Kafka中,参数socket.receive.buffer.bytes的默认值为100KB。这两个默认值对于高吞吐量环境来说都太小了,特别是如果Broker和Consumer之间的网络带宽延迟积(bandwidth-delayproduct)大于局域网(LAN)。对于延迟为1毫秒或更长的高带宽网络(例如10Gbps或更高),请考虑将套接字缓冲区设置为8或16MB。如果您的RAM不足,也请考虑至少1MB。当然你也可以设置为-1,这样会让底层操作系统根据网络的实际情况调整缓冲区的大小。然而,对于需要启动“热”分区的消费者来说,自动缩放可能没有那么快。⑤设计高吞吐量的消费者,实现按需反压。通常,我们应该保证系统只处理其能力范围内的数据,不超负荷“消耗”,导致进程中断和“挂起”。",否则会发生Consume组的溢出。如果在Java虚拟机(JVM)中运行,Consumer应使用固定大小的缓冲区,最好是堆外缓冲区。请参阅Disruptor模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf固定大小的缓冲区可防止消费者将太多数据拉入堆栈,以至于JVM将其所有时间都花在执行垃圾上收集,因此无法执行其处理消息的基本工作。⑥在JVM上运行各种Consumer时,请警惕垃圾回收可能对其造成的影响。例如,长期的垃圾回收停滞可能导致ZooKeeper会话被丢弃,或者Consumer组处于重新平衡状态。Broker也是一样,如果垃圾回收停顿时间过长,就有集群下线的风险。Producers最佳实践⑦配置Producer等待各种确认,这样Producer就可以知道消息是否真的发送到了Broker的partition。在Kafka版本0.10.x上,设置为Acks;在版本0.8.x上,它是request.required.acks。Kafka通过复制来提供容错能力,因此单个节点的故障,或者分区领导者关系的变化都不会影响系统的可用性。如果您没有为Producer配置Acks(或“即发即弃”),消息可能会悄无声息地丢失。⑧为每个Producer配置Retries。默认值为3,这当然非常低。但是,正确的设置值取决于您的应用程序,即:对于那些对数据丢失零容忍的应用程序,请考虑设置为Integer.MAX_VALUE(有效和***)。这样就能应对Broker的Leader分区不能立即响应Produce请求的情况。⑨对于高吞吐量的Producer,调优buffersize,尤其是buffer.memory和batch.size(以字节为单位)。由于batch.size是根据partition设置的,所以Producer的性能和内存占用可以和Topic的partition个数相关。因此,这里的设置值将取决于几个因素:Producer数据速率(消息的大小和数量)生产的分区数可用内存量如果Producer由于某种原因(例如,Leader的响应速度慢于确认),那么堆内存(on-heap)中缓冲的数据越多,它需要回收的垃圾就越多。越来越多。⑩检查应用程序以跟踪生成的消息数、平均消息大小和使用的消息数等指标。Brokers最佳实践?在每个Brokers上,请压缩Topics所需的内存和CPU资源。日志压缩(参见https://kafka.apache.org/documentation/#compaction)需要每个Broker上的堆栈(内存)和CPU周期才能成功实现,如果失败的日志压缩数据继续增长,将会带来风险到经纪人分区。您可以调整Broker上的log.cleaner.dedupe.buffer.size和log.cleaner.threads参数,但请记住,这两个值都会影响各个Brokers上的堆栈使用情况。如果Broker抛出OutOfMemoryError异常,它将被关闭并可能导致数据丢失。缓冲区的大小和线程数取决于需要清除的主题分区的数量,以及这些分区中消息的数据速率和密钥大小。从Kafka版本0.10.2.1开始,监视日志清理器的日志文件中的ERROR条目是检测其线程可能出现的问题的最可靠方法。?通过网络吞吐量监控Broker请监控发送(transmit,TX)和接收(receive,RX)的流量,以及磁盘I/O,磁盘空间,CPU使用率,容量规划是维护整体的关键一步集群的性能。?将分区后的Leader关系分布到集群中的各个Broker中。Leader通常需要大量的网络I/O资源。例如,当我们配置复制因子为3并运行它时。Leader首先要获取分区的数据,然后发送两份副本给另外两个Follower,再传送给多个需要数据的Consumer。因此,在这个例子中,单个Leader使用的网络I/O至少是Follower的四倍。而且Leader可能还需要对磁盘进行读操作,而Follower只需要进行写操作。?不要忽视监控Brokers的同步副本(ISR)收缩、复制不足的分区和不受欢迎的领导者。这些是群集中潜在问题的迹象。例如,单个分区频繁的ISR收缩意味着该分区的数据速率超过了领导者的容量,无法再为消费者和其他副本线程提供服务。?根据需要修改ApacheLog4j的各种属性详情请参考:https://github.com/apache/kafka/blob/trunk/config/log4j.propertiesKafka的Broker日志记录会占用大量磁盘空间,但是我们没有办法完全关闭它。因为有时候在发生意外之后,你需要重构事件的来龙去脉,那么Broker日志将是我们最好的,甚至是唯一的途径。?禁用Topics的自动创建,或者为那些不用的Topics建立清除策略。比如设置的x天内没有新消息,就应该考虑Topic是否过期,将其从集群中移除。删除。这使您无需花时间管理在集群中创建的额外元数据。?对于那些持续高吞吐量的Broker,请提供足够的内存,避免它们从磁盘子系统读取。我们应该尽可能直接从操作系统的缓存中获取分区的数据。但是,这意味着你必须保证你的Consumer能跟上“节奏”,而对于那些延迟的Consumers,你只能强制Broker从磁盘读取。?对于具有高吞吐量服务水平目标(servicelevelobjectives,SLOs)的大型集群,请考虑为一个子集的Brokers隔离不同的Topic。至于如何确定需要隔离的Topic,完全看你自己的业务需求。例如,您有一些使用同一集群的多个联机事务处理(OLTP)系统。然后将每个系统的Topic隔离到不同的Brokers子集中可以帮助限制潜在事件的影响半径。?在老客户端上使用新的Topic消息格式。它应该替换客户端并在每个Brokers上加载额外的格式转换服务。当然,最好尽量避免这种情况。?不要错误地认为在本地主机上测试Broker就可以代表生产环境中的真实性能。要知道,如果你使用1的复制因子,在loopback接口上测试partition,那和大多数Production环境有天壤之别。环回接口上的网络延迟几乎可以忽略不计,并且在不涉及复制时接收领导者确认所需的时间也可能有很大差异。结束语希望以上建议可以帮助大家更有效地使用Kafka。如果您想提高您在Kafka方面的专业知识,请进一步查看Kafka配套文档中的“操作”部分,其中包含有关操作集群等的有用信息。【原创稿件,合作网站转载请注明原作者和出处为.com】
