当前位置: 首页 > 后端技术 > Java

Java开发开源平台Kafka知识总结分享

时间:2023-04-01 20:58:27 Java

kafka的基本架构一个完整的kafka消息中间件应该包括以下节点:producer:生产消息的节点consumer:消费消息的节点broker:receiverproducer发送消息存储的节点zookeeper:管理和维护broker集群,保存broker集群的元数据信息,保证集群的高可用。Kafka消息存储方式kafka是如何存储消息的?如何保证整个集群的负载均衡和高可用?在讲解之前,我们需要补充一些关于kafka主题的基本概念:生产者必须向一个主题发送消息,也就是说,我们发送的所有消息都发送到这个主题的子分区:它本质上是一个用于存储消息的物理文件概念,分区中的每条消息都是有序的,遵循FIFO原则消息:发送的每条消息都是分区上存储的一个副本:Kafka经常搭建集群,所以同一条消息有多个副本,javaTraining是指分区在集群上存在多个副本。在集群中,分区分为领导者副本和跟随者副本。Leader副本主要负责读写请求,而Follower副本主要负责同步Leader副本数量,保证当Leader副本节点宕机时,选举机制仍能保证集群的高可用。broker:kafka集群中的各个节点producer:产生消息的节点consumer:消费消息的节点,同一条消息只能被consumergroup中的一个consumer消费consumergroup:kafka中的每个consumer都会属于一个consumer读者群没问题,我们通过下图来直观感受一下上面的基本概念。将消息存储在Kafka集群中需要几个步骤。创建主题指定分区数和副本数。同时,分区副本将根据分配算法分配给指定的brokerproducer。向该主题发送消息。Kafka根据消息的key确定对应的partition(如果没有key则自动生成一个),将消息发送给partition的leadercopy节点。第2步和第3步没什么好说的,重点关注Kafka如何分区将partitionReplicas分配给clusterbroker进行负载均衡。现在假设主题A有x个分区和3个副本。副本分配规则会按照设定的副本数,逐一分配topic下的各个partition。首先分配分区0的副本数,然后分配分区1的副本数。而不是把topic的所有partition都分配给第一个副本数,再分配给第二个副本数。有点绕,看下图就明白了:那么具体分配规则的大致算法如下:随机确定partition0第一个副本的broker位置,以及分配给这个partition的broker0是我们的leader副本存储节点。(同样,读/写partition0的消息都是在这个节点上进行的)随机确定分配给broker的partition0的下一个副本的offsetstepsize,然后根据offsetstepsize为broker定位broker具体计算remainingnumberofreplicas公式如下partition0第一个副本(leadercopy)的broker位置:(partition0索引位置+随机起始索引)%broker数量Partition0剩余副本broker位置:valshift=1+(randomoffsetstep+indexiofthei-thcopy)%(brokernumber-1)finalposition=(brokerpositionbythefirstcopy+shift)%randomoffsetstepofthebrokernumber:当分配下一个分区时,会在上一个分区的offsetstep上加1,可以通过随机起始索引和随机offsetstep,保证所有分区的leader副本尽可能均匀的分布到Kafka集群中。leader副本的节点负责所有的读写操作,保证不会因为leader副本分布过于集中而导致负载不均衡问题。Producer端核心处理流程Producer在创建消息时,必须在broker上创建一个topic,并指定partitions个数和partitioncopies个数。之后发送消息时,会发送到topic下的partition。创建主题后,消息发送可以分为以下几个核心步骤。实例化核心组件。该阶段会实例化一些核心数据,如broker集群的核心元数据信息、可用分区列表、消息key和value的序列化等。方式。发送消息会先将消息填充到缓冲区中,然后开启另一个线程从缓冲区中批量取数据发送给broker。缓冲区数据的一般设计结构如下。填充缓冲区的规则会根据消息的key计算出消息需要放入的partition。分区内的消息是有序的,是一种先进先出的双端队列结构(FIFO)。之所以是双端队列结构,是因为它内部有重试机制,当消息发送失败时,保证下一条要取的消息仍然是没有发送的消息。当消息写入缓冲区时,只需将其添加到队列的末尾即可。发送消息的大致核心流程如下:消费端核心处理流程消费端消息消费的主要核心流程分为以下几个步骤:消费组中的消费者订阅分区,从中拉取消息指定partition按照分配策略消费并提交消费offset消费组中的消费者按照分配策略订阅partitionKafka提供了3种分配策略:RoundRobinAssignor、RangeAssignor、StickyAssignorRoundRobinAssignor:round-robin是一种轮询策略,它将subscribe对主题分区和消费者进行排序,然后将所有主题分区平均分配给消费者组中的每个消费者(PS:类似于我们在游戏中发牌的方式,分区就是牌,消费者就是发牌的人玩游戏)。假设有两个主题t0和t1,分别有3个分区(PS:一共有6个分区t0p0,t0p1,t0p2,,t1p0,t1p1,t1p2),同一个consumer中有两个consumer(c0,c1)组订阅,则分区分配的最终结果如下:c0:[t0p0,t0p2,t1p1]c1:[t0p1,t1p0,t1p2]RangeAssignor:范围分配策略(默认分配策略),首先分配消费者按照字典顺序排序,然后将一个topic下的所有partition均匀分布,剩下的partition会按照consumer的先后顺序一个一个的分配给每个consumer。分配完成后,将分配下一个主题的分区。逻辑与第一个相同。假设有两个主题t0和t1,分别有3个分区(PS:有6个分区t0p0,t0p1,t0p2,t1p0,t1p1,t1p2),那么分区分配的最终结果如下:c0:[t0p0,t0p2,t1p0,t1p2]c1:[t0p1,t1p1]StickyAssignor:是对round-robin和range的进一步优化。因为这两种策略,当同一个消费组中的消费者订阅了不同的topic时,那么就可能会出现某个消费者分配较多partition的情况,造成不平衡的问题。粘性策略也可以针对这种情况进行更好的分配。同时,消费者组中新的在线/离线消费者分配时,会按照之前分配的分区进行分配,不会重新分配。尽可能保证较少的移动分区(PS:round-robin和range策略会为新的在线和离线消费者重新分配分区)。例如,有3个Consumer属于同一个消费组:C0、C1、C2。订阅了4个Topic:T0,T1,T2,T3,每个Topic有2个partition,那么StickyAssignor的分发结果如下图所示(加入RoundRobinAssignor分发作为对比):从指定partition中拉取消息消费fetcher。fetchedRecords判断是否有数据,如果没有数据,调用fetcher.sendFetches()获取数据。fetcher.sendFetches()会根据元数据信息判断consumer需要从哪些分区取消息,创建FetchRequest请求消息(这里stage会判断zone消息偏移的起始位置),然后调用ConsumerNetworkClient.send获取消息的方法。这里需要重点关注消费者如何确定消费offset的起始位置。起始位置的确定与auto.offset.reset的配置值有比较大的关系。有两个核心值earliest和latest(默认策略)。consumer会先检查本地是否已经有committed的offset,如果有则按照当前的offset继续读取。如果不是,则偏移量将根据auto.offset.reset的值确定。earliest和latest都会从kafka集群中获取提交的offset位置,并根据offset继续消费。如果没有committedoffset,最早的从头开始消费,最新的消费新产生的消息(PS:如果latest的策略是创建新的消费组,release申请阶段消息会丢失).不明白的举个例子,假设有6条消息,消费者组A还有两条消息未消费。最早重置时:重启消费组A,收到2条消息;新建消费组B,接收6条消息。最晚reset时:重启消费组A,收到2条消息;创建新的消费组B,没有收到消息。?提交消费抵消手动提交:客户端调用相应的API提交自动提交:自动提交不是通过定时任务周期性提交,而是在某些特定事件发生时触发提交两种提交方式两者最终都调用同步提交和ConsumerCoorditor的异步提交方法。您可以通过设置enable.auto.commit属性来指定手动或自动提交。为了保证分布式事务,对于发送端,Kafka有自己的重试次数配置。当重试次数达到,消息还没有发送成功时,我们可以取日志记录,然后通过补偿任务重试。对于消费者端,Kafka通过消息偏移量拉取消息,所以不存在broker无法发送到消费者端的问题。我们讨论的只是消费者收到数据,然后消费失败的情况。如果消费失败了,如果我们重新设置offset,下次不就可以继续拉了吗?但是,此解决方案存在某些问题。Kafka消息在分区中顺序读取和写入。如果前面的消息消费失败,后面的消息就消费不了,造成堆积问题。所以我们还是通过日志记录消费失败的消息,然后通过定时任务进行补偿,保证数据的最终一致性。