
Spring Boot + Kafka Streams for Real-Time Statistics

Published: 2023-03-12 01:47:44 · 科技观察

Environment: Spring Boot 2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Streams Introduction

Kafka introduced the Streams API in version 0.10, providing the ability to access the data stored in Kafka as streams and to analyze it. Stream computing is usually contrasted with batch computing: batch computing takes a fixed, bounded data set as input and computes a result, while the input of stream computing is "unbounded" (unbounded data) and arrives continuously, so the full data set is never available. The results are likewise emitted continuously: at any moment you only have the result as of that point in time, never a final result.

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds on several important stream-processing concepts: distinguishing event time from processing time, windowing support, and simple, efficient management and real-time querying of application state. The barrier to entry is low: writing a Kafka Streams application is not much different from writing an ordinary Kafka message-processing program, and you can deploy multiple processes for scaling, load balancing, and high availability (the Kafka consumer parallelism model).

Some features of Kafka Streams:

- Designed as a simple, lightweight client library that can be embedded in any Java application, with no dependencies other than Kafka itself
- Uses Kafka's partition model for horizontal scaling while preserving ordering guarantees
- Efficient stateful operations (windowed joins and aggregations)
- Exactly-once semantics via fault-tolerant state stores
- Record-at-a-time processing for millisecond latency
- Both a high-level Streams DSL and a low-level Processor API

Stream Processing Topology

A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.

A stream processing application is any application that uses the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) connected by streams (edges).

A stream processor is a node in a processor topology; it represents one processing step that transforms the data in a stream: it receives one input record at a time from its upstream processors, applies its operation to it, and may produce one or more output records for its downstream processors. There are two special kinds of processors:

- Source processor: a stream processor with no upstream processors. It produces an input stream for its topology by consuming records from one or more Kafka topics and forwarding them to its downstream processors.
- Sink processor: a stream processor with no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.

For the related core concepts, see the official Kafka Streams documentation.

The following shows how Kafka Streams is used in a Spring Boot application.

Dependencies

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
```

Configuration

```yaml
server:
  port: 9090
spring:
  application:
    name: kafka-demo
  kafka:
    streams:
      application-id: ${spring.application.name}
      properties:
        spring.json.trusted.packages: '*'
    bootstrap-servers:
      - localhost:9092
      - localhost:9093
      - localhost:9094
    producer:
      acks: 1
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # or org.apache.kafka.common.serialization.StringSerializer
      properties:
        spring.json.trusted.packages: '*'
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # or org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      group-id: ConsumerTest
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 12000
        heartbeat.interval.ms: 3000
        max.poll.records: 100
        spring.json.trusted.packages: '*'
    listener:
      ack-mode: manual-immediate
      type: batch
      concurrency: 8
      properties:
        max.poll.interval.ms: 300000
```

Sending Messages

```java
@Service
public class MessageSend {

    @Resource
    private KafkaTemplate<String, Message> kafkaTemplate;

    public void sendMessage2(Message message) {
        kafkaTemplate.send(new ProducerRecord<>("test", message)).addCallback(result -> {
            System.out.println("Send succeeded... " + Thread.currentThread().getName());
        }, ex -> {
            System.out.println("Send failed");
            ex.printStackTrace();
        });
    }
}
```

Listening for Messages

```java
@KafkaListener(topics = {"test"})
public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Message> record : records) {
        System.out.println(this.getClass().hashCode() + ", Thread " + Thread.currentThread().getName()
                + ", key: " + record.key() + ", received message: " + record.value()
                + ", partition: " + record.partition() + ", offset: " + record.offset());
    }
    ack.acknowledge();
}

@KafkaListener(topics = {"demo"})
public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Message> record : records) {
        System.out.println("Demo Topic: " + this.getClass().hashCode()
                + ", Thread " + Thread.currentThread().getName()
                + ", key: " + record.key() + ", received message: " + record.value()
                + ", partition: " + record.partition() + ", offset: " + record.offset());
    }
    ack.acknowledge();
}
```

Kafka Streams: Transforming Messages and Forwarding to Another Topic

```java
@Bean
public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
    KStream<Object, Object> stream = streamsBuilder.stream("test");
    stream.map((key, value) -> {
        System.out.println("Original message: " + new String((byte[]) value, Charset.forName("UTF-8")));
        return new KeyValue<>(key,
                "{\"title\":\"123123\",\"message\":\"Redefining content\"}".getBytes(Charset.forName("UTF-8")));
    }).to("demo");
    return stream;
}
```

Processing the Stream as Typed Objects

```java
@Bean
public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> deserializer = (JsonDeserializer<Message>) jsonSerde.deserializer();
    deserializer.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test",
            Consumed.with(Serdes.String(), jsonSerde));
    stream.map((key, value) -> {
        value.setTitle("XXXXXXX");
        return new KeyValue<>(key, value);
    }).to("demo", Produced.with(Serdes.String(), jsonSerde));
    return stream;
}
```

Grouping and Counting

```java
@Bean
public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> deserializer = (JsonDeserializer<Message>) jsonSerde.deserializer();
    deserializer.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test",
            Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey(new KeyValueMapper<String, Message, String>() {
        @Override
        public String apply(String key, Message value) {
            return value.getOrgCode();
        }
    }).groupByKey(Grouped.with(Serdes.String(), jsonSerde))
      .count()
      .toStream()
      .print(Printed.toSysOut());
    return stream;
}
```

Aggregation

```java
@Bean
public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> deserializer = (JsonDeserializer<Message>) jsonSerde.deserializer();
    deserializer.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test",
            Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey(new KeyValueMapper<String, Message, String>() {
        @Override
        public String apply(String key, Message value) {
            return value.getOrgCode();
        }
    }).groupByKey(Grouped.with(Serdes.String(), jsonSerde))
      .aggregate(() -> 0L, (key, value, aggValue) -> {
              System.out.println("key=" + key + ", value=" + value + ", agg=" + aggValue);
              return aggValue + 1;
          },
          Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kvs")
              .withValueSerde(Serdes.Long()))
      .toStream()
      .print(Printed.toSysOut());
    return stream;
}
```

Filtering

Filter the aggregated data (note that each materialized state store name, such as "kvs" here, must be unique within one application):

```java
@Bean
public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> deserializer = (JsonDeserializer<Message>) jsonSerde.deserializer();
    deserializer.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test",
            Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey(new KeyValueMapper<String, Message, String>() {
        @Override
        public String apply(String key, Message value) {
            return value.getOrgCode();
        }
    }).groupByKey(Grouped.with(Serdes.String(), jsonSerde))
      .aggregate(() -> 0L, (key, value, aggValue) -> {
              System.out.println("key=" + key + ", value=" + value + ", agg=" + aggValue);
              return aggValue + 1;
          },
          Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kvs")
              .withValueSerde(Serdes.Long()))
      .toStream()
      .filter((key, value) -> !"2".equals(key))
      .print(Printed.toSysOut());
    return stream;
}
```

Result: only records whose key is not equal to "2" are printed.

Branching into Multiple Streams

```java
@Bean
public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> deserializer = (JsonDeserializer<Message>) jsonSerde.deserializer();
    deserializer.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test",
            Consumed.with(Serdes.String(), jsonSerde));
    // Branch: split one stream into multiple streams by predicate
    KStream<String, Message>[] arrStream = stream.branch(
            (key, value) -> "Male".equals(value.getSex()),
            (key, value) -> "Female".equals(value.getSex()));
    Stream.of(arrStream).forEach(as -> as.foreach((key, message) ->
            System.out.println(Thread.currentThread().getName() + ", key=" + key + ", message=" + message)));
    return stream;
}
```

Grouping by Multiple Fields

You cannot chain multiple selectKey calls: a later selectKey overwrites the key set by an earlier one. To group by several fields, build a composite key in a single selectKey:

```java
@Bean
public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> deserializer = (JsonDeserializer<Message>) jsonSerde.deserializer();
    deserializer.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test",
            Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey(new KeyValueMapper<String, Message, String>() {
        @Override
        public String apply(String key, Message value) {
            System.out.println(Thread.currentThread().getName());
            return value.getTime() + "|" + value.getOrgCode();
        }
    }).groupByKey(Grouped.with(Serdes.String(), jsonSerde))
      .count()
      .toStream()
      .print(Printed.toSysOut());
    return stream;
}
```
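To make the semantics of selectKey(...).groupByKey().count() concrete without a running broker, here is a plain-Java sketch (not Kafka Streams itself; the class and method names are hypothetical) of what the count aggregation computes over a batch of records keyed by orgCode:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrgCodeCount {

    // Mirrors selectKey(value.getOrgCode()).groupByKey().count():
    // every record increments the running count for its (re-selected) key.
    public static Map<String, Long> countByOrgCode(List<String> orgCodes) {
        return orgCodes.stream()
                .collect(Collectors.groupingBy(code -> code, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Three records, two of them from org "A01"
        Map<String, Long> counts = countByOrgCode(List.of("A01", "A01", "B02"));
        System.out.println(counts);
    }
}
```

The aggregate(() -> 0L, (key, value, agg) -> agg + 1, ...) example in the article computes exactly this, only incrementally and continuously as records arrive, with the running state held in the fault-tolerant "kvs" store.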
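The introduction mentions windowing support, which is what turns a running count into a per-interval statistic (e.g. events per minute). The bucketing idea behind tumbling windows can be sketched in plain Java (a conceptual illustration, not the Kafka Streams API; the class name is hypothetical): each timestamp falls into exactly one fixed-size, epoch-aligned window.

```java
public class TumblingWindow {

    // A timestamp belongs to the window starting at the largest multiple of
    // windowSizeMs that is <= timestampMs (epoch-aligned, non-overlapping).
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // 125 000 ms falls into the [120 000, 180 000) window
        System.out.println(windowStart(125_000L, oneMinute));
    }
}
```

In the actual DSL you would insert a windowing step between groupByKey and count (for example via the windowedBy operator with a fixed window size), so the store keeps one count per key per window instead of one global count per key.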
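A detail of the branch operator worth knowing: a record is routed to the output of the first predicate that matches, and records matching no predicate are dropped. A plain-Java sketch of that routing rule (again a conceptual model, not the Kafka Streams implementation; names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BranchSketch {

    // Mimics KStream#branch: each record goes to the output list of the FIRST
    // predicate it satisfies; records matching none are silently dropped.
    public static List<List<String>> branch(List<String> records, List<Predicate<String>> predicates) {
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        for (String record : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    outputs.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<String>> out = branch(
                List.of("Male", "Female", "Male", "Other"),
                List.<Predicate<String>>of("Male"::equals, "Female"::equals));
        System.out.println(out); // "Other" matches neither predicate and is dropped
    }
}
```

This is why, in the kStream8 example, a record whose sex field is neither "Male" nor "Female" appears in neither output stream; add a catch-all predicate such as (key, value) -> true as the last branch if you need to keep the remainder.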