当前位置: 首页 > 科技观察

SpringBoot&Kafka轻松上手!

时间:2023-03-14 22:14:06 科技观察

Kafka的Kafka集群安装、配置、启动都需要依赖zookeeper,它集成了zookeeper本身。Zookeeper至少需要3个节点来保证集群的高可用。下面是单机linux下创建的kafka3个节点的伪集群模式。1、下载包下载地址:http://kafka.apache.org/downloads2、解压包tar-zxvfkafka_2.11-1.0.0.tgz\mvkafka_2.11-1.0.0kafka1\mvkafka_2.11-1.0.0kafka2\mvkafka_2.11-1.0.0kafka33。创建ZK集群,修改ZK配置文件:kafka1-3/config/zookeeper.properties分别修改相应参数。dataDir=/usr/local/kafka/zookeeper1dataLogDir=/usr/local/kafka/zookeeper/logclientPort=2181maxClientCnxns=0tickTime=2000initLimit=100syncLimit=5server.1=127.0.0.1:2888:3888server.2=127.0.0.1:4888:5888server.3=127.0.0.1:6888:7888/usr/local/kafka/zookeeper1-3目录下创建myid文件,内容对应1~3启动ZK,分别为Kafka1-3目录:bin/zookeeper-server-start.shconfig/zookeeper.properties&启动报告文件失败,需要手动创建文件目录并授予相应权限。4、创建Kafka集群配置文件:分别修改kafka1-3/config/server.properties中的相应参数。broker.id=1zookeeper.connect=localhost:2181,localhost:2182,localhost:2183listeners=PLAINTEXT://192.168.12.11:9091log.dirs=/tmp/kafka-logs-1启动Kafka,分别为Kafka1-3目录:bin/kafka-server-start.shconfig/server.properties&启动报告文件失败,需要手动创建文件目录并赋予相应权限。5、集群测试在kafka1上发送消息:bin/kafka-console-producer.sh--broker-listlocalhost:9091--topictest在kafka2和kafka3上消费消息:bin/kafka-console-consumer.sh--zookeeperlocalhost:2181--from-beginning--topicmy-replicated-topicSpringBoot集成Kafka实战1.添加spring-kafka依赖2.1.0.RELEASEorg.springframework.kafkaspring-kafka${spring-kafka.version}2.添加SpringBoot的自动配置自动配置类:org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration配置属性类:org.springframework.boot.autoconfigure.kafka.KafkaPropertiesSpring:kafka:bootstrap-servers:-192.168.101.137:9091-192.168.101.137:9092-192.168.101.137:9093producer:retries:0batch-size:16384buffer-memory:33554432key-serializer:org.apache.kafka.common.serialization.StringSerializervalue-serializer:org.apache.kafka.common.serialization.StringSerializerconsumer:group-id:fooauto-offset-reset:earliestenable-auto-commit:trueauto-commit-interval:100key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializer3.发送消息@AutowiredprivateKafkaTemplatekafkaTemplate;@GetMapping("/send")publicObjectsend(Stringmsg){kafkaTemplate.send("test","name",msg);return"sendok";}4.接收消息在任意bean中,添加@KafkaListener支持消息接收@KafkaListener(topics="test")publicvoidprocessMessage(Stringcontent){logger.info("receivedmessage,topic:test,msg:{}“,内容);}