当前位置: 首页 > 科技观察

SpringBoot+Nacos+Kafka实现微服务流程编排_0

时间:2023-03-12 16:53:25 科技观察

前言微服务开发涉及到一些数据处理模块的开发。每个处理业务都会开发一个独立的微服务,方便后续的扩展和流程编排。学习了SpringCloudDataFlow等框架后,感觉这个框架对我们来说太重了,维护起来也比较麻烦。因此,我们按照流编排的思路,基于我们目前的技术栈,实现了一个简单的流编排功能。简单的说,就是希望我们的流程编排可以对微服务进行可插拔,微服务的数据入出数据可以不停的修改。准备工作Nacos安装及入门如果想自学,推荐使用docker安装,命令如下:拉取镜像:dockerpullnacos/nacos-server创建服务:dockerrun--envMODE=standalone--namenacos-d-p8848:8848nacos/nacos-server然后在浏览器输入ip:8848/nacos,账号nacos;密码nacos。Docker可以帮助我们快速安装服务,减少环境准备的时间。准备三个SpringBoot服务,引入Nacos和Kafkaorg.springframework.bootspring-boot-starter-parent2.1.0.RELEASEorg.springframework.kafkaspring-kafkacom.alibaba.bootnacos-config-spring-boot-starter0.2.1配置文件:spring:kafka:bootstrap-servers:kafka-server:9092producer:acks:allconsumer:group-id:node1-group#三个服务是node1node2node3enable-auto-commit:false#部署的nacos服务nacos:config:server-addr:nacos-server:8848建议配置本地主机填写在xxx-server而不是服务ip。业务解释我们现在需要对三个服务进行排布,保证每个服务都可以插拔,服务的位置也可以调整。示意图如上:node1服务监听pre-service发送的数据流,输入topic是pre-data服务输出topicnode2监听node1处理的数据,所以node2监听的topic是by输出的topicnode1,node3同理,最后node3的处理完成将数据发送到数据流的末尾,我们现在需要调整流程去掉node2-server,只需要将node1-sink改为node2即可-sink,让我们的服务可以灵活地将不同项目的数据嵌入到流处理业务中,即插即用(当然数据格式的业务层面需要约定)。动态可调还可以保证当某个服务节点出现问题时,可以立即改变数据流向,比如发送到数据中心。暂存服务,避免Kafka中数据堆积过多,吞吐量不均衡Nacos配置①创建配置通常,流式排列中的每个服务都有一个输入和输出,分别是input和sink,所以我们需要为每个服务配置两个topic,分别是input-topicoutput-topic,我们在nacos中添加input和output的配置。nacos配置项需要配置groupId和dataId。通常,我们使用服务名称作为groupId,配置项的名称作为dataId。例如node1-server服务有一个input配置项,配置如下:完成其中一个服务的配置,其他服务的配置参考下图:②读取配置代码如下:@Configuration@NacosPropertySource(dataId="input",groupId="node1-server",autoRefreshed=true)//autoRefreshed=true表示nacos中的配置更改后会刷新,false表示只有读取时的值服务启动会使用@NacosPropertySource(dataId="sink",groupId="node1-server",autoRefreshed=true)publicclassNacosConfig{@NacosValue(value="${input:}",autoRefreshed=true)privateString输入;@NacosValue(value="${sink:}",autoRefreshed=true)privateStringsink;publicStringgetInput(){返回输入;}publicStringgetSink(){返回接收器;}}③监控配置变化服务的输入需要在服务启动时创建消费者,主题变化时重启创建消费者,移除旧主题的消费者,输出是业务驱动的,不需要监控变化,每次发送都会读取最新的配置主题。因为上面的配置类中autoRefreshed=true,这样只会刷新nacosConfig中的配置值。服务需要知道配置变化来驱动消费业务的创建,需要创建nacos配置监听器。/***监听Nacos配置变化,创建消费者,更新消费*/@ComponentpublicclassConsumerManager{@Value("${spring.kafka.bootstrap-servers}")privateStringservers;@Value("${spring.kafka.consumer.enable-auto-commit}")privatebooleanenableAutoCommit;@Value("${spring.kafka.consumer.group-id}")privatebooleangroupId;@AutowiredprivateNacosConfignacosConfig;@AutowiredprivateKafkaTemplatekafkaTemplate;//用于存放当前消费者使用的主题privateStringtopic;//用于执行消费者线程privateExecutorServiceexecutorService;/***监听输入*/@NacosConfigListener(dataId="node1-server",groupId="input")publicvoidinputListener(Stringinput){//当这个监听被触发时,NacosConfig中实际的输入值已经是最新值。我们只需要这个监听器来触发我们更新消费者的业务。StringinputTopic=nacosConfig.getInput();//我之所以使用nacosConfig读取是因为监听的内容是input=xxxx而不是xxxx。如果需要自己拦截,nacosConfig中的contentframe会处理的很好。先看第一张图的配置内容。明白了//首先检查当前局部变量topic是否有值。如果有值,则表示更新消费者。如果没有价值,就创造它if(topic!=null){//停止旧的消费者线程executorService.shutdownNow();executorService==null;}//根据topic=inputTopic为新主题创建消费者;ThreadFactorythreadFactory=newThreadFactoryBuilder().setNameFormat(topic+"-pool-%d").build();executorService=newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(2),threadFactory);//执行消费业务executorService.execute(()->consumer(topic));}/***创建消费者*/publicvoidconsumer(Stringtopic){Propertiesproperties=newProperties();properties.put("bootstrap.servers",服务器);properties.put("enable.auto.commit",enableAutoCommit);properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("group.id",groupId);KafkaConsumerconsumer=newKafkaConsumer<>(properties);consumer.subscribe(Arrays.asList(topic));try{while(!Thread.currentThread().isInterrupted()){持续时间duration=Duration.ofSeconds(1L);ConsumerRecords记录=consumer.poll(duration);for(ConsumerRecordrecord:records){Stringmessage=record.价值();//执行数据处理业务,省略业务实现StringhandleMessage=handle(message);//处理完成后,发送给下一个节点kafkaTemplate.send(nacosConfig.getSink(),handleMessage);}}consumer.commitAsync();}}catch(异常e){记录器。错误(e.getMessage(),e);}最后{尝试{消费者。提交同步();}最后{consumer.close();}}}}总结流编排的思想是可以整体调整数据流向。我们以此为需求,根据一些主流框架提供的API,实现了自己的动态调整方案。可以帮助您更好地理解流编码的思想和原理。在实际业务中,还有很多业务问题需要突破。我们更多地处理这个是因为服务是可插拔的,这有利于流处理微服务在项目中的灵活搭配。