当前位置: 首页 > 科技观察

微服务架构-消息队列Kafka图最全知识点

时间:2023-03-21 00:30:55 科技观察

MQ(消息队列)是跨进程通信的方式之一,可以理解为异步rpc,上游系统对调用结果的态度往往重要但不紧急。使用消息队列有以下好处:业务解耦、流量调峰、弹性扩展。接下来介绍消息中间件Kafka。什么是卡夫卡?Kafka是一个分布式消息引擎。具有以下特点能够发布和订阅消息流(类似于消息队列)以容错和持久化的方式存储消息流多分区概念提高并行性Kafka架构概述Topic消息主题和队列,每条消息都有自己的主题,Kafka按主题对消息进行分类。在Kafka中,Topic在物理上可以划分为一个或多个分区(Partition)。每个分区在物理上对应一个以“topicName_partitionIndex”命名的文件夹。这个目录包含了这个分区(.log)和索引文件(.index)的所有消息,这使得Kafka的吞吐量可以水平扩展。Partition每个partition是一个顺序的、不可变的消息队列,可以连续添加;分区中的消息被分配了一个序列号,称为偏移量(offset),在每个分区中的偏移量是唯一的。producer在发布消息时,可以为每条消息指定一个Key,这样当消息发送给broker时,会按照分区算法将消息存储在对应的partition中(一个partition存储多条消息),如果合理设置分区规则,则所有的消息将被平均分配到不同的分区,从而实现负载均衡。BrokerKafka服务器用于存储消息。Kafka集群中的每个服务器都是一个Broker。消费者将从经纪人那里拉取订阅的消息。生产者将消息发送给Kafka,生产者根据主题分发消息。生产者还负责将消息与主题上的哪个分区相关联。最简单的方法是从分区列表中依次选择。也可以根据算法根据权重来选择分区。算法可以由开发人员定义。CousumerConsermer实例可以是一个独立的进程,负责订阅和消费消息。消费者使用consumerGroup来标识自己。同一个consumergroup可以并发消费多个partition的消息,同一个partition也可以被多个consumerGroup并发消费,但是在一个consumerGroup中,一个partition只能被一个consumer消费CousumerGroupConsumerGroup:同一个ConsumerGroup中的Consumer,Kafka将对应Topic中的每条消息只发送给其中一个ConsumerKafka生产者设计原则发送消息的过程1.序列化消息&&。计算分区根据key和value配置序列化消息,然后计算分区:如果在ProducerRecord对象中指定了一个分区,则使用这个分区。否则,根据key和topic的分区数计算余数。如果没有key,则随机产生一个计数器,这个计数器用来求分区数的余数。每次使用此计数器都会递增。2Sendtobatch&&唤醒Sender线程根据topic-partition获取对应的batches(Dueue),然后将message追加到batch中。如果批次已满,则唤醒Sender线程。队列的操作是带锁执行的,所以batch中的消息是有序的。当前方法上的后续Sender操作异步运行。3.Sender向broker(tpreplialeader)发送消息的顺序为:leader,leader_epoch,controller_epoch,isr,replicas等;Kafka客户端可以从任意broker获取所需的元数据信息;sender线程可以通过metadata信息知道tpleader的brokerIdproducer也保存了metadata信息,同时根据metadata策略进行更新(定时更新metadata.max.age.ms,故障检测,强制更新:检查元数据无效后,调用metadata.requestUpdate()强制更新publicclassPartitionInfo{privatefinalStringtopic;privatefinalintpartition;privatefinalNodeleader;privatefinalNode[]replicas;privatefinalNode[]inSyncReplicas;privatefinalNode[]offlineReplicas;}3.2幂等发送为了实现Producer的幂等性,Kafka引入了ProducerID(即PID)和SequenceNumber。对于每个PID,如果Producer发送的每条消息的序号都大于Broker维护的序号,说明中间有数据没有写入,也就是乱序。此时Broker拒绝消息,Producer抛出InvalidSequenceNumber,如果消息的序号小于等于Broker维护的序号,则说明消息已经保存,属于重复信息。Broker直接丢弃消息,Producer抛出DuplicateSequenceNumberSender发送失败会重试,保证每条消息都发送到broker4。发送方处理代理发送的生产响应。一旦代理完成处理发送方的生产请求,它会将生产响应发送给发送方。这个时候producer会执行我们为send()设置的回调函数。至此,生产者的发送已经执行完毕。Throughput&&delay:buffer.memory:较大的buffer设置有助于提高吞吐量,但太大的batch会增加延迟。可以使用linger_ms加上linger_ms参数:如果batch太大,或者producerqps不高,batch添加起来会很慢,我们可以强制在linger_ms时间后发送batch数据ack:回复了多少生产者从经纪人那里收到的信息被认为是成功的传输。0表示producer不需要等待leader的确认(吞吐量最高,数据可靠性最差)1表示需要leader确认写入到自己的本地log并立即确认-1/all表示所有的ISR完成后确认(最低吞吐量,最高数据可靠性)。每次一个Sender线程和一个长连接初始化一个producer实例,都会初始化一个Sender实例,然后再初始化一个新的。增加与代理的长连接。代码透视:每次初始化KafkaProducer时,分配一个空clientpportNum-np|grepTCP,适当增加生产者数量可以提高吞吐量Consumer设计原则poll消息消费者通过fetch线程(单线程)拉取消息消费者通过心跳线程与broker发送心跳。超时将视为挂机。每个消费者组在代理上都有一个协调器来管理它。消费者的加入和退出,以及消费者消息的位移,都是由协调者来处理的。位移管理消费者的消息位移代表了该组在topic-partition上的当前消费进度,消费者关机重启后可以从这个offset继续消费。在kafka0.8之前,位移信息是存储在zookeeper上的。由于zookeeper不适合高并发读写,新版Kafka将位移信息作为消息发送给topic__consumers_offsets所在的broker。__consumers_offsets默认有50个分区。消息的key是groupId+topic_partition,value是offset。KafkaGroupstateEmpty:初始状态,group没有成员,如果所有offsets过期,就变成DeadPreparingRebalance:group正在准备rebalanceAwaitingSync:group正在等待groupleader的分配方案Stable:稳定状态(Group稳定);Dead:组内没有成员,其Metadata已被移除注意rebalancingrebalance当某些原因导致消费者不再均匀消费partition时,kafka会自动执行Reblance,从而使消费者对partition的消费再次平衡。rebalance什么时候发生?:组订阅的主题数变化,主题分区数变化,消费者成员数变化。当消费者加入或离开组时,重新平衡过程被检测为崩溃。示例1.一个consumer被检测为crash导致的rebalance,比如heartbeatthread如果heartbeat没有在timeout时间内发送给broker,coordnator认为应该对group进行rebalance。接下来,其他消费者发送fetch请求后,coordnator会回复reblancenotification。当消费者成员收到请求后,只有领导者会根据分配策略进行分配,然后将各自的分配结果返回给协调者。此时只有消费者leader返回实际数据,其他返回空。消费者收到分配方法后,会将分配策略同步给每个消费者。示例2.消费者加入引起的reblance使用join协议表示有消费者想加入group。sync协议用于按照分配规则进行分配(上图摘自网络)引申:上述rebalance机制存在的问题在一个大型系统中,一个topic可能对应上百个consumer实例。将这些消费者陆续添加到一个空的消费者组中,会导致多次rebalance;另外,consumer实例的启动时间是不可控的,很可能会超过coordinator确定的rebalance超时时间(即max.poll.interval.ms),从而再次触发rebalance,每次rebalance的代价相当大高,因为很多状态需要在rebalance前持久化,rebalance后重新初始化。新版本的改进通过延迟进入PreparingRebalance状态来减少reblance的次数。新版本增加了group.initial.rebalance.delay.ms参数。当一个空的消费者组收到成员加入请求时,它不会立即转换到PreparingRebalance状态来启用rebalance。当时间超过group.initial.rebalance.delay.ms时,将group状态改为PreparingRebalance(开启rebalance)。实现机制是在coordinator底层增加一个groupstate:InitialReblance。假设此时有多个consumer相继启动,group状态先转换为InitialRebalance,经过group.initial.rebalance.delay.ms时间后,再转换为PreparingRebalance(开启rebalance)Broker设计原理Broker是Kafka集群中的一个节点。负责处理生产者发送的消息和消费者的请求。以及集群节点的管理等,由于涉及的内容比较多,先简单介绍一下,之后抽出一篇分享brokerzk注册broker消息存储。Kafka的消息以二进制格式紧凑存储,节省了大量空间。另外,消息存在于ByteBuffer中,而不是堆中,这样broker进程挂掉时,数据也不会丢失,同时也避免了gc问题。通过零拷贝和顺序寻址,消息存储和读取速度非常快。在处理fetch请求时,通过零拷贝加快速度。Broker状态数据broker设计,每台机器都持有相同的状态数据。主要包括以下内容:controller所在的brokerID,即当前集群中哪个broker是controller。集群中所有broker的信息:比如每个broker的ID,rack信息,集群中所有节点配置的几组连接信息Info:严格来说,这个和上一个有点重复,但是这是按经纪人ID和侦听器类型分组的。对于非常大的集群,使用这个itemcache可以快速定位和找到给定节点的信息,而不需要遍历上一个item的内容,这是一种优化。集群中所有分区的信息:所谓分区信息是指leader、ISR、AR信息,以及当前离线的副本集合。这部分数据按照topic-partitionID进行分组,可以快速的找到每个partition的当前状态。(注:AR的意思是assignedreplicas,即topic创建时分配给partition的replicas集合)Broker负载均衡partitionnumberload:每个broker的partition数量要均匀分布。partitionReplica分配算法如下:)并对要分配的Partition进行排序。第i个分区分配给第(imodn)个Broker。第i个Partition的第j个Replica分配给第((i+j)modn)-thBroker。CapacitysizeLoad:每个broker占用的硬盘大小应该是统一的。在Kafka1.1之前,Kafka可以保证每个broker上的partition数量是偶数。但是,由于每个分区的消息数量不同,不同硬盘之间的内存占用可能存在较大差异。健康)状况。在Kafka1.1中,增加了复制跨路径迁移功能kafka-reassign-partitions.sh。我们可以将其与监控系统结合起来,实现自动负载均衡。卡夫卡高可用性。在介绍kafka高可用之前,我们先介绍一下同步复制的几个概念:要求所有工作的follower都被复制,这条消息会被认为是commit。这种复制方式极大地影响了吞吐率。异步复制:Followers异步从leader拉取数据。只要数据被leader写入日志,就认为是完整的。commit,在这种情况下,如果Follower落后于L??eader更多,如果Leader突然崩溃,数据就会丢失。IsrKafka结合了同步复制和异步复制,使用ISR(与PartitionLeader同步的Replicalist)来保证数据不丢失和吞吐量之间的平衡。Producer只需要将消息发送给PartitionLeader,Leader将消息写入本地Log。Follower从Leader拉取数据。Follower收到消息后向Leader发送ACK。一旦Leader收到ISR中所有Replica的ACK,则认为消息已提交,Leader会增加HW并向Producer发送ACK。这样,如果leader挂了,只要ISR中有一个replica存活,就不会丢失数据。Isr动态更新Leader以跟踪ISR。如果ISR中的Follower掉线或落后太多,Leader会将其从ISR中移除。这里说的“落后太多”是指落后于Leader的Follower复制的消息数量超过预定值(replica.lag.max.messages)或者Follower超过一定时间(replica.lag.time.max.ms)向Leader发送获取请求。Broker节点在Zookeeper/brokers/topics/[topic]/partitions/[partition]/state中保存topic-partition的leader和Isr等信息Controller负责broker故障检查&&failover(fail/recover)Controller在Zookeeper上注册Watch,一旦某个Broker宕机,其在Zookeeper中对应的znode会被自动删除,Zookeeper会触发Controller注册的watch,Controller会读取最新的Broker信息,Controller会确定set_p,其中包含了所有Broker上的所有Partition对于set_p中的每个Partition,选举一个新的leader和Isr,并更新结果。3.1从/brokers/topics/[topic]/partitions/[partition]/state中读取Partition当前的ISR3.2,确定Partition新的Leader和Isr。如果当前ISR中至少有一个Replica还活着,则选择其中一个作为新的Leader,新的ISR包含当前ISR中所有存活的Replica。否则,选择Partition中任意存活的Replica作为新的Leader和ISR(这种情况下,可能存在数据丢失的可能性)3.3更新Leader、ISR、leader_epoch、controller_epoch:写入/brokers/topics/[topic]/partitions/[partition]/state直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。控制器可以在一次RPC操作中发送多个命令以提高效率。当controller挂掉后,每个broker都会在zookeeper的临时节点“/controller”注册一个watcher。当controller宕机时,“/controller”会消失,触发broker的watch,每个broker都会尝试创建一个新的controller路径,只有一次campaignSuccess被选为controller。如何使用Kafka保证幂等性不丢失消息首先,Kafka至少保证提交的消息。发送方有重试机制。producer业务方在使用producer发送消息时,会注册一个回调函数。在onError方法中重新发送消息。consumer拉取消息后,处理后commit,保证committed的消息必须被处理,不重复。消费者先拉取消息并保存,提交成功后删除缓存数据。Kafka的高性能分区提高并发性零拷贝顺序写入消息,聚合批次,缓存页面。业务端优化Kafka生产者。增加生产者的数量。ack配置批处理。