01。ApacheKafka是一个开源的消息引擎系统。它在项目中的作用主要是削峰填谷和解耦。在本文中,我们只介绍ApacheKafka的Golang客户端库Sarama。Sarama是MIT许可的Golang客户端库,适用于ApacheKafka0.8及更高版本。如果读者对ApacheKafka服务器不熟悉,建议先阅读官方文档的介绍部分。本文使用的版本是ApacheKafka2.8。02.Producer我们可以使用Sarama库的AsyncProducer或者SyncProducer来生产消息。大多数情况下首选AsyncProducer来生成消息。它通过通道接收消息,并在后台以异步方式尽可能高效地生成消息。SyncProducer在发送Kafka消息后阻塞,直到收到ACK确认。SyncProducer有两个注意事项:它通常效率较低,实际的持久性保证取决于Producer.RequiredAcks的配置值。SyncProducer确认的消息在某些配置中有时仍然会丢失,但使用起来更简单。为了方便读者理解,本文介绍如何使用SyncProducer作为生产者。如果读者想了解如何使用AsyncProducer作为生产者,请参考官方文档。使用SyncProducer作为生产者的示例代码:funcsendMessage(brokerAddr[]string,config*sarama.Config,topicstring,valuesarama.Encoder){producer,err:=sarama.NewSyncProducer(brokerAddr,config)iferr!=nil{fmt.Println(err)return}deferfunc(){iferr=producer.Close();err!=nil{fmt.Println(err)return}}()msg:=&sarama.ProducerMessage{Topic:topic,Value:value,}partition,offset,err:=producer.SendMessage(msg)iferr!=nil{fmt.Println(err)return}fmt.Printf("partition:%doffset:%d\n",partition,offset)}看上面这段代码,我们调用NewSyncProducer()来创建一个新的SyncProducer,给定代理地址和配置信息。调用SendMessage()以生成给定的消息,并且仅在生成成功或失败时返回。它会返回生产消息的分区(Partition)和偏移量(Offset),如果消息生产失败则返回错误。请注意,必须在生产者上调用Close()以避免泄漏,因为当它超出范围时可能不会自动进行垃圾回收。03.Consumer我们可以使用Sarama库的消费者Consumer或者消费者组ConsumerGroupAPI来消费消息。为了方便读者理解,本文介绍使用Consumer消费消息。消费者管理处理来自代理的Kafka消息的PartitionConsumers。Consumer消费消息的示例代码:funcconsumer(brokenAddr[]string,topicstring,partitionint32,offsetint64){consumer,err:=sarama.NewConsumer(brokenAddr,nil)iferr!=nil{fmt.Println(err)return}deferfunc(){iferr=consumer.Close();err!=nil{fmt.Println(err)return}}()partitionConsumer,err:=consumer.ConsumePartition(topic,partition,offset)iferr!=nil{fmt.Println(错误)return}deferfunc(){iferr=partitionConsumer.Close();err!=nil{fmt.Println(err)return}}()formsg:=rangepartitionConsumer.Messages(){fmt.Printf("partition:%doffset:%dkey:%sval:%s\n",msg.Partition,msg.Offset,msg.Key,msg.Value)}}阅读上面的代码,我们调用NewConsumer()创建一个新的消费者,给定代理地址和配置信息。给定主题、分区和偏移量,调用ConsumePartition()创建一个PartitionConsumer。PartitionConsumer处理来自给定主题和分区的Kafka消息。需要注意的是,为了防止泄漏,必须调用Consumer和partitionConsumer的Close(),因为当它超出作用域时,可能不会被自动垃圾回收。04.总结本文主要介绍如何使用ApacheKafka的Golang语言客户端库Sarama来生产和消费Kafka消息。关于生产者和消费者,分别给出了一个简单的例子。此外,Sarama库还提供了许多其他API。有兴趣的读者可以阅读官方文档了解更多。
