当前位置: 首页 > 科技观察

使用Kafka和MongoDB进行异步处理

时间:2023-03-15 19:15:47 科技观察

在我之前的博文“我的第一个Go微服务:使用MongoDB和Docker进行多阶段构建”中,我创建了一个示例Go微服务,它发布一个RESTfulhttp端点并保存从HTTPPOST接收的数据到MongoDB数据库。在此示例中,我将数据存储与MongoDB分离,并创建了另一个微服务来处理它。我还添加了Kafka来服务消息层,以便微服务可以异步处理自己的关注点。如果您有时间查看,我在这个视频中记录了这篇博文的整个过程:)这是这个使用两个微服务的简单异步处理示例的高级架构图。rest-kafka-mongo-microservice-draw-io微服务1-是一个RESTful微服务,它从/POSThttp调用接收数据。收到请求后,从http请求中取出数据,保存到kafka中。保存后,它通过/POSTing相同的数据来响应调用者。微服务2——是在Kafka中订阅了一个topic的微服务,微服务1的数据就存放在这个topic中。微服务使用消息后,会将数据保存在MongoDB中。在继续之前,我们需要一些东西才能运行这些微服务:下载Kafka-我使用的版本是kafka_2.11-1.1.0安装librdkafka-不幸的是,这个库应该安装在目标系统中KafkaGo客户端运行MongoDB。您可以在我之前的文章中了解到这一点,我在这篇文章中使用了MongoDBdocker镜像。让我们开始吧!首先,启动卡夫卡。在运行Kafka服务器之前,您需要运行Zookeeper。这是一个示例:$cd/<下载路径>/kafka_2.11-1.1.0$bin/zookeeper-server-start.shconfig/zookeeper.properties然后运行??Kafka-我使用端口9092连接到Kafka。如果需要更改端口,只需在config/server.properties中配置即可。如果你像我一样是新手,我建议你暂时使用默认端口。$bin/kafka-server-start.shconfig/server.propertiesKafka运行后,我们需要MongoDB。很简单,用这个docker-compose.yml就可以了。version:'3'services:mongodb:image:mongoports:-"27017:27017"volumes:-"mongodata:/data/db"networks:-network1volumes:mongodata:networks:network1:使用DockerCompose运行MongoDBdocker容器.这里的docker-composeup是微服务1的相关代码。我只是修改了我前面的示例去保存到Kafka而不是MongoDB:rest-to-kafka/rest-kafka-sample.gofuncjobsPostHandler(whttp.ResponseWriter,r*http.Request){//Retrievebodyfromhttprequestb,err:=ioutil.ReadAll(r.Body)deferr.Body.Close()iferr!=nil{panic(err)}//保存数据到Jobstructvar_jobJoberr=json.Unmarshal(b,&_job)iferr!=nil{http.Error(w,err.Error(),500)return}saveJobToKafka(_job)//将作业结构转换为jsonjsonString,err:=json.Marshal(_job)iferr!=nil{http.Error(w,err.Error(),500)return}//设置content-typehttp头w.Header().Set("content-type","application/json")//发回数据作为响应w.Write(jsonString)}funcsaveJobToKafka(jobJob){fmt.Println("savetokafka")jsonString,err:=json.Marshal(job)jobString:=string(jsonString)fmt.Print(jobString)p,错误:=kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost:9092"})iferr!=nil{panic(err)}//向主题生成消息(异步)topic:="jobs-topic1"for_,word:=range[]string{string(jobString)}{p.Produce(&kafka.Message{TopicPartition:kafka.TopicPartition{Topic:&topic,Partition:kafka.PartitionAny},Value:[]byte(word),},nil)}}下面是微服务2的代码,这段代码最重要的是从Kafka消费数据,保存的部分我在上一篇博文中已经讲过了。这里代码的重点部分是从Kafka中消费数据:kafka-to-mongo/kafka-mongo-sample.gofuncmain(){//CreateMongoDBsessionsession:=initialiseMongo()mongoStore.session=sessionreceiveFromKafka()}funcreceiveFromKafka(){fmt.Println("开始从Kafka接收信息")c,err:=kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers":"localhost:9092","group.id":"group-id-1","auto.offset.reset":"earliest",})iferr!=nil{panic(err)}c.SubscribeTopics([]string{"jobs-topic1"},nil)for{味精,err:=c.ReadMessage(-1)iferr==nil{fmt.Printf("从Kafka%s收到:%s\n",msg.TopicPartition,string(msg.Value))job:=string(msg.Value)saveJobToMongo(job)}else{fmt.Printf("消费者错误:%v(%v)\n",err,msg)break}}c.Close()}funcsaveJobToMongo(jobStringstring){fmt.Println("SavetoMongoDB")col:=mongoStore.session.DB(database).C(collection)//保存数据到Jobstructvar_jobJobb:=[]byte(jobString)err:=json.Unmarshal(b,&_job)iferr!=nil{panic(err)}//将作业插入MongoDBerrMongo:=col.Insert(_job)iferrMongo!=nil{panic(errMongo)}fmt.Printf("SavedtoMongoDB:%s",jobString)}让我们演示一下,运行微服务1以确保Kafka正在运行。$gorunrest-kafka-sample.go我使用Postman向微服务1发送数据Screenshot-2018-04-29-22.20.33这里是日志,你可以在微服务1中看到,当你看到这个的时候,它表示Postman发来的数据已经收到,并保存到Kafka中。Screenshot-2018-04-29-22.22.00因为我们还没有运行微服务2,所以数据只是被微服务1存储在Kafka中,我们消费它,通过运行中的微服务2保存到MongoDB中。$gorunkafka-mongo-sample.go现在,您将看到微服务2上消费的数据并将其保存到MongoDB。Screenshot-2018-04-29-22.24.15检查数据是否保存到MongoDB。如果有数据,我们做到了!Screenshot-2018-04-29-22.26.39完整的源代码可以在这里找到:https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice