spring消费kafka消费者配置:@Configuration@EnableKafka@ConditionalOnResource(resources="/special-run.txt")publicclassZdryKafkaConsumerConfig{@Value("${kafka.zdry.consumer.autoStartup}")privateBooleanautoStartup;@Value("${kafka.zdry.consumer.servers}")私人字符串服务器;@Value("${kafka.zdry.consumer.topic}")私有字符串主题;@Value("${kafka.zdry.consumer.group.id}")privateStringgroupId;@Value("${kafka.zdry.consumer.enable.auto.commit}")privateStringenableAutoCommit;@Value("${kafka.zdry.consumer.auto.commit.interval.ms}")privateStringautoCommitIntervalMs;@Value("${kafka.zdry.consumer.session.timeout.ms}")privateStringsessionTimeoutMs;@Value("${kafka.zdry.consumer.auto.offset.reset}")privateStringautoOffsetReset;@Value("${kafka.zdry.consumer.max.poll.records}")priv吃了StringmaxPollRecords;@Value("${kafka.zdry.consumer.concurrency}")私有整数并发;/***消费者批发商员工轨迹*/@Bean("zdry_person_track")publicKafkaListenerContainerFactory>batchFactory(){ConcurrentKafkaListenerContainerFactoryfactory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(并发);factory.getContainerProperties().setPollTimeout(1500);factory.setBatchListener(true);factory.setAutoStartup(autoStartup);返厂;}/***消费者工厂*/publicConsumerFactoryconsumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}/***消费者配置*/publicMapconsumerConfigs(){MappropsMap=newHashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,服务器);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitIntervalMs);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeoutMs);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);返回propsMap;}}消费监听:@KafkaListener(topics="${kafka.zdry.consumer.topic}",containerFactory="zdry_person_track")publicvoidconsumeToJson(List>records){intcount=records.size();log.info("count={}",count);JSONArrayarr=newJSONArray();for(ConsumerRecordrecord:records){JSONObjectobj=JSON.parseObject(record.value());arr.add(obj);}}推送消费到kafka生产者kafka配置:@Configuration@EnableKafka@ConditionalOnResource(resources="/special-run.txt")publicclassZdryKafkaProducerConfig{@Value("${kafka.zdry.consumer.servers}")private字符串服务器;@Bean("zdryProducerTemplate")publicKafkaTemplate>createTemplate(){Mappros=producerProps();ProducerFactory>pf=newDefaultKafkaProducerFactory>(优点);KafkaTemplate>template=newKafkaTemplate<>(pf);返回模板;}publicMapproducerProps(){Mapprops=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,服务器);props.put(ProducerConfig.RETRIES_CONFIG,0);props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);props.put(ProducerConfig.LINGER_MS_CONFIG,100);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);返回道具;}}生产消费并推送到kafka@AutowiredprivateKafkaTemplatezdryProducerTemplate;@Value("${kafka.zdry.consumer.push.topic}")privateStringpushTopic;方法:try{zdryProducerTemplate.send(pushTopic,JSON.toJSONString(warn));}catch(Exceptione){e.printStackTrace();}zdryProducerTemplate.flush();ymlConfigurekafka:##Keypersonalcapturetrackzdry:consumer:autoStartup:trueservers:x.x.x.x:6667topic:person_trackgroup.id:aa_1enable.auto.commit:trueauto.commit.interval.ms:100session.timeout.ms:10000auto.offset.reset:earliestmax.poll.records:100concurrency:1push.topic:ai_warn