当前位置: 首页 > 科技观察

谈论卡夫卡狗屎!

时间:2023-03-19 10:58:30 科技观察

本文转载自微信公众号“微科技”,作者微科技。转载本文请联系微科公众号。大家好,我是汤姆大哥~Kafka是一个开源的消息引擎,很多人都比较熟悉,但是估计没有多少同学会深入它的源码,除非你是中间件团队的消息系统维护者.但是,技术行业也有专业化。市面上的开源框架那么多,每一个框架都经常迭代升级。花精力去深入了解每个框架的源码是不现实的。本文将从业务的角度,列出大家在工作中需要熟悉的一些知识。本文内容:首先,为什么要用kafka?削峰填谷。缓冲上下游瞬时突发流量,保护“脆弱”的下游系统不被压垮,避免引发全链路业务“雪崩”。系统解耦。发送方和接收方之间的松耦合在一定程度上简化了开发成本,减少了系统之间不必要的直接依赖。Kafka术语,allinonegoBroker:接收客户端发送的消息,并持久化消息Topic:主题。主题是承载消息的逻辑容器,在实际使用中常用于区分具体的服务。分区:分区。有序且不可变的消息序列。每个主题下可以有多个分区。Message:这里的消息指的是Kafka处理的主要对象。消息位移:偏移量。表示每条消息在partition中的位置信息,是一个单调递增的常量值。副本:副本。Kafka中的同一条消息可以复制到多个地方以提供数据冗余。这些地方称为副本。地下城也分为领袖地下城和追随者地下城,各有不同的角色划分。每个分区可以配置多个副本以实现高可用性。一个分区的N个副本必须在N个不同的Brokers上。制作人:制片人。向主题发布新消息的应用程序。消费者:消费者。从主题订阅新消息的应用程序。消费者位移:消费者抵消。表示消费者的消费进度,每个消费者都有自己的消费位移。offset保存在broker的内部topic中,而不是在clients的Consumergroup:ConsumerGroup中。一组同时消费多个分区以实现高吞吐量的多个消费者实例。再平衡:再平衡。消费组中的一个消费实例挂掉后,其他消费实例会自动重新分配订阅主题分区。ZooKeeper在其中的职责是什么?它是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群中运行着哪些Brokers,创建了哪些Topic,每个Topic有多少个partition,Leader在哪些机器上这些分区的副本位于。消息传输的格式是纯二进制字节序列。当然,消息仍然是结构化的,但在使用前必须转换成二进制字节序列。消息传输协议点对点模型。系统A发送的消息只能被系统B接收,其他系统无法读取A发送的消息。发布/订阅模型。这个模型也有一个发送者和一个接收者,但方式不同。发送者也称为发布者,接收者称为订阅者。与点对点模型不同的是,这种模型可能有多个发布者向同一个主题发送消息,也可能有多个订阅者,都可以接收到同一个主题的消息。在消息压缩生产者程序中配置compression.type参数,表示启用指定类型的压缩算法。props.put("compression.type","gzip"),表示生产者的压缩算法使用GZIP。这样Producer启动后产生的每条消息集都经过GZIP压缩,可以节省KafkaBroker端的网络传输带宽和磁盘占用。但是如果Broker指定了不同的压缩算法,比如Snappy,它会在生产端对消息进行解压,然后按照自己的算法重新压缩。各种压缩算法比较:吞吐量方面:LZ4>Snappy>zstd和GZIP;从压缩率上看,zstd>LZ4>GZIP>Snappy。Kafka默认不指定压缩算法。消息解压Consumer拉取消息后,Broker原样发送。当消息到达Consumer时,Consumer会将其解压并恢复为之前的消息。分区策略写一个类来实现org.apache.kafka.clients.Partitioner接口。实现两个内部方法:partition()和close()。然后显式配置producer端参数partitioner.class常用策略:轮询策略(默认)。保证消息最大程度均匀分布到所有分区。随机策略。random策略是老版本producer使用的partition策略,新版本改为roundrobin。按键分区策略。key可以是uid或者orderid,所有具有相同flag的消息都发送到同一个partition,可以保证一个partition中消息的顺序。其他分区策略。例如:基于地理位置的分区策略producer管理TCP连接当创建新的KafkaProducer实例时,producer应用程序将在后台创建并启动一个名为Sender的线程。当Sender线程开始运行时,它会首先创建与Broker的连接。此时并不知道向哪个topic发送消息,所以当Producer启动时,会发起与所有Brokers的连接。Producer通过metadata.max.age.ms参数定时更新元数据信息。默认值为300000,即5分钟。不管集群端有没有变化,Producer都会每5分钟强制刷新一次元数据,以保证是最新的数据。生产者发送消息:生产者使用带有回调通知的发送API,producer.send(msg,callback)。设置acks=all。Producer的一个参数,表示所有的replica都成功接收到消息,消息被认为是“已提交”,最高级别,acks的其他值。min.insync.replicas>1,表示消息必须至少写入多少个副本才能被视为“已提交”重试,是Producer的一个参数。当网络瞬间抖动时,可能会导致消息发送失败。此时配置重试>0的Producer可以自动重试发送消息,避免消息丢失。幂等Producer设置参数props.put("enable.idempotence",ture),Producer自动升级为幂等Producer,其他所有代码逻辑不需要改动。Kafka自动帮你去重和去重消息。原理很简单,就是经典的空间换时间交换,就是在Broker端多保存一些字段。当Producer发送相同字段值的消息时,Broker可以自动知道这些消息被重复了,并可以在后台默默地“丢弃”它们。它只能保证单个分区和单个会话上的消息幂等性。幂等的Producer可以保证一个topic的一个partition上不会出现重复的消息,但是无法实现跨多个partition的幂等。例如,如果使用轮询,如果下一次提交更改为分区,则事务无法解决。生产者可以保证消息被原子地写入多个分区。这批消息要么全部写入成功,要么全部失败。它可以保证跨分区和会话的幂等性。producer.initTransactions();try{producer.beginTransaction();producer.send(record1);producer.send(record2);//提交事务producer.commitTransaction();}catch(KafkaExceptione){//事务终止生产者。abortTransaction();}事实上,即使写入失败,Kafka也会将它们写入底层日志中,这意味着Consumer仍然会看到这些消息。Consumer端是否处理设置isolation.level,这个参数有两个值:read_uncommitted:这个是默认值,表示Consumer可以读取任何Kafka写入的消息read_committed:表示Consumer只会读取成功提交的事务型Producer的KafkaBroker如何存储事务写入的消息的数据?Kafka使用消息日志(Log)来保存数据。日志是磁盘上只能追加(Append-only)消息的物理文件。因为只能追加写,避免了慢速的随机I/O操作,取而代之的是性能更好的顺序I/O写操作,这也是实现Kafka高吞吐特性的重要手段。但是如果一直往日志中写入消息,最终会用完所有的磁盘空间,所以Kafka必须定期删除消息来回收磁盘。如何删除它?简单的说,就是通过LogSegment机制。在Kafka底层,一条日志被进一步细分为多个日志段,消息追加到最新的日志段。当一个日志段写满时,Kafka会自动拆分一个新的日志段。并封存旧日志段。Kafka在后台也有定时任务,定期检查旧的日志段是否可以删除,从而达到回收磁盘空间的目的。Kafka的备份机制是将相同的数据复制到多台机器上。副本的数量是可配置的。Kafka中的followcopy不会对外提供服务。副本的工作机制也很简单:生产者始终将消息写入领导副本;并且消费者总是从领导者副本中读取消息。至于follow副本,它只做一件事:异步向leader副本发送pullrequest,请求leader同步最新消息给它,肯定有一个时间窗口导致它和leader副本中的数据不一致领导者,或者它在领导者后面。为什么要引入消费组?主要是为了提高消费者的吞吐量。多个消费者实例同时消费,加速整个消费者的吞吐量(TPS)。一个consumergroup下,一个partition只能被一个consumer消费,但是一个consumer可能被分配了多个partition,所以在提交位移的时候也可以提交多个partition的位移。如果一个topic有2个partition,一个consumergroup有3个consumer,一个consumer不能分配到任何partition,处于idle状态。理想情况下,Consumer实例的数量应该等于该Group订阅主题的分区总数(可能是多个)。消费端拉取(批量),ACK消费端先拉取消费消息,再acks更新offset。1)消费者程序启动多个线程,每个线程维护一个专用的KafkaConsumer实例,负责完整的消息拉取和消息处理过程。一个KafkaConsumer负责一个分区,可以保证分区内消息消费的顺序。缺点:线程数受Consumer订阅topic的分区总数限制。2)任务分为消息获取和消息处理两部分。消费者程序使用单线程或多线程拉取消息,并创建专门的线程池来执行业务逻辑。优点:可以灵活调整消息获取和消息处理的线程数。缺点:不能保证分区内消息消费的顺序。另外,引入了多组线程,拉长了整个消息消费环节,最终导致提交正确位移变得异常困难,可能会出现重复消费或消息丢失的情况。Consumer端offset管理1)老版本的Consumergroup在ZooKeeper中保存了offset,但是很快发现zk不适合频繁写更新。2)在新版本的ConsumerGroup中,Kafka社区重新设计了ConsumerGroup的位移管理方式,采用了将位移保存在Broker端内部topic中的方式,也称为“位移??topic”,即由Kafka自己管理。原理很简单。消费者的位移数据作为普通的Kafka消息提交给__consumer_offsets。它的消息格式由Kafka自己定义,用户无法修改。位移topic的key主要包括三个部分:KafkaConsumer提交位移有两种方式:自动位移提交和手动位移提交。Kafka使用Compact策略删除置换主题中的过期消息,避免主题无限扩大。提供专门的后台线程,定期检查待压缩的topic,是否有符合条件的可删除数据。Rebalance触发条件1)群成员数量发生变化。例如,一个新的Consumer实例加入或离开组,或者一个Consumer实例崩溃并被“踢出”组。(99%的原因都是它引起的)2)订阅主题数发生了变化。ConsumerGroup可以使用正则表达式来订阅主题。例如consumer.subscribe(Pattern.compile(“t.*c”))表示Group订阅所有以字母t开头,以字母c结尾的主题。在ConsumerGroup运行过程中,如果你创建了一个新的topic满足这个条件,那么这个Group就会重新平衡。3)订阅主题的分区数发生变化。Kafka目前只允许增加一个主题的分区数。当partition数量增加时,所有订阅该topic的group也会被触发开启Rebalance。消息的顺序在Kafka的设计中,如果有多个partition,则无法保证全局的消息顺序。如果必须实现全局消息顺序,则只能使用单个分区。方法二:通过key分组,将相同key的消息放入同一个分区,保证本地有序的历史数据清洗策略是根据存储时间,log.retention.hours是根据日志大小清洗战略。通过log.retention.bytes控制组合方式