1.知道什么是Kafka吗?卡夫卡有什么用?官方定义如下:Kafka用于构建实时数据管道和流式应用程序。它具有水平可扩展性、容错性、速度极快,并在数千家公司的生产环境中运行。翻译一下,大致意思就是这是一个可以横向扩展,高可靠的实时数据处理系统!实时数据处理,顾名思义,很容易理解,就是对数据进行实时处理。在当前流行的微服务开发中,最常用的实时数据处理平台包括RabbitMQ、RocketMQ等消息中间件。这些中间件主要有两个特点:服务解耦流量调峰在早期的web应用开发中,当请求量突然上来的时候,我们会将需要处理的数据推送到一个队列通道,然后再启动一个线程不断拉取轮流队列中的数据,从而加快程序的运行效率。但是随着请求量的不断增加,队列通道中的数据一直处于高负载状态,在这种情况下,应用的内存占用会非常高。一不小心就会出现内存不足,导致程序内存溢出,导致服务不可用。随着业务量的不断扩大,在应用中使用这种模式已经不能满足需求。因此,各种消息中间件应运而生,如ActiveMQ、RabbitMQ、RocketMQ等中间件。采用这种模型,本质是将要推送的数据不存储在当前应用的内存中,而是存储在另一个负责数据处理的应用中,从而实现服务解耦。消息中间件:主要职责是保证消息能够被接收并存储在磁盘上。即使其他服务宕机,数据也不会丢失。同时,它还可以监控数据消耗。应用:只需要将消息推送到消息中间件,然后开启一个线程不断的从消息中间件中拉取数据进行消费确认即可!引入消息中间件后,整个服务开发将变得更加容易,各司其职。Kafka的本质其实就是一种消息中间件。Kafka来自LinkedIn,2010年开源到github。LinkedIn的开发团队,为了解决数据管道问题,最初采用ActiveMQ进行数据交换,大约在2010年左右,当时ActiveMQ远远不能满足LinkedIn对数据传递系统的要求,经常会因为各种缺陷导致消息阻塞或者服务中断无法正常访问。为了解决这个问题,LinkedIn决定自己开发消息系统,Kafka就诞生了。在LinkedIn,Kafka每天可以有效处理数十亿条消息的指标和用户活动跟踪。其强大的处理能力得到了业界的认可,成为大数据流水线的首选技术。2.架构介绍先来看一张图。下图是kafka生产消费的核心架构模型!这些概念不懂没关系,小编带大家一起来梳理一下!Producer:Producer是生产者,消息的生产者是消息的入口Broker:Broker是kafka的一个实例,每个server都有一个或多个kafka实例。简单理解就是kafka服务器,kafkacluster就是集群的意思。Topic:消息的主题,可以理解为一个消息队列,Kafka数据存储在主题中。可以在每个代理上创建多个主题。分区:主题分区。每个主题可以有多个分区。分区的作用是加载和提高Kafka的吞吐量。同一个主题在不同分区的数据不重复,分区的表示是一个一个文件夹!复制:每个分区都有多个副本,副本的作用是做一个备份轮胎。同步数据到slave分区(Follower)。当主分区(Leader)出现故障时,会选出一个备用轮胎(Follower)成为Leader。Kafka默认最大副本数为10,副本数不能大于Brokers的数量。追随者和领导者肯定在不同的机器上。同一台机器只能存储同一个分区的一份副本。消息:每条发送的消息体。Consumer:消费者,即消息的消费者,是消息的出口。ConsumerGroup:我们可以将多个消费者组组成一个消费者组。在Kafka的设计中,同一个partition的数据只能被consumergroup中的某个consumer消费。同一个消费者组中的消费者可以消费同一个主题的不同分区的数据,这也是为了提高Kafka的吞吐量!Zookeeper:Kafka集群依赖zookeeper保存集群的元信息,保证系统的可用性。简而言之,Kafka本质上是一个消息系统。与大多数消息系统一样,它的主要特点如下:采用推拉模型分离生产者和消费者,为消息系统中的消息数据提供持久化,让多个消费者提供高可用的集群服务,主从模式,以及支持横向横向扩展。与ActiveMQ、RabbitMQ、RocketMQ不同的是它有一个**Partition**的概念。这个partition的意思是如果你创建一个topic有5个partition,当你一次性push1000条数据给Kafka时,这1000条数据会默认分配到5个partition,每个partition存储200条数据。这样做的目的是为了方便消费者从不同的分区中拉取数据。如果同时启动5个线程拉取数据,每个线程拉取一个partition,消费速度会非常非常快!这是kafka和其他消息系统最大的区别!2.1.发送数据和其他中间件一样。kafka每次发送数据时,都会将数据发送到Leader分区,并顺序写入磁盘。然后Leader分区会将数据同步给各个从分区Follower,即使主分区宕机,也不会影响服务的正常运行。kafka是如何将数据写入对应分区的?Kafka有以下原则:1.写入数据时,可以指定要写入的分区。如果指定,写入相应的分区。2.如果没有指定partition,但是设置了datakey,则根据key的值对一个partition进行hash。3.如果没有指定partition,也没有设置key,则轮询选择一个partition。2.2消费者数据和生产者类似的,当消费者主动去Kafka集群拉取消息时,也会从Leader分区拉取数据。这里需要重点关注一个名词:消费者群体!考虑到多消费者的场景,在设计Kafka的时候,多个消费者可以组成一个消费者组,同一个消费者组的消费者可以消费同一个topic,对于不同partition的数据,同一个partition只会被某个consumer消费在一个消费组中防止重复消费的问题!但是不同的组可以消费同一个分区的数据!你可以这样理解,一个消费者组就是一个客户端,一个客户端可以由很多消费者组成,以加快消息的消费。但是,如果一个组中的消费者数量大于分区数量,就会有很多消费者闲置。如果partition的数量大于一个group下consumer的数量,一个consumer会负责多个partition的消费,消费性能会不平衡。所以在实际应用中,建议消费者组中的消费者数量与分区数量保持一致!3.kafka安装理论没用。下面以centos7为例介绍kafka的安装和使用。Kafka需要zookeeper来保存服务实例的元信息,所以在安装kafka之前,我们需要先安装zookeeper。3.1.安装zookeeperzookeeper安装环境依赖jdk,所以需要提前安装jdk#安装jdk1.8yum-yinstalljava-1.8.0-openjdk下载zookeeper并解压文件包#下载zookeeperwgethttp://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz#解压tar-zxvfzookeeper-3.4.12.tar.gz创建数据和日志目录#创建数据和日志存放目录cd/usr/zookeeper/mkdirdatamkdirlog#备份一份conf下的zoo_sample.cfg,然后重命名为zoo.cfgcdconf/cpzoo_sample.cfgzoo.cfg配置zookeeper#编辑zoo.cfg文件vimzoo.cfg重新配置dataDir和dataLogDir的存放路径最后启动Zookeeper服务#进入Zookeeper的bin目录cdzookeeper/zookeeper-3.4.12/bin#启动Zookeeper./zkServer.shstart#查询Zookeeper状态./zkServer.shstatus#关闭Zookeeper状态./zkServer.shstop3.2,安装kafka到官网http://kafka.apache.org/downloads.html下载你想要的版本。我在这里下载了最新的稳定版2.8.0。#下载kafka安装包wgethttps://apache.osuosl.org/kafka/2.8.0/kafka-2.8.0-src.tgz#解压文件包tar-xvfkafka-2.8.0-src.tgz修改配置文件根据需要server.properties(optional)#进入配置文件夹cdkafka-2.8.0-src/config#编辑server.propertiesvimserver.propertiesserver.properties文件内容如下:broker.id=0listeners=PLAINTEXT://localhost:9092num。网络。threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=localhost:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0有四个重要参数:broker.id:uniqueIDlisteners=PLAINTEXT://localhost:9092:kafka服务监听地址和端口log.dirs:日志存放目录zookeeper.connect:指定zookeeper服务地址,根据需要修改相应配置!3.3、启动kafka服务#回车bin脚本目录cdkafka-2.8.0-src/binstartkafkaservicenohupkafka-server-start.sh../config/server.propertiesserver.log2>server.err&3.4,创建topictopics创建一个名为testTopic的topic,里面包含只有一个分区,只有一个副本:#进入bin脚本目录cdkafka-2.8.0-src/bin#Createtopicskafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictestTopic运行listtopic命令,可以看到topic#进入bin脚本目录cdkafka-2.8.0-src/bin#查询当前kafka上的所有topickafka-topics.sh--list--zookeeperlocalhost:2181Output:testTopic3.5.发送消息Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入并将其作为消息发送到Kafka集群。默认情况下,每一行将作为单独的消息发送。运行生产者,然后在控制台中键入一些消息以发送到服务器。#进入bin脚本目录cdkafka-2.8.0-src/bin#运行一个producer,向testTopic主题发送消息kafka-console-producer.sh--broker-listlocalhost:9092--topictestTopic输入两个内容回车:你好卡夫卡!这是消息3.5,接受消息Kafka也有一个命令行消费者,可以将消息转储到标准输出。#进入bin脚本目录cdkafka-2.8.0-src/bin#运行一个consumer,从testTopic主题中拉取消息kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topictestTopic--from-beginningoutput结果如下:Hellokafka!Thisisamessage4.总结本文主要围绕kafka的架构模型和安装环境做一些初步的介绍。难免会有误会。欢迎广大网友批评、投诉。由于篇幅原因,我将在下一篇详细介绍java环境下的kafka应用场景!
