当前位置: 首页 > 科技观察

卡夫卡

时间:2023-03-18 22:55:54 科技观察

1。Kafka的诞生背景Kafka最初是由Linkedin开发的。它是一个分布式的、支持分区的、基于副本的分布式消息系统,它最大的特点是可以实时处理大量数据,满足各种需求场景:比如基于Hadoop的批处理系统,低-latency实时系统、storm/Spark流处理引擎、web/nginx日志、accessLogs、消息服务等,用scala语言编写,Linkedin在2010年贡献给Apache基金会,成为顶级开源项目。当今社会,商业、社交、搜索、浏览等各种应用系统都像信息工厂一样不断产生着各种信息。在大数据时代,我们面临着以下挑战:如何收集这些庞大的信息;如何分析它;如何及时做到以上两点;以上挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,生产者和消费者之间,需要一个bridge-messagesystem来沟通他们俩。从微观层面来说,这个需求也可以理解为如何在不同系统之间传递消息。分布式消息系统Kafka应运而生:Kafka——由linked-in开源;kafka-是解决上述问题的框架,实现了生产者和消费者的无缝连接;kafka-highyieldAhigh-throughputdistributedmessagingsystem(高吞吐量分布式消息系统);2、为什么要使用消息系统解耦:它允许你独立扩展或修改两边的处理,只要你保证它们遵守相同的接口约束。冗余:消息队列将数据持久化,直到它们被完全处理,通过这种方式避免了数据丢失的风险。在很多消息队列采用的“插入-获取-删除”范式中,在从队列中删除消息之前,您需要您的处理系统清楚地表明该消息已被处理,以确保您的数据被安全保存。直到你用完它。可扩展性:因为消息队列解耦了你的处理,所以很容易增加消息入队和处理的频率,只要增加额外的处理。Flexibility&peakprocessingcapacity:在流量急剧增加的情况下,应用仍然需要继续运行,但这种突发流量并不常见。一直投入资源待命来应对这样的访问高峰,无疑是一种巨大的浪费。使用消息队列可以让关键组件承受突发的访问压力,而不会因为突发的过载请求而完全崩溃。可恢复性:系统的一部分发生故障不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉了,加入到队列中的消息在系统恢复后仍然可以继续处理。顺序保证:在大多数使用场景中,数据处理的顺序很重要。大多数消息队列天生就是有序的,可以保证数据按照特定的顺序进行处理。(Kafka保证消息在一个Partition中的有序性)Buffering:有助于控制和优化数据在系统中流动的速度,解决生产消息和消费消息处理速度不一致的问题。异步通信:很多时候,用户不希望也不需要立即处理消息。消息队列提供了一种异步处理机制,允许用户将消息放入队列而不立即处理。将任意多的消息放入队列,并在需要时处理它们。三、Kafka的基本架构3.1.拓扑3.2。名词概念producer:消息生产者,向kafka集群发布消息的终端或服务。broker:包含在kafka集群中的服务器。topic:每条发布到Kafka集群的消息所属的类别,即Kafka是面向主题的。分区:分区是一个物理概念,每个主题包含一个或多个分区。Kafka分配的单位是partition。consumer:消费来自kafka集群的消息的终端或服务。消费组:在高层消费API中,每个消费者属于一个消费组,每条消息只能被消费组中的一个消费者消费,但可以被多个消费组消费。replica:分区的副本,保证分区的高可用。leader:副本中的一个角色,生产者和消费者只与leader交互。follower:副本中的角色,从领导者那里复制数据。controller:kafka集群中的一台服务器,用于leader选举和各种failover。zookeeper:kafka使用zookeeper来存储集群的元信息。4.Kafka基本特性高吞吐、低延迟:Kafka每秒可处理数十万条消息,延迟低至几毫秒;可扩展性:Kafka集群支持热扩展;持久性、可靠性:消息持久化到本地磁盘,支持数据备份,防止数据丢失;容错性:允许集群中的节点发生故障(如果副本数为n,则允许n-1个节点发生故障);高并发:支持数千个客户端同时读写;4.1.设计思路consumergroup:每个consumer可以组成一个group,每条消息只能被group中的一个consumer消费,如果一条消息可以被多个consumer消费,那么这些consumer一定在不同的group中。消息状态:在Kafka中,消息的状态存储在消费者中。broker不关心消费了哪个消息,谁消费了它。它只记录一个偏移值(指向分区中下一条要消费的消息位置)。也就是说,如果消费者处理不好,可能会导致broker上的一条消息被多次消费。消息持久化:Kafka会将消息持久化到本地文件系统,保持极高的效率。消息有效期:Kafka会长期保存消息,以便消费者可以多次消费。当然,很多细节都是可以配置的。批量发送:Kafka支持以消息集合为单位批量发送,提高推送效率。push-and-pull:Kafka中的Producer和consumer采用push-pull模式,即producer只向broker推送消息,consumer只从broker拉取消息。两者之间消息的生产和消费是异步的。Kafka集群中broker之间的关系:不是主从关系。每个代理在集群中具有相同的状态。我们可以随意添加或删除任何代理节点。负载均衡:Kafka提供了一个元数据API来管理broker之间的负载(对于Kafka0.8.x,对于0.7.x主要依靠zookeeper来实现负载均衡)。同步异步:Producer采用异步推送的方式,大大提高了Kafka系统的吞吐率(可以通过参数控制使用同步方式还是异步方式)。分区机制partition:Kafka的broker端支持消息分区。生产者可以决定将消息发送到哪个分区。分区中消息的顺序就是Producer发送消息的顺序。一个主题中可以有多个分区。具体分区数可配置。分区的意义很重要,后面的内容会逐步体现。离线数据加载:由于支持可扩展的数据持久性,Kafka也非常适合将数据加载到Hadoop或数据仓库中。插件支持:现在很多活跃的社区开发了很多插件来扩展Kafka的功能,比如用来配合Storm、Hadoop、flume的插件。4.2.应用场景日志收集:一个公司可以使用Kafka收集各种服务的日志,通过Kafka以统一接口服务的形式开放给各种消费者,如hadoop、Hbase、Solr等消息系统:解耦和生产者以及消费者、缓存消息等。用户活动跟踪:Kafka常用于记录web用户或app用户的各种活动,如浏览网??页、搜索、点击等活动。这些活动信息由各个服务器发布到Kafka的主题中,然后订阅者订阅这些主题来做实时的监控和分析,或者加载到hadoop或数据仓库中进行离线分析和挖掘。运营指标:Kafka也常用于记录运营监控数据。这包括从各种分布式应用程序收集数据并为各种操作(例如警报和报告)生成集中反馈。流处理:如sparkstreaming、storm5。推模式与拉模式5.1。点对点模式如上图所示,点对点模式通常基于拉取或轮询消息传递模型。该模型的特点是发送到队列的消息是一个且只有一个消费者进行处理。生产者将消息放入消息队列后,消费者主动拉取消息进行消费。点对点模型的好处是消费者拉取消息的频率可以自己控制。但是消息队列中是否有消息要消费,在消费者端是无法感知的,所以需要在消费者端额外的线程进行监听。5.2.发布-订阅模型如上图所示,发布-订阅模型是一种基于消息发送的消息模型,该模型可以有多个不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅了该类消息的消费者(类似微信♂)。由于消费者是被动接收推送的,所以不需要感知消息队列中是否有要消费的消息!但是consumer1、consumer2、consumer3的机器性能不同,所以处理消息的能力也会不同,但是消息队列无法感知消费者的消费速度!所以推送的速度就成了发布订阅模式的问题!假设三个消费者的处理速度分别是8M/s、5M/s、2M/s,如果队列推送速度是5M/s,那consumer3就承受不住了!如果队列推送的速度是2M/s,那么consumer1和consumer2就会有很大的资源浪费!5.3.Kafka的选择作为消息系统,Kafka遵循传统的方式,选择从Producer向broker推送消息,Consumer从Broker拉取消息。一些以日志为中心的系统,例如Facebook的Scribe和Cloudera的Flume,使用推送模型。其实推模式和拉模式各有优缺点。推送模式很难适应不同消费速率的消费者,因为消息的发送速率是由broker决定的。推送模式的目标是尽可能快地传递消息,但这很容易导致消费者来不及处理消息,典型表现是拒绝服务和网络拥塞。拉取模式可以根据消费者的消费能力,以合适的速率消费消息。对于Kafka来说,pull模式更适合。pull模式可以简化broker的设计,consumer可以自主控制消费消息的速率,同时consumer可以自己控制消费方式——批量消费或者逐条消费,并且可以选择不同的提交方式来实现不同的传输语义。6.卡夫卡工作流程6.1.发送数据我们看上面的架构图,producer就是生产者和数据入口。注意图中的红色箭头,Producer在写数据的时候总会找leader,不会直接把数据写给follower!如何找到领导者?写作过程是怎样的?我们看下图:首先从集群中获取partition的leader;生产者将消息发送给领导者;领导者将消息写入本地文件;追随者从领导者那里拉取信息;followers在本地写入消息,然后向leader发送ACK进行确认;leader收到所有replicas的ACK后,向producer发送ACK确认;6.1.1.要保证消息的顺序,需要注意的一点是消息写到leader后,follower会主动去leader那里同步!producer采用push方式向broker发布数据,每条消息append到partition上,顺序写入磁盘,保证同一个partition中的数据是有序的!写入示意图如下:6.1.2.消息加载分区上面说了数据会写入不同的分区,那么Kafka为什么要分区呢?相信大家也能猜到分区的主要目的是:方便扩展:因为一个topic可以有多个分区,我们可以通过扩展机器来轻松应对数据量的增加。提高并发性:以分区为读写单元,多个消费者可以同时消费数据,提高消息处理效率。熟悉负载均衡的朋友应该都知道,当我们向一台服务器发送请求时,服务器可能会加载请求,将流量分发到不同的服务器上。在kafka中,如果一个topic有多个partition,producer如何知道将数据发送到哪个partition?kafka有几个原则:partition写入时可以指定需要写入的分区,如果指定了,则写入对应的分区;如果没有指定分区,但是设置了数据的key,则会根据key的值Hash一个分区;如果既没有指定partition也没有设置key,将通过轮询选择一个partition;6.1.3.保证消息不丢失是一个消息队列中间件的基本保证。向Kafka写入消息时,如何保证消息不丢失?其实就是上面写流程图中描述的,就是通过ACK响应机制!当生产者向队列写入数据时,可以设置参数来判断是否确认Kafka已经接收到数据。该参数可以设置的值为0、1、全部。0表示producer不需要等待集群返回就可以向集群发送数据,不保证消息发送成功。最不安全但效率最高。1表示当producer向集群发送数据时,只要leader响应就可以发送下一个,只保证leader发送数据成功。all表示代表producer向集群发送数据需要所有的follower都完成从leader的同步后才能发送下一个,保证leader发送数据成功,并且所有副本都有备份。安全性最高,但效率最低。最后要注意的是,如果往一个不存在的topic写数据,能写成功吗?Kafka会自动创建一个topic,分区数和副本数默认配置为1。6.2.保存数据Producer将数据写入Kafka后,集群需要保存数据!Kafka将数据保存在磁盘上。或许在我们的常识中,写入磁盘是一个耗时的操作,不适合这种高并发的组件。Kafka一开始会单独开辟一块磁盘空间,顺序写入数据(比随机写入效率更高)。6.2.1.分区结构如前所述,每个主题都可以分为一个或多个分区。如果觉得题目比较抽象,那么分区就比较具体!Partition在服务器上的表示是一个一个的文件夹。每个分区文件夹下有多组segment文件,每组segment文件包含.index文件、.log文件、.timeindex文件(早期版本没有这个文件)三个文件,log文件其实就是存放消息的地方,index和timeindex文件是检索消息的索引文件。如上图所示,这个分区有三组segment文件,每个日志文件的大小都是一样的,但是存储的消息条数不一定相等(每条消息的大小不一致)。该文件以段的最小偏移量命名。例如000.index存放的是偏移量为0~368795的消息。Kafka采用切分+索引的方式来解决搜索效率的问题。6.2.2.消息结构上面说了,日志文件其实就是一个存放消息的地方。我们在producer里面写给kafka的也是一条一条的消息。那么日志中存储的消息是什么样的呢?消息主要包括消息体、消息大小、偏移量、压缩类型……我们需要知道的是以下三项:分区位置;Messagesize:消息大小占用4个字节,用于描述消息的大小;消息体:消息体存储实际的消息数据(压缩后),占用空间根据具体消息不同而不同。6.2.3.存储策略无论消息是否被消费,kafka都会存储所有的消息。那么旧数据的删除策略是什么?基于时间,默认配置为168小时(7天);基于大小,默认配置为1073741824。需要注意的是,kafka读取特定消息的时间复杂度是O(1)O(1),所以这里删除过期文件并不会提升kafka的性能!6.3.消费数据消息存储到日志文件后,消费者就可以消费了。在谈到消息队列通信的两种模式时,我们提到了点对点模式和发布订阅模式。Kafka采用发布-订阅模型。消费者主动去Kafka集群拉取消息。与生产者类似,消费者在拉取消息的时候也会找领导者拉取消息。多个消费者可以组成一个消费组,每个消费组都有一个groupid!同一个消费组中的消费者可以消费同一个topic下不同分区的数据,但是组内的多个消费者是不会消费同一个分区的数据的!我们看下图:图中可以看出consumergroup中consumer的个数小于partition的个数,所以会出现某个consumer消费多个partition的数据,消费速度不如partition的情况与只处理一个分区的消费者的处理速度一样快!如果consumergroup中consumer的个数超过了partition的个数,会不会有多个consumer消费同一个partition的数据?上面已经说了,这不会发生!额外的消费者不消费任何分区数据。所以在实际应用中,建议消费者组中的消费者数量与分区数量保持一致!在保存数据那一节,我们讲到partition分为多组segment,每个segment包含.log、.index、.timeindex文件,每条存储的消息包含offset、消息大小、消息体……我们反复讲过segment和offset,如何使用segment+offset来搜索消息?现在查找偏移量为368801的消息是怎样的过程?我们先来看下图:1、首先找到offset的368801消息所在的segment文件(使用二分法),这里找到的就是第二个segment文件。2.打开找到的段中的.index文件(即368796.index文件,文件的起始偏移量为368796+1,我们要查找的消息368801在索引中的偏移量为368796+5=368801,所以这里要找到的相对偏移量是5)。由于文件使用稀疏索引存储相对偏移量与对应消息物理偏移量的关系,所以直接查找相对偏移量为5的索引是无法找到的,这里同样采用二进制的方式寻找相对偏移量offset小于或等于指定的相对偏移量是相对偏移量的索引条目中最大的,因此找到相对偏移量为4的索引。3、根据查找到的相对偏移量为4的索引,确定消息存储的物理偏移量为256。打开数据文件,从256位置开始依次扫描,直到找到偏移量为368801的消息。该机制基于偏移量是有序的,采用分段+有序偏移量+稀疏索引+二分查找+顺序查找等多种方式高效查找数据。至此,消费者就可以拿到需要处理的数据进行处理了。那么每个消费者如何记录自己消费的地点呢?早期版本消费者会消费offset来维护zookeeper,消费者每隔一段时间就会上报一次,容易导致重复消费,性能不好!在新版本中,consumer消费的offset已经直接维护在Kafka集群的topicconsumer_offsets中。