ApacheKafka最初是LinkedIn开源的分布式消息系统,现在是Apache的子项目,已经成为开源领域使用最广泛的消息系统之一。Kafka社区非常活跃。从0.9版本开始,Kafka的slogan就从“一个高吞吐量的分布式消息系统”变成了“一个分布式流媒体平台”。Kafka与传统消息系统的区别在于:Kafka是一个分布式系统,易于向外扩展。它为发布和订阅提供高吞吐量。它支持多个订阅者并在失败时自动平衡消费者。消息的持久性。Kafka和其他消息队列的对比:入门实例生成器代码如下:importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;publicclassUserProducerendalKafkaProducerex,String>producer;privatefinalStringtopic;privatefinalPropertiesprops=newProperties();publicUserKafkaProducer(Stringtopic){props.put("metadata.broker.list","localhost:9092");props.put("bootstrap.servers","master2:6667");props.put("retries",0);props.put("batch.size",16384);props.put("linger.ms",1);props.put("buffer.memory",33554432);props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");producer=newKafkaProducer(props);this.topic=topic;}@Overridepublicvoidrun(){intmessageNo=1;while(true){StringmessageStr=newString("Message_"+messageNo);System.out.println("Send:"+messageStr);//返回的是Future,异步发送producer.send(newProducerRecord(topic,messageStr));messageNo++;try{sleep(3000);}catch(InterruptedExceptione){e.printStackTrace();}}}}消费者代码如下:Propertiesprops=newProperties();/*定义kakfa服务地址,无需指定所有broker*/props.put("bootstrap.servers","localhost:9092");/*Makeconsumergroup*/props.put("group.id","test");/*是否自动确认offset*/props.put("enable.auto.commit","true");/*自动确认offset时间间隔*/props.put("auto.commit.interval.ms","1000");props.put("session.timeout.ms","30000");/*key的序列化类*/props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");/*值序列化类*/props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");/*consumer的定义*/KafkaConsumerconsumer=newKafkaConsumer<>(props);/*Consumer订阅主题,可以订阅多个*/consumer.subscribe(数组。asList("foo","bar"));/*读取数据,读取超时为100ms*/while(true){ConsumerRecordsrecords=consumer.poll(100);for(ConsumerRecordrecord:records)System.out.printf("offset=%d,key=%s,value=%s",record.offset(),record.key(),record.value());}Kafka架构原理对于Kafka的架构原理,我们首先提出以下问题:Kafka的topic和partitions在内部是如何存储的,有什么特点?与传统的消息系统相比,Kafka的消费模型有哪些优势?Kafka是如何实现分布式数据的存储和数据读取?Kafka架构图Kafka术语解释在一个Kafka架构中,有多个Producers、Brokers和Consumers。每个Producer可以对应多个Topic。Consumer只能对应一个ConsumerGroup。整个Kafka架构对应一个ZK集群。ZK管理集群配置,选举Leader,并在ConsumerGroup发生变化时进行Rebalance。Topic和Partition对Kafka中的每条消息都有一个Topic。一般对于我们应用中产生的不同类型的数据,可以设置不同的topic。一个主题通常有多个消息订阅者。当一个生产者向一个主题发布消息时,订阅这个主题的消费者都可以收到生产者写的新消息。Kafka为每个topic维护一个分布式分区(Partition)日志文件,每个Partition是Kafka存储层面的一个AppendLog。任何发布到这个Partition的消息都会被追加到Log文件的末尾,分区中的每条消息都会被分配一个按时间顺序单调递增的序号,这就是我们的Offset。偏移量是一个Long类型的数字。我们可以通过这个Offset来确定这个Partition下的唯一消息。Partition下保证顺序,Topic下不保证顺序。上图中,我们的生产者会决定发送到哪个Partition:如果没有Key值,就会发送轮询。如果有Key值,则对Key值进行Hash,然后取分区数的余数,保证相同的Key值会路由到同一个分区;如果你想在队列中有很强的序列一致性,你可以将所有消息设置为相同的键。消费模型生产者将消息发送到Kafka集群后,会被消费者消费。一般来说,我们有两种消费模型:推模型(Push)拉模型(Pull)基于推模型的消息系统,消息代理记录消费状态。消息代理将消息推送给消费者后,将消息标记为已消费,但这种方式不能很好地保证消费的处理语义。比如我们把消息发送给消费者后,如果消费者进程挂了或者因为网络原因没有收到消息,如果我们在消费者代理中标记为已消费,消息就会丢失。如果我们采用生产者收到消息后回复的方式,消息代理需要记录消费状态,这是不可取的。如果使用Push,消息消费的速率完全由消费者代理控制。一旦消费者被阻塞,就会出现问题。Kafka采用拉模型(Poll),自己控制消费速度和消费进度。消费者可以根据任意一个offset进行消费。比如消费者可以消费已经消费过的消息进行再处理,或者消费最近的消息等等。网络模型KafkaClient:单线程Selector单线程模式适用于并发连接数少,逻辑简单,数据量小的情况。在Kafka中,Consumer和Producer都采用了上述的单线程模式。这种模式不适用于Kafka服务器。服务器中的请求处理过程比较复杂,会造成线程阻塞。一旦出现后续请求,将得不到处理,导致大量请求超时,造成雪崩。在服务器端,应该充分利用多线程来处理执行逻辑。KafkaServer:多线程Selector在Kafka服务器端采用多线程Selector模型。Acceptor在单独的线程中运行。对于读操作,线程池中的线程会向Selector注册Read事件,负责服务端的读操作。请求的逻辑。读取成功后,将请求放入MessageQueue共享队列。然后在write线程池中,取出request进行逻辑处理。这样即使一个请求线程阻塞了,也有后续线程从消息队列中获取请求并进行处理。在写线程中进行逻辑处理后,由于注册了OP_WIRTE事件,因此需要向其发送响应。Kafka中的高可靠分布式存储模型依赖于副本机制来保证模型的高可靠。通过复制机制,即使机器宕机也不会丢失数据。高性能日志存储Kafka一个Topic下的所有消息都以Partition的形式分布存储在多个节点上。同时,在Kafka机器上,每个Partition其实对应一个日志目录,目录下有多个日志段(LogSegment)。LogSegment文件由“.index”文件和“.log”文件两部分组成,分别表示为Segment索引文件和数据文件。这两个文件的命令规则是:Partitionglobal的第一个Segment从0开始,后面的每个Segment文件的名字都是前一个Segment文件最后一条消息的Offset值,值为64位,20numbers字符长度,如果没有数字则用零填充。如下,假设有1000条消息,每个LogSegment的大小为100,则900-1000的索引和Log如下图所示:由于Kafka消息数据太大,如果全部创建索引,会占用空间和增加耗时,所以Kafka选择了稀疏索引的方式,这样索引可以直接进入内存来加速局部查询。简单介绍一下如何读取数据。如果我们要读取第911条数据,第一步就是要找出它属于哪个段。根据二分法找到它所属的文件。找到0000900.index和00000900.log后,再去index中查找index(911-900)=11或者最近的小于11的index。这里我们通过二分法发现index为[10,1367],然后我们通过这个索引的物理位置1367,开始向后查找,直到找到911条数据。以上就是找到某个Offset的过程,但是很多时候我们并不需要找到某个Offset,我们只需要按顺序读取即可。在顺序读的时候,操作系统会在内存和磁盘之间加入PageCache,也就是我们平时看到的预读操作,所以我们的顺序读操作是非常快的。但是卡夫卡有一个问题。如果分区太多,日志段就会很多。写的时候是分批写的,实际上会变成乱写。此时随机I/O对性能影响很大。所以一般来说,Kafka的Partition不能太多。鉴于此,RocketMQ将所有日志写在一个文件中,可以顺序写入。经过一定优化后,阅读也可以接近顺序阅读。大家可以想一想:为什么需要分区,也就是说一个topic只有一个分区,不是可以吗?为什么日志需要分段?复制机制Kafka的复制机制是多个server节点复制其他节点topic分区的日志。当集群中的某个节点发生故障时,访问故障节点的请求将被转移到其他正常节点(这个过程通常称为Reblance)。Kafka中每个主题的每个分区都有一个主副本和0个或多个副本。该副本与主副本的数据保持同步,并在主副本出现故障时被替换。在Kafka中并不是所有的副本都可以用来代替主副本,所以在Kafka的Leader节点中维护了一个ISR(InSyncReplicas)集合。翻译也叫同步收集,这个收集需要满足两个条件:节点必须保持与ZK的连接。在同步过程中,该副本不能落后于主副本太远。另外,还有一个AR(AssignedReplicas)用来标识完整的副本集,OSR用来表示因为滞后而被淘汰的副本集。所以公式如下:ISR=Leader+不落后太远的replicas;AR=OSR+ISR。这里有两个名词:HW(highwatermark)是Consumer可以看到的Partition的位置,LEO是每个Partition的Log中最后一条消息的位置。HW可以保证Leader所在的Broker发生故障时,仍然可以从新选出的Leader获取消息,不会造成消息丢失。Producer向Leader发送数据时,可以通过request.required.acks参数设置数据的可靠性等级:1(默认):表示Producer在ISR中的Leader成功接收到数据后发送数据并且已经确认。一个消息。如果Leader宕机,数据将会丢失。0:表示Producer不需要等待Broker的确认就可以继续发送下一批消息。这种情况下,数据传输效率最好,但数据可靠性最好。-1:Producer需要等待ISR中所有Follower确认收到数据才算发送完成,可靠性最好。但这并不能保证数据不会丢失。比如当ISR中只有Leader时(其他节点与ZK断开,或者没有追上),这就变成了acks=1的情况。高可用模型和幂等性在分布式中一般有三种处理语义systems:at-least-once至少一次,也可能多次。如果Producer收到Ack的确认,就说明消息已经写到Kafka中了,此时正好是once,也就是我们后面的Exactly-once。但是如果Producer超时或者收到错误,request.required.acks配置不为-1,它会重试发送消息,client会认为消息还没有写入Kafka。如果broker在发送ack之前失败了,但是在消息成功写入kafka之后,这次重试会导致我们的消息被写入两次。所以消息不止一次的传递给最终的Consumer,如果Consumer的处理逻辑不保证幂等性,就会得到不正确的结果。在这个语义中,会出现乱序,即当第一个Ack失败,准备重试,但是第二个消息已经发送了,那么单个分区就会出现乱序.我们需要设置Producer参数max.in.flight.requests.per.connection,flight.requests是Producer端用来保存发送请求没有响应的队列,保证Producer端无响应的请求数是1.At-most-once如果Producer在Ack超时或者返回错误的时候没有重试,也就是我们说的request.required.acks=-1,消息可能最后没有写到Kafka,所以消费者不会收到消息。exactly-onceexactlyonce,即使Producer重试发送消息,也保证消息至多被传递给Consumer一次。这种语义是最理想的,也是最难实现的。0.10之前不能保证Exactly-once,需要使用Consumer自带的幂等性保证。0.11.0使用事务保证。如何实现exactly-once要实现exactly-once,Kafka0.11.0官方有两种策略:SingleProducerSingleTopic每个Producer在初始化时都会分配一个唯一的PID。对于每一个唯一的PID,Producer发送给指定Topic中的特定Partition发送的消息,会携带一个从0开始单调递增的SequenceNumber。在我们Broker这边,也维护了一个维度,每提交一条消息,都会alignedforverification:如果消息的sequencenumber比Broker维护的sequencenumber大1以上,说明中间有数据没有写入,也就是乱序,此时Broker拒绝消息,Producer抛出InvalidSequenceNumber。如果消息的序号小于等于Broker维护的序号,则说明该消息已经保存,属于重复消息,Broker直接丢弃该消息,Producer抛出一个DuplicateSequenceNumber。如果消息的序号恰好高一个,则证明是合法的。上面解决了两个问题:当Prouducer发送消息失败时,Broker不保存,但是第二条消息发送成功,导致数据乱序。Producer发送消息后,Broker保存成功,但返回Ack失败,Producer再次投递重复的消息。以上都是在同一个PID下,也就是说必须保证在同一个Producer中是同一个Seesion。如果Producer挂了,它会被分配一个新的PID,所以没有保证,所以Kafka机制中有事务来保证。Kafka中事务的作用是:实现exactly-once语义。保证操作的原子性,要么全部成功,要么全部失败。恢复有状态操作。事务可以保证即使跨越多个事务,在这个事务中对消费队列的操作都被认为是原子的,要么全部成功,要么全部失败。另外,有状态的应用程序还可以保证重启后从断点处继续处理,即事务恢复。在Kafka的事务中,应用程序必须提供唯一的事务ID,即TransactionID,关机重启后不会改变。TransactionID和PID之间可能存在一一对应关系。不同的是,TransactionID由用户提供,而PID是内部实现的,对用户透明。Producer重启后,旧的相同TransactionID的Producer会失效。Producer每次通过TransactionID获取PID的同时,也会获取一个单调递增的Epoch。由于老Producer的Epoch小于新Producer的Epoch,Kafka很容易识别出Producer是旧的,Producer拒绝了它的请求。为了实现这一点,Kafka0.11.0.0引入了一个名为TransactionCoordinator的服务器端模块来管理Producer发送的消息的事务性。事务协调器维护存储在内部主题中的事务日志。由于主题数据是持久的,因此事务的状态也是如此。Producer不直接读写TransactionLog,它与TransactionCoordinator通信,然后TransactionCoordinator将事务的状态插入到对应的TransactionLog中。TransactionLog的设计类似于OffsetLog,用来保存Consumer的Offset。***关于消息队列或者Kafka的一些面试常见问题,可以从上面的文章中摘录出以下经典问题,经过上面的总结大部分都可以回答:为什么要使用消息队列?消息队列的作用是什么?Kafka的topic和partitions内部是如何存储的,有什么特点?与传统的消息系统相比,Kafka的消费模型有哪些优势?Kafka是如何实现分布式数据存储和数据读取的?为什么Kafka支持的单机Partition比RocketMQ少?为什么要分区,也就是说只有一个topic?分区不行吗?为什么日志需要分段?Kafka依靠什么机制来保持高可靠和可用性?消息队列如何保证消息的幂等性?让你自己设计一个消息队列,你会怎么设计,你会考虑哪些方面?作者:赵丽简介:目前就职于美团点评生态科技部,喜欢研究和阅读开源代码。如果您有3年以上Java开发经验,请将简历发送至:lizhao07@meituan.com。