当前位置: 首页 > 科技观察

Apache Kafka在大型应用中的20项优秀实践

时间:2023-03-13 16:54:18 科技观察

ApacheKafka在大型应用(移动支付公司)等大公司的20BestPractices用于构建可扩展、高吞吐、高可靠的实时数据流系统。例如,在NewRelic的生产环境中,Kafka集群每秒能够处理超过1500万条消息,其数据聚合率接近1Tbps。可见Kafka大大简化了数据流的处理,因此也赢得了众多应用开发者和数据管理专家的青睐。但是Kafka在大型系统中的应用会比较复杂。如果您的消费者跟不上数据流,消息通常会在被查看之前消失。同时,它在自动数据保留、高流量发布-订阅(pub/sub)模型等方面的局限性,都可能会影响您系统的性能。毫不夸张的说,如果存储数据流的系统不能按需扩展,或者稳定性不靠谱,估计经常会吃不饱睡不着觉。为了降低上面的复杂度,这里分享NewRelic针对Kafka集群应对高吞吐量的20条最佳实践。我将从以下四个方面展开:Partitions(分区)Consumer(消费者)Producers(生产者)Brokers(代理)快速理解Kafka的概念和架构Kafka是一个高效的分布式消息系统。在性能方面,它具有内置的数据冗余和弹性,以及高吞吐量和可扩展性。在功能上,它支持自动数据存储限制,能够以“流”的方式为应用程序提供数据转换,按照“键值(key-value)”建模关系对数据流进行“压缩”。要了解各种最佳实践,您需要熟悉以下关键术语:消息:Kafka中的一条记录或数据单元。每条消息都有一个键、一个对应的值,有时还有可选的消息头。生产者(producer):生产者向Kafka的topic发布消息。生产者决定如何发布到主题分区,如:轮询的随机方式,或者基于消息键(key)的分区算法。Broker(代理):Kafka作为分布式系统或集群运行。那么集群中的每个节点都称为代理。主题:主题是发布的数据记录或消息的类别。消费者通过订阅主题来阅读写给他们的数据。主题分区:不同的主题被划分到不同的分区中,每条消息都会分配一个偏移量。通常,每个分区至少会被复制一次或两次。每个分区都有一个领导者和一个或多个副本(即数据的副本)存储在每个跟随者上。这种方法可以防止某个代理失败。集群中的所有broker都可以充当leader和follower,但是一个broker最多只能有一个主题分区的副本。Leader可用于所有读写操作。偏移量(offset):单个分区中的每条消息都被分配了一个偏移量,它是一个单调递增的整数,可以作为分区中消息的唯一标识符。Consumer(消费者):消费者通过订阅topic分区来读取Kafka的各种topic消息。消费应用程序然后处理消息以完成指定的工作。Consumergroup(消费组):消费者可以按照消费组进行逻辑划分。主题分区均匀分布在组中的所有消费者中。因此,在同一个消费者组中,所有的消费者都是以负载均衡的方式运行的。换句话说,同一组中的每个消费者都可以看到每条消息。如果消费者处于“离线”状态,则该分区将分配给同一组中的另一个消费者。这称为“重新平衡”。当然,如果组内的消费者数量多于分区数量,就会有部分消费者闲置。相反,如果组中的消费者数量少于分区数量,则某些消费者会从多个分区中获取消息。Lag(延迟):当消费者的速度跟不上消息产生的速度时,消费者会因为无法从分区中读取消息而被延迟。延迟表示为分区标头之后的偏移量。从延迟状态恢复(“赶上”)所需的时间取决于消费者每秒可以处理的消息速率。公式如下:time=messages/(consumerratepersecond-produceratepersecond)Partitions的最佳实践了解分区的数据速率,以保证合适的数据存储空间。这里所谓的“分区的数据速率”是指数据的产生速率。换句话说,它是“平均消息大小”乘以“每秒消息数”。数据速率决定了给定时间保证数据存储的大小(以字节为单位)。如果不知道数据速率,就无法正确计算给定时间跨度的数据需要存储多少空间。同时,数据速率还可以毫不延迟地确定单个消费者需要支持的最大性能值。除非您有其他架构需求,否则请在编写主题时使用随机分区。当您有大型操作时,跨分区的数据速率差异可能很难管理。原因来自以下三个方面:首先,“热”(更高吞吐量)分区上的消费者必然会比同组中的其他消费者处理更多的消息,因此很可能造成网络上的处理和瓶颈。其次,为数据速率最高的分区配置的最大预留空间会导致主题中其他分区的磁盘使用率相应增加。第三,根据分区的leader关系实施的最优平衡方案比简单地将leader关系分散到所有broker要复杂。在同一主题中,“热”分区将“承载”其他分区的10倍权重。关于topic分区的使用,可以参考《Kafka Topic Partition的各种有效策略》(https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/)了解更多。消费者最佳实践如果消费者运行的版本低于Kafka0.10,请立即升级。在0.8.x版本中,consumer使用ApacheZooKeeper来协调consumergroup,很多已知的bug会导致它长期处于rebalancing状态,或者直接导致rebalancing算法失败(我们称之为“rebalancing”风暴”)。因此在重新平衡期间,一个或多个分区被分配给同一组中的每个消费者。在再平衡风暴中,分区的所有权将继续在消费者之间流动,这反过来又会阻止任何消费者实际获得分区的所有权。调整消费者的套接字缓冲区(socketbuffers)以应对高速涌入的数据。在Kafka版本0.10.x中,参数receive.buffer.bytes的默认值为64kB。在0.8.x版本的Kafka中,参数socket.receive.buffer.bytes的默认值为100kB。这两个默认值对于高吞吐量环境来说都太小了,尤其是在代理和消费者之间的网络带宽延迟乘积大于局域网(LAN)的情况下。对于延迟为1毫秒或更长的高带宽网络(例如10Gbps或更高),请考虑将套接字缓冲区设置为8或16MB。如果您的RAM不足,也请考虑至少1MB。当然你也可以设置为-1,这样底层操作系统可以根据网络的实际情况调整缓冲区的大小。然而,对于需要启动“热”分区的消费者来说,autoscale可能没有那么快。设计具有高吞吐量的消费者以实现按需背压。总的来说,我们应该保证系统只处理其能力范围内的数据,不会超负荷“消费”,导致进程中断“挂起”,或者溢出消费组。如果在Java虚拟机(JVM)中运行,消费者应使用固定大小的缓冲区(请参阅Disruptor模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf),并且***是使用堆外内存(off-heap)。固定大小的缓冲区可以防止消费者将过多的数据拉入堆栈,以致JVM将所有时间都花在执行垃圾收集上,从而无法执行其处理消息的基本工作。在JVM上运行各种消费者时,请注意垃圾回收可能对它们造成的影响。例如,长期的垃圾回收停滞可能导致ZooKeeper会话被丢弃,或者消费者组处于重新平衡状态。broker也是一样,如果垃圾回收停顿时间过长,就有集群下线的风险。生产者的最佳实践配置生产者等待各种确认。这样生产者就可以知道消息是否真正发送到了代理的分区。在Kafka版本0.10.x上,设置为acks;在0.8.x版本上,它是request.required.acks。Kafka通过复制来提供容错能力,因此单个节点的故障,或者分区领导者关系的变化都不会影响系统的可用性。如果您不使用acks配置生产者(或“即发即忘”),消息可能会悄无声息地丢失。为每个生产者配置重试。它的默认值是3,当然这个值很低。但是,正确的设置值取决于您的应用程序,即:对于那些对数据丢失零容忍的应用程序,请考虑设置为Integer.MAX_VALUE(有效且绝对)。这样就能应对broker的leader分区不能立即响应produce请求的情况。对于高吞吐量生产者,调整缓冲区大小,尤其是buffer.memory和batch.size(以字节为单位)。由于batch.size是根据分区设置的,因此生产者的性能和内存使用可以与主题中的分区数相关。因此,此处的设置值将取决于几个因素:生产者数据速率(消息的大小和数量)、要生产的分区数和可用内存量。请记住,增加缓冲区大小并不总是一件好事。如果生产者由于某种原因失败(例如,领导者响应比确认慢),那么堆上内存(on-heap)中缓冲的数据越多,它需要回收的垃圾就越多。检测应用程序以跟踪生成的消息数、平均消息大小和消耗的消息数等指标。Brokers的最佳实践在每个broker上,请压缩主题所需的内存和CPU资源。日志压缩(参见https://kafka.apache.org/documentation/#compaction)需要每个代理上的堆栈(内存)和CPU周期才能成功协作。而如果失败的日志压缩数据继续增长,将会给broker分区带来风险。您可以调整代理上的参数log.cleaner.dedupe.buffer.size和log.cleaner.threads,但请记住,这两个值都会影响单个代理上的堆栈使用。如果代理抛出OutOfMemoryError异常,它将被关闭并可能导致数据丢失。缓冲区大小和线程数取决于需要清除的主题分区的数量,以及这些分区中消息的数据速率和密钥大小。对于Kafka版本0.10.2.1,监视日志清理器的日志文件中的ERROR条目是检测其线程可能存在问题的最可靠方法。通过网络吞吐量监控代理。请监控发送(transmit,TX)和接收(receive,RX)的流量,以及磁盘I/O、磁盘空间和CPU使用率,容量规划是维持集群整体性能的关键步骤。在集群中的各个代理之间分配分区的领导者关系。Leader通常需要大量的网络I/O资源。例如,当我们配置复制因子(replicationfactor)为3并运行时,leader首先要获取分区的数据,然后向另外两个follower发送两份副本,再传输给多个节点需要数据。在消费者身上。因此,在此示例中,单个领导者使用的网络I/O至少是跟随者的四倍。而且,leader可能还需要对磁盘进行读操作,而follower只需要进行写操作。不要忽视监控代理的同步副本(ISR)收缩、复制不足的分区和不受欢迎的领导者。这些都是集群中潜在问题的迹象。例如,单个分区频繁的ISR收缩意味着该分区的数据速率超过了领导者的能力,无法再为消费者和其他副本线程提供服务。根据需要修改ApacheLog4j(https://github.com/apache/kafka/blob/trunk/config/log4j.properties)的各种属性。Kafka的brokerlogging会消耗大量的磁盘空间,但我们不能完全关闭它。因为有时候在发生意外之后,需要重构事件的来龙去脉,那么brokerlog将是我们唯一的途径,甚至是唯一的途径。禁用主题的自动创建,或为那些未使用的主题建立清除策略。例如,如果在设定的x天内没有新消息出现,则应认为该主题已死亡并将其从集群中移除。这使您无需花时间管理在集群中创建的额外元数据。对于那些具有持续高吞吐量的代理,提供足够的内存以避免它们从磁盘子系统读取。我们应该尽可能直接从操作系统的缓存中获取分区数据。然而,这意味着你必须保证你的消费者能够跟上“节奏”,而对于那些被延迟的消费者,你只能强制代理从磁盘读取。对于具有高吞吐量服务级别目标(SLO)的大型集群,请考虑为代理子集隔离不同的主题。至于如何确定需要隔离的topic,完全看自己的业务需求。例如,如果您有多个使用同一集群的联机事务处理(OLTP)系统,将每个系统的主题隔离到不同的代理子集中可以帮助限制潜在事件的影响范围。在旧客户端上使用新主题消息格式。应该在每个代理上加载额外的格式转换服务而不是客户端。当然,最好的办法是尽量避免这种情况发生。不要错误地认为在本地主机上测试代理代表了生产中的真实性能。请注意,在环回接口上测试复制因子为1的分区与大多数生产环境有很大不同。环回接口上的网络延迟几乎可以忽略不计,而在不涉及复制时,接收领导者确认所需的时间也可能有很大差异。其他资源希望以上建议能帮助您更有效地使用Kafka。如果你想提高你在Kafka方面的专业知识,请参考Kafka配套文档中的“操作”部分,其中包含操作集群等实用信息。此外,Confluent(https://www.confluent.io/)也定期举办和发布各种在线讨论,帮助大家更好地理解Kafka。原标题:20BestPracticesforWorkingWithApacheKafkaatScale,作者:TonyMancill