大家好,我是北军。在开发中,消息中间件通常用于服务之间的通信。如果我们使用某个MQ,那么消息中间件和我们的系统是高度耦合的。将来有一天,如果我们要换成另外一个MQ,我们的改动会比较大。为了解决这个问题,我们可以使用SpringCloudStream来集成我们的消息中间件,降低耦合度,让服务更加关注自己的业务逻辑。今天给大家带来一个大家都能做的SpringCloudStream集成Kafka的快速入门实例。一、前言SpringCloudStream是一个高度可扩展的事件消息驱动的微服务框架。简单的说就是帮你操作MQ,可以和底层的MQ框架解耦。以后想更换MQ框架的时候会更容易。Kafka是一个分布式发布-订阅消息系统,起源于LinkedIn的一个项目,并于2011年成为一个开源的Apache项目。ZooKeeper是Apache软件基金会的一个软件项目。它为大规模分布式计算提供开源的分布式配置服务、同步服务和命名注册。Kafka的实现也依赖于zookeeper。2.Windows搭建简单Kafka2.1启动zookeeper要使用Kafka,首先需要启动zookeeper,windows下搭建zookeeper也很简单。以下步骤即可完成:下载zookeeper(本文使用3.7.0版本,下载链接在文末)配置基本环境变量:将conf文件夹下的zoo_sample.cfg重命名为zoo.cfg.并修改其工作目录dataDir。bin文件夹下有zkEnv.cmd,里面有zookeeper相关的配置,包括JAVA_HOME,所以系统环境变量需要配置JAVA_HOME,或者直接替换成Java的路径。启动,运行bin目录下的zkServer.cmd脚本启动zookeeper。默认启动端口为2181,正常启动如下:2.2构建Kafka本地使用Kafka也是如下步骤:下载Kafka(本文使用2.11版本,下载链接在文末)。环境变量配置:查看config文件下server.properties配置文件中的zookeeper配置。zookeeper.connect=localhost:2181在bin/windows文件夹下的kafka-run-class.bat文件中有一个JAVA_HOME配置,也可以直接改成系统的Java路径。使用如下命令启动kafka根目录下的kafka,并注册到zookeeper中。#.\bin\windows\kafka-server-start.bat.\config\server.properties创建一个topic,在bin\windows目录下使用如下命令。创建一个名为“test”的主题。kafka-topics.bat--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictest使用windows命令窗口的生产者和消费者,在bin\windows目录下使用如下命令.#test主题消息生产者kafka-console-producer.bat--broker-listlocalhost:9092--topictest#test主题消息消费者kafka-console-consumer.bat--bootstrap-serverlocalhost:9092--topictest#testtopic的消息消费者(consumerfromscratch)kafka-console-consumer.bat--bootstrap-serverlocalhost:9092--from-beginning--topickafka启动windows界面如下:3SpringCloudStream集成了Kafka3.1,引入依赖由于我们直接使用SpringCloudStream集成Kafka,官方已经有了现成的starter。org.springframework.cloudspring-cloud-starter-stream-kafka2.1.0.RELEASE3.2关于kafka配置spring:application:name:shop-servercloud:stream:bindings:#配置你定义的channel和哪个中间件交互input:#MessageChannel中Input和Output的值destination:test#目标topic相当于kafka的topicoutput:destination:test1#本例创建另一个topic(test1)来区分不同的功能。default-binder:kafka#默认的binder是kafkakafka:binder:zk-nodes:localhost:2181bootstrap-servers:localhost:9092#kafka服务地址,集群部署时需要多次配置,consumer:group-id:consumer1producer:key-serializer:org.apache.kafka.common.serialization.ByteArraySerializervalue-serializer:org.apache.kafka.common.serialization.ByteArraySerializerclient-id:producer1server:port:81003.3消费者示例首先需要定义SubscribableChannel接口方法使用输入注解。publicinterfaceSink{StringINPUT="input";@Input("input")SubscribableChannelinput();}然后简单的使用StreamListener监听某个频道的消息。@Service@EnableBinding(Sink.class)publicclassMessageSinkHandler{@StreamListener(Sink.INPUT)publicvoidhandler(Messagemsg){System.out.println("收到消息:"+msg);}}cloudstream在配置中绑定了对应的Kafka主题,如下:cloud:stream:bindings:#配置你定义的channel与哪个中间件交互input:#InputvalueinSubscribableChanneldestination:test#Targettopic我们使用Kafka控制台生产者生产消息。kafka-console-producer.bat--broker-listlocalhost:9092--topictest同时启动我们的示例SpringBoot项目,使用producer推送多条消息。我们还启动了一个Kafka控制台消费者。kafka-console-consumer.bat--bootstrap-serverlocalhost:9092--topictest消费结果如下:SpringBoot项目消费消息如下:3.4Producer示例首先需要定义生产者MessageChannel,以及这里将使用输出注释。公共接口KafkaSource{StringOUTPUT="output";@Output(KafkaSource.OUTPUT)MessageChanneloutput();}使用MessageChannel发送消息。@ComponentpublicclassMessageService{@AutowiredprivateKafkaSource来源;publicObjectsendMessage(Objectmsg){source.output().send(MessageBuilder.withPayload(msg).build());返回消息;}定义一个RestAPI来触发消息发送。@RestControllerpublicclassMessageController{@AutowiredprivateMessageServicemessageService;@GetMapping(value="/sendMessage/{msg}")publicStringsendMessage(@PathVariable("msg")Stringmsg){messageService.sendMessage("messageService发出:"+msg+LocalDateTime.now());返回“已发送消息”;}}配置中生产者的配置如下:cloud:stream:bindings:input:destination:testoutput:destination:test1#目标topic启动SpringBootApp,并触发下面的API调用。http://localhost:8100/sendMessage/JavaNorthProducer我们还启动了一个Kafka控制台消费者,这里我们使用另一个test1主题。kafka-console-consumer.bat--bootstrap-serverlocalhost:9092--topictest1consoleconsumer消费消息如下:总结本章首先介绍了一个简单的SpringCloudStream集成Kafka的例子,实现了一个简单的发布订阅功能。但是SpringCloudStream肯定还有更多的功能,后续我们会继续深入了解Stream的功能。以上示例仓库:https://github.com/javatechnorth/java-study-note/tree/master/kafka下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gzhttps://kafka.apache.org/downloads