当前位置: 首页 > 后端技术 > Python

python操作dockerkafka

时间:2023-03-26 12:05:25 Python

本机IP为10.30.6.24。在后续的配置过程中,需要根据自己的IP信息配置修改kafka。默认情况下,127.0.0.1用于访问和配置compose.yaml文件服务:zookeeper:image:zookeepercontainer_name:demo-zookeeperports:-"2181:2181"restart:alwayskafka:image:wurstmeister/kafkacontainer_name:demo-kafka端口:-“9092:9092”ulimits:nofile:soft:262144hard:262144environment:DOCKER_API_VERSION:1.41KAFKA_ADVEINTERSED。:9092"KAFKA_LISTENERS:"PLAINTEXT://0.0.0.0:9092"KAFKA_BROKER_ID:1KAFKA_PORT:9092KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181KAFKA_LOG_DIRS:/kafka/kafka-logs-backend.sock:/var/run/docker.sock-kafka-data:/kafka命令:/bin/sh-c"rm-f/kafka/kafka-logs-backend/meta.properties&&start-kafka.sh"volumes:kafka-data:{}启动命令$dockercomposeup-d配置参数说明ulimits操作系统提供了一种方法来限制可以使用的资源量。Linux系统默认值为1024,具体执行命令ulimit-a查看,由于消息队列文件读写频繁,需要增加配置,修改Kafka的默认值。最大打开文件数的限制可以是硬限制也可以是软限制,但是软限制不能超过硬限制环境变量说明DOCKER_API_VERSION:docker版本命令的APIVersion输出信息KAFKA_ADVERTISED_LISTENERS:注册地址端口卡夫卡到动物园管理员。这个地方的数据目前是PLAINTEXT://10.30.6.24:9092,这个IP地址需要根据具体的机器IP修改,表示客户端可以访问到当前节点的哪个IP。如果修改了网卡的IP,这个地方的配置KAFKA_LISTENERS也需要修改:配置kafka的监听端口表示本机kafka的当前节点监听的是哪个网卡。这个地方的IP地址可以填成0.0.0.0表示监听所有网卡的信息。KAFKA_BROKER_ID:一个kafka节点就是一个broker,brokerid在一个集群中是唯一的。KAFKA_PORT:配置kafka开放端口KAFKA_ZOOKEEPER_CONNECT:配置对应的zookeeper连接信息,因为是在同一个docker-compose中,所以可以使用服务名作为主机连接信息KAFKA_LOG_DIRS:保存日志数据的目录,默认为/tmp/kafka-logsmountvolume解释-/var/run/docker.sock:/var/run/docker.sock:挂docker的sockin-kafka-data:/kafka:mountkafkalog信息用于持久化,如果不为数据持久化,可以去掉这一步,挂载启动命令/bin/sh-c"rm-f/kafka/kafka-logs-backend/meta.properties&&start-kafka.sh"因为挂载数据的时候,kafka的配置信息也被挂载并保存在meta.properties文件中。文件内容如下,会保存一个cluster.id。当容器在销毁和重建的时候,kafka会重新创建一个cluster.id,同时也会检查meta.properties的信息##MonJun2706:38:03GMT2022cluster.id=XMHTDGRvQ5yJnEfXKhuabgversion=0broker.id=1容器启动时的报错如下,主要是kafka检查cluster.id不一致,导致kafka.common.InconsistentClusterIdException:TheClusterID2Z7pfznDRmWeLJNp3nZm8Adoesn'tmatchstoredclusterIdSome(XMHTDGRvQ5yJnEfXKhuabg)inmeta.properties。集群代理正在尝试错误。连接可能是错误的。所以需要配置在kafka启动前删除持久保存的meta.properties配置信息。这一步不影响持久化数据,主要是为了避免冲突和错误。python客户端运行安装依赖库$pipinstallkafka-pythonProducerproducer.pyimportjsonfromkafkaimportKafkaProducer#配置值序列化方式,选择kafka节点信息#如果是远程broker,需要修改127.0.0.1为对应的IP地址producer=KafkaProducer(value_serializer=lambdam:json.dumps(m).encode('ascii'),bootstrap_servers=['10.30.6.24:9092'])#发送操作默认为异步for_inrange(100):producer.send('my-topic',{'key':'value'})#阻塞直到实际操作sendproducer.flush()consumerconsumer.pyimportjsonfromkafkaimportKafkaConsumer#consumer配置,topic信息同producerconsumer=KafkaConsumer('my-topic',group_id='my-group',auto_offset_reset='earliest',bootstrap_servers=['10.30.6.24:9092'],value_deserializer=lambdam:json.loads(m.decode('ascii')))formessageinconsumer:#消息值和键是原始字节——必要时解码!#例如,对于unicode:`message.value.decode('utf-8')`print("%s:%d:%d:key=%svalue=%s"%(message.topic,message.partition,message.offset,message.key,message.value))auto_offset_reset可选参数最早如下:当每个partition下有提交的offset时,从提交的offset开始消费;iftherenosubmittedoffset,startconsumptionfromthebeginninglatest:当每个分区下有提交的offset时,从提交的offset开始消费,没有提交的offset,消费分区下新产生的数据(这是默认选项)none:当topic的所有partition都提交了offset后,在offset之后开始消费。只要有partition没有提交的offset,那么Throwexception(不要用这个)打开两个terminal,执行$pythonproducer.py$pythonconsumer.pyextension如果容器里也有python客户端,你可以修改它compose.yaml中kafka容器的环境变量配置KAFKA_ADVERTISED_LISTENERS:"PLAINTEXT://kafka:9092"python消费者和生产者可以使用kafka:9092访问broker如果程序在容器外,也可以配置修改/etc/hostsnew添加一行数据127.0.0.1Kafka最终实现了基于kafka的host参数的访问。阅读kafka官方文档kafka配置远程访问kafkapython第三方库文档kafka理解listeners和advertised.listeners