Introduction

Kafka is a high-throughput distributed publish-subscribe messaging system, built at LinkedIn as a distributed message queue for log processing. LinkedIn's log volume is large, but its reliability requirements are modest: the data consists mainly of user-behavior events (logins, page views, clicks, shares, and so on) and operational metrics (CPU, memory, disk, network, and system/process state). Many existing message queues provide reliable delivery guarantees and default to immediate consumption, which makes them a poor fit for offline processing. Because LinkedIn's logs do not need highly reliable delivery, Kafka trades some reliability for performance; by letting messages accumulate in a distributed cluster, it supports offline and online log processing at the same time.

Test environment

kafka_2.10-0.8.1.1: a cluster of 3 nodes
zookeeper-3.4.5: a single instance

Code examples

Message producer example:

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * For details, see: https://img.ydisp.cn/news/20220914/0uje1y0t2e3.0+Producer+Example
 * @author Fung
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Random rnd = new Random();
        long events = 100;

        // Set up the producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list",
                "172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional; if unset, the default partitioner is used
        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // Enable acknowledgements; otherwise the producer fires and forgets,
        // which can lose data. Valid values are 0, 1 and -1; see
        // https://img.ydisp.cn/news/20220914/za4i2evzbx2/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);

        // Generate and send the messages: fake page-visit records keyed by a
        // random client IP
        long start = System.currentTimeMillis();
        for (long i = 0; i < events; i++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        System.out.println("elapsed: " + (System.currentTimeMillis() - start) + " ms");

        // Close the producer
        producer.close();
    }
}

Message consumer example:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * For details, see: https://img.ydisp.cn/news/20220914/4byufbuvzc3
 * @author Fung
 */
public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Launch all the threads
        executor = Executors.newFixedThreadPool(numThreads);

        // Create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
        String[] args = {"172.168.63.221:2188", "group-1", "page_visits", "12"};
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        demo.shutdown();
    }
}

Message-processing class:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerMsgTask implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
        m_threadNumber = threadNumber;
        m_stream = stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
        System.out.println("Shutting down thread: " + m_threadNumber);
    }
}
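Note that main() in ConsumerDemo simply sleeps for 10 seconds and then calls shutdown(), which stops the executor without waiting for the worker threads to finish. Below is a minimal sketch of a more orderly variant; it assumes the same consumer and executor fields as ConsumerDemo above, and the 5-second timeout is an arbitrary value chosen for illustration, not something from the original example.

    // Hypothetical drop-in replacement for ConsumerDemo.shutdown()
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();      // shut down the connector; the streams end
        if (executor != null) {
            executor.shutdown();      // stop accepting new tasks
            try {
                // Give in-flight message handling a bounded time to drain
                if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to exit");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }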
Partitioner class example:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {
    }

    @Override
    public int partition(Object obj, int numPartitions) {
        int partition = 0;
        if (obj instanceof String) {
            String key = (String) obj;
            // Partition by the field after the last dot, e.g. the last octet
            // of the IP keys that ProducerDemo generates
            int offset = key.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
            }
        } else {
            partition = obj.toString().length() % numPartitions;
        }
        return partition;
    }
}

With the keys ProducerDemo generates, a message keyed "192.168.2.37" on a topic with 3 partitions, for example, lands in partition 37 % 3 = 1.

References

https://cwiki.apache.org/confluence/display/KAFKA/Index
https://kafka.apache.org/
Original article: https://img.ydisp.cn/news/20220914/chedwwhcl55