深入应用4.1springboot-kafka1)配置文件kafka:bootstrap-servers:52.82.98.209:10903,52.82.98.209:10904producer:#producerproducerretries:0#acks重试次数:1#Acknowledgmentlevel:备份多少份partitions向producer发送ack确认(可选0,1,all/-1)batch-size:16384#一次发送的最大数据量buffer-memory:33554432#生产者缓冲区大小key-serializer:org.apache.kafka.common.serialization.StringSerializervalue-serializer:org.apache.kafka.common.serialization.StringSerializerconsumer:#consumerconsumergroup-id:javagroup#defaultconsumergroupIDenable-auto-commit:true#是否自动提交offsetauto-commit-interval:100#提交offset延时(收到消息后多久提交offset)auto-offset-reset:latest#earliest,latestkey-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializer2)启动信息4.2消息发送4.2.1发送类型KafkaTemplate默认使用异步发送,当调用发送。如果需要同步获取发送结果,调用get方法,详细代码参考:AsyncProducer.java消费者使用:KafkaConsumer.java1)同步发送ListenableFuture>future=kafkaTemplate.send("test",JSON.toJSONString(message));//注意,可以设置等待时间,超过后不再等待结果SendResultresult=future.get(3,TimeUnit.SECONDS);logger.info("发送结果:{}",result.getProducerRecord().value());通过swagger发送,控制台可以打印sendnormallyresult2)blockontheserver,suspendkafkaservicedocker-composepausekafka-1kafka-2sendmessageinswaggerandsynchronoussend:请求被阻塞,一直等待,返回一个超时后出错,异步发送(默认发送接口),请求立即返回那么,如何确认异步发送的消息的发送状态呢???向下看!3)注册监听代码参考:KafkaListener.java可以给kafkaTemplate设置一个监听消息发送情况的Listener,实现内部对应的方法kafkaTemplate.setProducerListener(newProducerListener(){});查看控制台,等待一段时间之后异步发送失败的消息会回调到注册的监听器com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}启动kafkadocker-composeunpausekafka-1当kafka-2再次发送消息时,同步和异步都能正常收发,监听进入成功回调com.iheima.demo.config.KafkaListener$1:好的,消息={“消息”:“1”,“发送时间”:1610089315395}com.iheima.demo.controller.PartitionConsumer:patition=1,消息:[{“消息”:“1”,“发送时间”:1610089315395}]可以看到在内部类KafkaListener$1中,也就是注册的Listener信息。4.2.2序列化消费者:KafkaConsumer.java的使用1)序列化详解前面使用了Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)。另外还有:ByteArray,ByteBuffer,Bytes,Double,Integer,Long等序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上可以满足大部分场景2)自定义序列化并实现自己去实现对应的接口,有如下方法:publicinterfaceSerializerextendsCloseable{defaultvoidconfigure(Mapconfigs,booleanisKey){}//理论上只能正常运行通过实现这个byte[]serialize(Stringvar1,Tvar2);//默认调用上面的方法defaultbyte[]serialize(Stringtopic,Headersheaders,Tdata){returnthis.serialize(topic,data);}defaultvoidclose(){}}案例,参考:MySerializer.java在yaml中配置自己的encodervalue-serializer:com.iheima.demo.config.MySerializerresend,发现:消息发送端编码回调正常。但是消费端的消息内容是错误的!com.iheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"呢?3)如果解码发送端有编码,我们自己定义编码,那么接收端自然会配备相应的解码策略代码参考:MyDeserializer.java,实现方法和编码器几乎一样!在yaml中配置自己的decodervalue-deserializer:com.iheima.demo.config.MyDeserializer再次收发,消息正常com.iheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}4.2.3分区策略分区策略决定了消息传递到哪个分区根据关键。它也是顺序消费保护的基石。分区号给定,数据直接发送到指定分区。不给分区号,给数据的key值,根据key分区hashCode。既没有给出分区号也没有给出键值。直接round-robin分区和自定义分区,想干啥就干啥1)验证默认分区规则Sender代码参考:PartitionProducer.javaConsumer代码使用:PartitionConsumer.java通过swagger访问setKey:看控制台:再访问setPartition让我们设置分区号0发送,看控制台:2)自定义分区你要自己定义规则,按照我的要求把消息放到相应的分区吗?能!参考代码:MyPartitioner.java,MyPartitionTemplate.java,发送使用:MyPartitionProducer.java使用swagger,发送0开头和非0开头的两个key试试!备注:自己定义config参数比较麻烦。您需要打破默认的KafkaTemplate设置。您可以在KafkaConfiguration.java中添加带有@Bean注释的getTemplate以覆盖系统默认bean。为避免混淆,使用@Autowire注入4.3消息消费4.3.1消息组发送方使用:KafkaProducer.java1)代码参考:GroupConsumer.java,Listener复制3份,分别分配两个组,验证组消费:2)开始3)通过swagger发送2条消息同组下两次消费或者,group1和group2下只有一个消费者,获取到所有消息。4)消费者闲置。注意分区数量和消费者数量的组合。如果(消费者数量>分区数量),消费者就会闲置,浪费资源!验证方法:停止项目,删除testtopic,新建一个,这次只分配一个partition给它。重发两条消息尝试分析:group2可以消费1和2两条消息,group1下有两个消费者,但只分配给了-1和-2。这个进程是空闲的。4.3.2Shift提交1)自动提交前面的案例我们设置了如下两个选项,那么kafka会根据延迟设置自动提交enable-auto-commit:true#是否自动提交offsetauto-commit-interval:100#提交offset延时(收到消息后多久提交offset)2)手动提交有时候,我们需要手动控制offset的提交时机,比如保证消息在提交前严格消费,防止丢失或重复。接下来我们自己定义配置,覆盖上面的参数代码参考:MyOffsetConfig.java在消费者端通过Consumer提交offset,有几种方式:代码参考:MyOffsetConsumer.java同步提交,异步提交:manualCommit(),synchronous和asynchronous的区别下面会详细讨论。通过指定offset提交:offset()3)重复消费问题如果开启了手动提交模式,不要忘记提交offset。否则会造成重复消费!代码参考及对比:manualCommit(),noCommit()验证过程:使用km删除测试主题,创建测试空主题。方便观察消息偏移和注释掉其他Consumers的Component注解,只保留当前MyOffsetConsumer.java启动项目。使用swagger的KafkaProducer连续发送几条消息,注意控制台。它们可以食用,没问题:但是!尝试重启:无论重启多少次,没有提交offset的消费组都会再次消费!!!然后尝试通过命令行查询偏移量:4)经验总结commitSync()方法,即同步提交,会提交最后一个偏移量。commitSync()会不断重试,直到成功提交或遇到无法恢复的错误,但commitAsync()不会。这就造成了一个陷阱:如果你异步提交,对于偶尔的提交失败,不重试问题不大,因为如果提交失败是暂时性的问题,那么后续的提交总会成功的。只要成功一次,就会提交offset。但!如果这是消费者关闭时发生的最后一次提交,则确保提交成功,如果没有提交则停止进程。会造成重复消费!所以commitAsync()和commitSync()一般在消费者关闭前结合使用。详细代码参考:MyOffsetConsumer.manualOffset()本文由传智教育博学谷-狂野建筑师教研组发布,转载请注明出处!如果本文对您有帮助,请关注并点赞;有什么建议也可以留言或私信。您的支持是我坚持创作的动力