当前位置: 首页 > 后端技术 > Java

Kafka学习-第一步安装环境和我们的第一个相关代码

时间:2023-04-02 00:01:00 Java

windows上的kafka学习kafka学习安装相关问题一、下载kafkakafka是java实现的,官方提供windows支持,所以下载解压到一个文件夹即可,比如我在D:\kafka,这里的版本是3.20,和其他版本可能有点不同。整体路径如下,分别是bin:已经写好的一些shell命令文件和windowsconfig下的bat文件:已经配置的一些文件,比如kafka服务器配置,zookeeper配置,consumer和producer配置libs:jarpackage和一些依赖license:开源协议证书2.启动kafka单实例激动人心的时刻到了,我们下载了文件并安装了jdk环境(一般都会有Environment),然后设置properties文件,这里我只是粘贴需要注意的properties文件,定义后面需要用到的端口:#file:config/server.propertieslisteners=PLAINTEXT://127.0.0.1:9092#指定端口log.dirs=E:\\kafka-logs-1#我认为最好指定一个文件夹zookeeper.connect=localhost:2181#指定zookeeper服务器#file:config/zookeeper.propertiesdataDir=E:\\zookeeper#客户端连接的端口clientPort=21811。启动zookeeper.\bin\windows\zookeeper-server-start.bat.\config\zookeeper.properties#如果你使用下面的wsl或bash.\bin\zookeeper-server-start.sh。\config\zookeeper.properties2。启动kafka.\bin\windows\kafka-server-start.bat.\config\server.properties#下面如果使用wsl或者bash.\bin\kafka-server-start.sh.\config\zookeeperer.properties是这样的,就算启动成功了,也能看到连接到启动实例的zookeeper和broker提供的ip。3、创建topic#老版本使用zookeeper-server确定对应的kafka集群,而新版本使用bootstrap-server确定连接的集群。\bin\windows\kafka-topics--create--bootstrap-server127.0.0.1:9092--topictest我们可以使用如下命令查看当前集群的topic情况:.\bin\windows\kafka-topics--describe--bootstrap-server127.0.0.1:9092可以看到其实有个top叫__consumer_=offsets,用来保存对应consumer的offset数据4.往topic写入数据,读取Data.\bin\windows\kafka-console-producer--bootstrap-server127.0.0.1:9092--topictest1输入日志数据,然后读出。\bin\windows\kafka-console-consumer.bat--topictest1--bootstrap-server127.0.0.1:9093#--from-beginning可以看到仍然是消息保存到此我们就完成了最基本的kafka操作,创建主题\写入数据\读取数据三项。自己写代码1.自己写producer根据kafka自己的教程,kafka-clients本身提供了阻塞和非阻塞三种发送方式以及实现的futurecallback。包com.lixiande.kafkaLearn;导入org.apache.kafka.clients.producer.Callback;导入org.apache.kafka.clients.producer.KafkaProducer;导入org.apache.kafka.clients.producer.ProducerRecord;导入org.apache.kafka.clients.producer.RecordMetadata;导入org.slf4j.Logger;导入org.slf4j.LoggerFactory;导入java.util.Date;导入java.util.Properties;导入java.util.concurrent.Future;公共类Producer{Logger记录器=LoggerFactory.getLogger(Producer.class);私有属性kafkaProducerProps;私有KafkaProducerkafkaProducer;publicstaticvoidmain(String[]args){生产者producer=newProducer();try{while(true){producer.SendCallBack("test","keyvalue搞什么鬼","这跟kafka没什么关系,用Callback发送"+newDate().toString());producer.SendBlock("test","keyvalue搞什么鬼","这跟kafka没什么关系,通过阻塞发送"+newDate().toString());producer.SendAsync("test","keyvalue搞什么鬼","这与kafka和发送无关byasync"+newDate().toString());}}finally{producer.kafkaProducer.close();}}publicvoidSendBlock(Stringtopic,Stringkey,Stringvalue){try{System.out.println(“块发送:”+kafkaProducer.send(newProducerRecord(topic,key,value)).get().toString());}catch(Exceptione){e.printStackTrace();}}publicFutureSendAsync(Stringtopic,Stringkey,Stringvalue){returnkafkaProducer.send(newProducerRecord(topic,key,value));}publicvoidSendCallBack(Stringtopic,Stringkey,Stringvalue){kafkaProducer.send(newProducerRecord(topic,key,value),newCallback(){@OverridepublicvoidonCompletion(RecordMetadatarecordMetadata,Exceptione){System.out.println(recordMetadata.toString());if(e!=null)System.out.println(e.toString());}});}publicProducer(){kafkaProducerProps=newProperties();kafkaProducerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());kafkaProducerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());kafkaProducerProps.put("bootstrap.servers","127.0.0.1:9092");kafkaProducer=newKafkaProducer(kafkaProducerProps);}}2。写自己的consumer同一个consumer也可以有多种方式,比如订阅topic,订阅topic中的一些分区,订阅正则匹配topicspackagecom.lixiande.kafkaLearn;importorg.apache.kafka.clients.consumer.ConsumerRecord;导入org.apache.kafka.clients.consumer.ConsumerRecords;导入org.apache。kafka.clients.consumer.KafkaConsumer;导入org.apache.kafka.common.PartitionInfo;导入org.apache.kafka.common.errors.WakeupException;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.*;publicclassConsumer{privatestaticLoggerlogger=LoggerFactory.getLogger(Consumer.class);私人卡夫卡消费者卡夫卡消费者;私有属性kafkaConsumerProps;privateMapconsumeMap;publicstaticvoidmain(String[]args){消费者consumer=newConsumer();线程mainThread=Thread.currentThread();Runtime.getRuntime().addShutdownHook(newThread(()->{System.out.println("consumerstartingexiting");consumer.kafkaConsumer.wakeup();try{mainThread.join();}catch(InterruptedExceptione){e.printStackTrace();}}));消费者.listen();}publicConsumer(){kafkaConsumerProps=newProperties();kafkaConsumerProps.put("靴子trap.servers","127.0.0.1:9092");kafkaConsumerProps.put("key.deserializer",org.apache.kafka.common.serialization.StringDeserializer.class.getName());kafkaConsumerProps.put("值。解串器”,org.apache.kafka.common.serialization.StringDeserializer.class.getName());kafkaConsumerProps.put(“group.id”,“loopConsumer”);kafkaConsumer=newKafkaConsumer(kafkaConsumerProps);kafkaConsumer.subscribe(Collections.singletonList("test"));//kafkaConsumer.subscribe(Pattern.compile("test*"));//你也可以订阅所有的test*主题ListpartitionInfoList=kafkaConsumer.partitionsFor("test");//XXX:这里可以用来获取特定topic的partition,从而实现不同consumer的手动分配,不平衡/*//if(partitionInfoList!=null){//for(PartitionInfoinfo:partitionInfoList){//partitions.add(newTopicPartition(info.topic(),info.partition()));//}//kafkaConsumer.assign(分区);//}*/consumeMap=newHashMap<>();}publicvoidlisten(){try{while(true){ConsumerRecordsrecords=kafkaConsumer.poll(100);for(ConsumerRecordrecord:records){logger.warn(record.toString());int更新计数=1;如果(consumeMap.containsValue(record.value())){updatedCount=consumeMap.get(record.value())+1;}consumeMap.put(record.value(),updatedCount);}System.out.println("\n--------------------------------------------------\n");System.out.println(consumeMap);System.out.println("\n----------------------------------------------\n");consumeMap.clear();kafkaConsumer.commitAsync();}}catch(WakeupExceptione){}finally{kafkaConsumer.close();System.out.println("关闭消费者,我们完成了");}}}