当前位置: 首页 > 后端技术 > Java

Java实现Kafka生产者和消费者实例

时间:2023-04-01 20:15:06 Java

Kafka简介Kafka是Apache软件基金会开发的开源流处理平台,用Scala和Java编写。Kafka的目标是提供一个统一的、高吞吐量、低延迟的实时数据处理平台。方法一:kafka-clients导入依赖在pom.xml文件中,导入kafka-clients依赖:org.apache.kafkakafka-clients2.3.1生产者创建一个KafkaProducer生产者实例:@ConfigurationpublicclassConfig{publicfinalstaticStringbootstrapServers="127.0.0.1:9092";@Bean(destroyMethod="close")publicKafkaProducerkafkaProducer(){Propertiesprops=newProperties();//设置Kafka服务器地址props.put("bootstrap.servers",bootstrapServers);//设置数据key的序列化处理类props.put("key.serializer",StringSerializer.class.getName());//设置数据值的序列化处理类props.put("value.serializer",StringSerializer.class.getName());KafkaProducerproducer=newKafkaProducer<>(props);返回生产者;}}在控制器中使用:@RestController@Slf4jpublicclassController{@AutowiredprivateKafkaProducerkafkaProducer;@RequestMapping("/kafkaClientsSend")publicStringsend(){Stringuuid=UUID.randomUUID().toString();RecordMetadatarecordMetadata=null;try{//向Kafka服务器的名为“one-more-topic”的Topic发送消息recordMetadata=kafkaProducer.send(newProducerRecord<>("one-more-topic",uuid)).get();log.info("recordMetadata:{}",recordMetadata);log.info("uuid:{}",uuid);}catch(Exceptione){log.error("发送失败,uuid:{}",uuid,e);}返回uuid;}}消费者创建KafkaConsumer的消费者实例:@ConfigurationpublicclassConfig{publicfinalstaticStringgroupId="kafka-clients-group";publicfinalstaticStringbootstrapServers="127.0.0.1:9092";@Bean(destroyMethod="关闭")publicKafkaConsumerkafkaConsumer(){Propertiesprops=newProperties();//设置Kafka服务器地址props.put("bootstrap.servers",bootstrapServers);//设置消费组props.put("group.id",groupId);//设置数据key的反序列化处理类props.put("key.deserializer",StringDeserializer.class.getName());//设置数据值的反序列化Deserializationclassprops.put("value.deserializer",StringDeserializer.class.getName());props.put("enable.auto.commit","true");props.put("auto.commit.interval.ms","1000");props.put("session.timeout.ms","30000");KafkaConsumerkafkaConsumer=newKafkaConsumer<>(props);//订阅名称为"one-more-topic"TopicmessagekafkaConsumer.subscribe(Arrays.asList("one-more-topic"));返回卡夫卡消费者;}}在C中在控制器中使用:@RestController@Slf4jpublicclassController{@AutowiredprivateKafkaConsumerkafkaConsumer;@RequestMapping("/receive")publicListreceive(){////来自Kafka服务器的名称ConsumerRecordsrecords=kafkaConsumer.poll(Duration.ofSeconds(1));Listmessages=newArrayList<>(records.count());for(ConsumerRecordrecord:records.records("one-more-topic")){Stringmessage=record.value();log.info("消息:{}",消息);消息。添加(消息);}返回消息;}}方法二:spring-kafka使用kafka-clients,我们需要自己创建producer或者consumerbean,如果我们的项目是基于SpringBoot的,那么使用spring-kafka就方便多了引入依赖在pom.xml文件中,引入spring-kafka的依赖:org.springframework.kafkaspring-kafka2.3.12.RELEASEproducer在application.yml文件中添加配置:spring:kafka:#Kafka服务器地址bootstrap-servers:127.0.0.1:9092producer:#设置数据值的序列化处理类value-serializer:org.apache.kafka.common.serialization.StringSerializer可以直接在Controller中注入KafkaTemplate使用。代码如下:@RestController@Slf4jpublicclassController{@AutowiredprivateKafkaTemplatetemplate;@RequestMapping("/springKafkaSend")publicStringsend(){Stringuuid=UUID.randomUUID().toString();//向Kafka服务器的名为“one-more-topic”的Topic发送消息this.template.send("one-more-topic",uuid);log.info("uuid:{}",uuid);返回uuid;}}消费者在application.yml文件中添加配置:spring:kafka:#Kafka服务器地址bootstrap-servers:127.0.0.1:9092consumer:#设置数据值的反序列化处理类value-deserializer:org.apache.kafka.common.serialization.StringDeserializer创建一个可以被Spring框架扫描的类,并在方法上添加@KafkaListener注解,即可消费消息,代码如下:@Component@Slf4jpublicclassReceiver{@KafkaListener(topics="one-more-topic",groupId="spring-kafka-group")publicvoidlisten(ConsumerRecordrecord){OptionalkafkaMessage=Optional.ofNullable(record.value());如果(kafkaMessage.isPresent()){字符串消息=(字符串)kafkaMessage.get();log.info("消息:{}",消息);}}}