当前位置: 首页 > 科技观察

看了这篇文章之前,我以为我已经很了解Kafka了

时间:2023-03-23 11:55:38 科技观察

Kafka是一个消息系统,最初是从LinkedIn发展起来的,作为LinkedIn的ActivityStream和操作数据处理管道(Pipeline)的基础。图片来自Pexels它现在被许多不同类型的公司用作多种类型的数据管道和消息传递系统。活动流数据是几乎所有网站在报告其网站使用情况时都会使用的数据中最常见的部分。活动数据包括页面浏览量(PageViews)、有关查看的内容和搜索的信息。这类数据的处理方式通常是先将各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运行数据是指服务器性能数据(CPU、IO使用率、请求时间、服务日志等)。运营数据的统计方法多种多样。近年来,事件和操作数据处理已成为网站软件产品功能的重要组成部分,需要稍微复杂的基础架构来支持它。Kafka的基本概念Kafka是一个分布式的、基于发布/订阅的消息系统。主要设计目标如下:提供时间复杂度为O(1)的消息持久化,即使对于TB级别以上的数据也是如此。具有恒定时间复杂度的访问性能。高吞吐量。即使在非常便宜的商用机器上,单台机器也可以支持每秒超过100K条消息的传输。支持KafkaServer之间的消息分区和分布式消费,同时保证消息在各个Partition中的顺序传输。支持离线数据处理和实时数据处理。Scaleout:支持在线水平扩展。生产者和消费者对于Kafka,有两种基本类型的客户端:生产者和消费者。此外,还有用于数据集成的KafkaConnectAPI和用于流处理的KafkaStreams等高级客户端,但这些高级客户端的底层仍然是生产者和消费者API,它们只是在上层进行了封装.这个很好理解,生产者(也称发布者)创建消息,消费者(也称订阅者)负责消费或阅读消息。主题(Topic)和分区(Partition)在Kafka中,消息按照主题(Topic)进行分类,每个主题对应一个“消息队列”,有点类似于数据库中的表。但是如果我们把所有同类的消息都放到一个“中央”队列中,就不可避免地缺乏可扩展性。无论是生产者/消费者数量的增加,还是消息数量的增加,都可能会耗尽系统或存储的性能。我们举一个生活中的例子来说明:现在A市生产的一种商品,需要通过公路运输到B市。那么单线高速公路无论是在“A市货物多”还是“现在C市也往B市运东西”的情况下,都会出现“吞吐量不足”的问题。所以我们现在引入分区(Partition)的概念,类似于“让更多的道路被修建”的方式来完成我们主题的横向扩展。Broker和Cluster(集群)一个Kafka服务器也称为Broker,它接受生产者发送的消息并将它们存储在磁盘上;Broker还服务于消费者拉取分区消息的请求,并返回当前提交的消息。使用特定的机器硬件,Broker可以每秒处理数万个分区和数百万条消息。(现在是百万量级,我查了一下,好像在集群的情况下吞吐量真的很高。)几个Broker组成一个集群(Cluster),集群中的一个Broker会成为集群控制器。ClusterController,负责管理集群,包括将分区分配给Brokers,监控Broker故障等。在集群中,一个分区负责一个Broker,Broker也称为分区的领导者。当然也可以将一个partition复制到多个Broker上来实现冗余,这样当一个Broker出现故障时,它的partition可以重新分配给其他Broker负责。下图是一个例子:Kafka的一个关键特性是日志保留(Retention)。我们可以配置主题的消息保留策略,比如只保留一定时间的日志或者只保留特定大小的日志。当超过这些限制时,旧消息将被删除。我们也可以为一个topic单独设置消息过期策略,这样可以针对不同的应用进行个性化设置。多集群随着业务的发展,我们往往需要多集群,通常有以下几个原因:基于数据的隔离基于安全隔离多数据中心(容灾)在构建多数据中心时,往往需要实现消息交换。例如,如果用户修改了个人数据,无论后续请求由哪个数据中心处理,都需要反映这种更新。或者,需要将来自多个数据中心的数据汇聚到一个总控中心进行数据分析。上面提到的分区复制冗余机制只适用于同一个Kafka集群内。对于多个Kafka集群的消息同步,可以使用Kafka提供的MirrorMaker工具。本质上,MirrorMaker只是使用队列连接的Kafka消费者和生产者。它使用来自一个集群的消息并向另一个集群生成消息。Kafka的设计与实现以上我们了解了Kafka中的一些基本概念,但是作为一个成熟的“消息队列”中间件,还有很多有趣的设计值得我们思考。让我们简要地列出其中的一些。Kafka是存储在文件系统上的,首先要知道Kafka的消息是存在文件系统上的。Kafka严重依赖文件系统来存储和缓存消息。一般人认为“磁盘慢”,所以对这样的设计持怀疑态度。事实上,磁盘比人们预期的要快和慢得多,这取决于它们的使用方式。良好的磁盘结构设计可以使它们与网络一样快。现代操作系统对磁盘读写做了一些优化,以加快磁盘访问速度。比如readahead,会把一个比较大的磁盘提前快速读入内存。后写将许多小的逻辑写操作组合成一个大的物理写操作。而且操作系统还会将主存剩余的所有空闲内存空间作为磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接I/O会绕过磁盘缓存)).结合这些优化特性,如果是顺序访问磁盘,在某些情况下可能比随机内存访问更快,甚至可能与网络速度相差无几。上面的Topic其实是一个逻辑概念,面向消费者和生产者,物理上实际存放的是Partition。每个Partition最终对应一个目录,所有的消息和索引文件都存放在这个目录中。默认情况下,如果创建时没有指定Partition的个数,每个Topic只会创建一个Partition。比如我创建了一个名为test的Topic,没有指定Partition的个数,那么默认会创建一个名为test-0的文件夹。这里的命名规则是:-。发布到分区的任何消息都将附加到分区数据文件的末尾。这样的顺序写入磁盘操作使得Kafka非常高效(已经验证顺序写入磁盘的效率高于随机写入内存的效率,这是Kafka高吞吐量的一个非常重要的保证)。每条消息发送给Broker,Broker会根据Partition规则选择存储哪个Partition。如果Partition规则设置得当,所有的消息都可以均匀分布到不同的Partition中。Kafka中的底层存储设计假设我们在Kafka集群中只有一个Broker。我们创建两个Topic名称:“Topic1”和“Topic2”,Partition数量分别为1和2。然后我们会在我们的根目录下创建如下三个文件夹:|--topic1-0|--topic2-0|--topic2-1在Kafka的文件存储中,同一个Topic下有多个不同的Partition,每个Partition是一个目录.并且每个目录被均匀分布成多个大小相等的SegmentFiles。SegmentFile由索引文件和数据文件组成。他们总是成对出现。后缀“.index”和“.log”代表段索引文件和数据文件。现在假设我们设置每个segment的大小为500MB,启动producer向topic1写入大量数据,topic1-0文件夹下会生成类似下面的文件:|--topic1-0|--00000000000000000000。指数|-00000000000000000000000000000000000000000000000000000000000000000000000000000000003687699999999999证|-0000000000000000368769.LOG|-000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000110110110110110110110110110110110110110110110110110110110110110110110110110110110111转。--topic2-1Segment是Kafka文件存储的最小单位。段文件命名规则:Partitionglobal的第一个段从0开始,后面的每个段文件的名字都是前一个段文件最后一条消息的偏移值。最大长度为64位,字符长度为19位,数字不补0。如00000000000000368769.index、00000000000000368769.log。以上面这对SegmentFiles为例,说明索引文件和数据文件的对应关系:以索引文件中的元数据<3,497>为例,第三个Message在数据文件中表示turn(在globalPartition368769+3=368772message中表示),message的物理偏移地址为497。注意Index文件不是从0开始的,也不是每次递增1。这是因为Kafka采用了稀疏索引的存储方式,每隔一定的数据字节就创建一个索引。它减小了索引文件的大??小,使Index可以映射到内存,减少了查询时的磁盘IO开销,不会给查询带来过多的时间消耗。因为它的文件名是上一个Segment的最后一个message的Offset,所以当需要查找指定Offset的Message时,可以通过对所有Segment的文件名进行二分查找找到它所属的Segment。然后在它的Index文件中找到它在文件上对应的物理位置,就可以取出Message了。由于消息在Partition的segment数据文件中是顺序读写的,消息消费后不会被删除(删除策略是针对过期的segment文件),这就是Kafka高性能的原因,顺序磁盘IO收纳设计师,很重要。Kafka如何准确知道Message的offset?这是因为Kafka定义了一个标准的数据存储结构,Partition中的每条Message都包含以下三个属性:可以简单的看做一个ID。MessageSize:表示Message内容Data的大小。Data:Message的具体内容。ProducerDesignSummary在我们发送消息之前,先问几个问题:每条消息都是关键的,不能容忍丢失的吗?偶尔重复消息可以吗?我们是否关心消息延迟或写入消息的吞吐量?例如,有一个信用卡交易处理系统,当交易发生时,向Kafka发送消息,另一个服务读取消息并根据规则引擎检查交易是否通过,并通过Kafka返回结果。对于这样的业务,消息不能丢失或重复。由于交易量大,吞吐量需要尽可能大,延迟可以稍微高一点。再比如,如果我们需要采集网页的用户点击数据,对于这样的场景,少量的消息丢失或者重复是可以容忍的,延迟不重要,只要不影响用户体验即可,而吞吐量是根据实时用户数决定的。不同的业务需要不同的写法和配置。这里不讨论具体的方法,现在让我们看一下生产者写消息的基本流程:流程如下:首先,我们需要创建一个ProducerRecord,这个对象需要包含主题(Topic)和值(Value)的消息,可以选择性地指定键(Key)或分区(Partition)。发送消息时,生产者将键和值序列化为字节数组,发送给分区器。如果我们指定一个分区,分配器返回那个分区;否则,分配器根据键值选择一个分区并返回它。选择分区后,生产者知道消息所属的主题和分区,并将这条记录添加到相同主题和分区的批量消息中,另一个线程负责将这些批量消息发送到相应的KafkaBroker。当Broker接收到消息时,如果写入成功,会返回一个包含消息的topic、partition、displacement的RecordMetadata对象,否则会返回异常。生产者收到结果后,可能会出现异常重试。ConsumerDesignSummary①Consumer和ConsumerGroup假设这样一个场景:我们从Kafka读取消息,检查消息,最后生成结果数据。我们可以创建一个消费者实例来执行此操作,但是如果生产者写入消息的速度比消费者读取消息的速度快怎么办?随着时间的推移,消息的堆积越来越严重。对于这种场景,我们需要添加多个消费者进行水平扩展。Kafka消费者是消费者组的一部分。当多个消费者组成一个消费者组消费主题时,每个消费者会收到来自不同分区的消息。假设有一个T1主题,有4个分区;同时,我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1会收到来自这4个分区的消息,如下图:如果我们在消费者组G1中添加一个新的消费者C2,那么每个消费者都会收到来自两个分区的消息,如下图示例:如果我们最多添加4个消费者,每个消费者会分别从一个分区接收消息,如下图:但是如果我们继续往这个消费者组中添加消费者,剩下的消费者就会闲置,不会收到任何消息:总而言之,我们可以横向扩展到通过增加消费群体中的消费者数量来提高消费能力。这也是为什么建议在创建topic时使用更大数量的partition,这样在消费负载高的时候可以添加consumer来提升性能。此外,消费者的数量不应超过分区的数量,因为额外的消费者是空闲的并且没有帮助。Kafka一个很重要的特性就是只需要写一次消息,并且可以支持任意数量的应用读取这条消息。换句话说,每个应用程序都可以读取全部消息。为了让每个应用程序都能读取全部消息,应用程序需要有不同的消费者组。对于上面的例子,如果我们新增一个消费者组G2,而这个消费者组有两个消费者,就会是这样的:在这个场景下,消费者组G1和消费者组G2都可以收到T1的全量消息topic,按道理来说,它们属于不同的应用。最后总结一下:如果应用需要读取全量消息,那么请为应用设置一个消费组;如果应用的消费能力不足,那么可以考虑将消费者添加到这个消费组中。②消费者组和分区再平衡可以看出,当一个新的消费者加入消费者组时,它会消费一个或多个分区,而这些分区之前由其他消费者负责。另外,当一个消费者离开消费者组时(比如重启、死机等),它消费的分区会被分配给其他分区。这种现象称为重新平衡(Rebalance)。Rebalancing是Kafka一个非常重要的属性,它保证了高可用和水平扩展。但是也需要注意的是,在rebalancing期间,所有的consumer都不能消费消息,所以整个consumergroup会暂时不可用。而且,重新平衡分区也会导致原来的消费者状态过期,这会导致消费者重新更新状态,这也会降低这段时间的消费性能。稍后我们将讨论如何安全地执行再平衡以及如何尽可能避免它。消费者通过周期性地向一个作为组协调器(GroupCoordinator)的Broker发送心跳(Hearbeat)来保持在消费者组中的存活。这个Broker不是固定的,每个消费群体可能不一样。当消费者拉取消息或提交消息时发送心跳。如果消费者在一定时间内没有发送心跳,它的会话(Session)就会过期,组协调器就会认为消费者宕机了,进而触发重平衡。可以看出,从consumer下来到session过期是有一定时间的。在此期间,消费者的分区不能消费消息。通常情况下,我们可以做一个优雅的关闭,让consumer给groupcoordinator发送一个leave消息,这样groupcoordinator就可以马上rebalance,而不用等session过期。在0.10.1版本,Kafka修改了心跳机制,将发送心跳和拉取消息分开,使得发送心跳的频率不受拉取频率的影响。此外,更高版本的Kafka支持配置消费者不拉取消息但仍保持存活的时间。这种配置可以避免活锁。活锁意味着应用程序没有故障但由于某种原因不能进一步消费。③Partition和消费模型上文提到,Kafka中一个Topic中的消息是分布存储在多个Partition(分区)中的,而ConsumerGroup在消费时需要从不同的Partition中获取消息,那么到底如何重建Topic呢?消息的顺序如何?答案是:没办法。Kafka只保证Partition中的消息是有序的,不管全局情况如何。接下来的问题是:Partition中的消息可以被多次消费(被不同的ConsumerGroup),那么Partition中消费的消息什么时候删除呢?Partition如何知道ConsumerGroup当前在哪里消费?不管消息是否被消费,除非消息过期Partition永远不会删除消息。例如设置保留时间为2天,则消息发布后2天内任意群组均可消费,2天后消息自动删除。Partition会为每个ConsumerGroup保存一个offset,记录该Group消费的位置。如下图所示:④为什么Kafka是Pull模型?消费者应该向Broker索取数据(Pull)还是Broker向消费者推送数据(Push)?Kafka作为消息系统,沿用了传统的方式,选择从Producer向Broker推送消息,Consumer从Broker拉取消息。一些以日志为中心的系统,例如Facebook的Scribe和Cloudera的Flume,使用的是Push模型。其实无论是Push模式还是Pull模式,都各有优缺点。Push模式很难适应不同消费速率的消费者,因为消息的发送速率是由Broker决定的。Push模式的目标是尽可能快地传递消息,但这很容易导致Consumer来不及处理消息,典型的表现就是拒绝服务和网络拥塞。Pull模式可以根据消费者的消费能力,以合适的速率消费消息。对于Kafka来说,Pull模式更适合。Pull模式简化了Broker的设计,Consumer可以自主控制消费消息的速率。同时Consumer可以自己控制消费方式,可以批量消费,也可以逐条消费。同时也可以选择不同的提交方式来实现不同的传输语义。Kafka是如何保证可靠性的当我们谈到可靠性时,我们总是会提到保证*这个词。可靠性保证是我们构建应用程序的基础。比如关系型数据库的可靠性保证是ACID,即Atomicity、Consistency、Isolation、Durability。Kafka中的可靠性保证有以下四点:对于分区来说,它的消息是有序的。如果生产者先将消息A然后将消息B写入分区,则消费者将读取消息A,然后再读取消息B。当一条消息已写入所有同步副本时,该消息被视为已提交。这里的写可能只是写到文件系统的缓存中,不一定会刷到磁盘中。生产者可以在不同的时间等待确认,比如等待分区的主副本写入后返回,后者等待所有in-sync状态的副本写入后返回。一旦一条消息被提交,只要有一个副本还活着,就不会丢失任何数据。消费者只能读取提交的消息。使用这些基本保证,我们构建了一个可靠的系统。这时候,我们需要考虑一个问题:我们的应用需要多大的可靠性?可靠性不是免费的,它与系统可用性、吞吐量、延迟和硬件价格密切相关。一得则失一得。因此,我们往往需要做出取舍,一味追求可靠性是不切实际的。手工搭建一个Kafka通过上面的描述,我们已经大致了解了Kafka是什么,现在尝试在本地搭建一个来体验一下。第一步:下载Kafka这里以MacOS为例,在安装Homebrew时执行如下代码:brewinstallkafka由于Kafka依赖Zookeeper,所以下载时会自动下载。第二步:启动服务。在启动服务之前,我们首先需要修改Kafka的监听地址和端口为localhost:9092:vi/usr/local/etc/kafka/server.properties然后如下图修改:启动Zookeeper和Kafka依次:brewservicesstartzookeeperbrewservicesstartkafka然后执行下面的语句创建一个名为“test”的Topic:kafka-topics--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictest我们可以通过下面的命令查看我们的Topic列表:kafka-topics--list--zookeeperlocalhost:2181第三步:发送消息然后我们新建一个console,运行下面的命令创建一个consumer来跟随刚刚创建的topic:kafka-console-consumer--bootstrap-serverlocalhost:9092--topictest--from-beginning使用控制台向刚刚创建的Topic添加消息,观察刚刚创建的消费者窗口:kafka-console-producer--broker-listlocalhost:9092--topictest可以通过消费者窗口到正确的消息ge:参考:Kafka设计分析(一):Kafka背景与架构介绍Kafka系列(一)初识KafkaKafka入门Kafka专题介绍为什么要分区?-知乎Kafka的设计与实践思考Kafka系列(六)数据可靠传输