当前位置: 首页 > 后端技术 > PHP

PHP下kafka常用脚本实践

时间:2023-03-29 15:35:26 PHP

在阅读本教程之前,最好先尝试阅读:PHP下kafka的实践实践你尝试实践的kafka知识:创建主题生产消息消费消息主题信息获取消费组获取消费组偏移量内置命令#kafka安装directory#*在bin目录中代表我们将使用的脚本connect-distributed.shkafka-log-dirs.shkafka-streams-application-reset.shconnect-standalone.shkafka-mirror-maker.shkafka-topics.sh*kafka-acls.shkafka-preferred-replica-election.shkafka-verifiable-consumer.shkafka-broker-api-versions.shkafka-producer-perf-test.shkafka-verifiable-producer.shkafka-configs.shkafka-reassign-partitions.shtrogdor.shkafka-console-consumer.sh*kafka-replay-log-producer.shwindowskafka-console-producer.sh*kafka-replica-verification.shzookeeper-security-migration.shkafka-consumer-groups.sh*kafka-run-class.shzookeeper-server-start.shkafka-consumer-perf-test.shkafka-server-start.shzookeeper-server-stop.shkafka-delegation-tokens.shkafka-server-stop.shzookeeper-shell.sh*kafka-delete-records.shkafka-simple-consumer-shell.sh创建主题(kafka-topics.sh)#创建一个测试主题,1个分区,1个副本,这里是其实replica可以理解为最少有broker中的个数,must>=1#--zookeeperlocalhost:2181(kafka默认端口:2181)bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictest#创建2个分区和test02主题bin/kafka-topics.sh的1个副本--create--zookeeperlocalhost:2181--replication-factor1--partitions2--topictest02#列出所有topicbin/kafka-topics.sh--list--zookeeperlocalhost:2181__consumer_offsetstesttest02#注意这里的__consumer_offsets是kafka默认创建的,用于存放kafka消费记录的topic,我们暂且不管它#list具体topic信息bin/kafka-topics.sh--describe--zookeeperlocalhost:2181--topictestTopic:testPartitionCount:1ReplicationFactor:1配置:主题:测试分区:0领导者:0副本:0Isr:0bin/kafka-topics.sh--describe--zookeeperlocalhost:2181--topictest02Topic:test02PartitionCount:2ReplicationFactor:1Configs:Topic:test02Partition:0Leader:0Replicas:0Isr:0Topic:test02Partition:1Leader:0Replicas:0Isr:0#从上面的显示我们可以发现第一句话是显示整体信息,下方缩进显示分区信息consumer(kafka-console-consumer.sh)#启动一个消费者组消费,这里我们需要打开一个shell终端,因为它会等待输出bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topictest--from-beginning#等待输出生产者(kafka-console-consumer.sh)#这里我们需要打开一个shell终端,因为它会等待对于输入bin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest#等待我们输入的消息出现>>msg01>msg02>msg03#注意观察上面的消费者端,和自动输出我们的消息msg01msg02msg03查看消费者组bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--list注意:这里不会显示旧的基于Zookeeper的消费者的信息。console-consumer-25379console-consumer-73410console-consumer-27127console-consumer-61887console-consumer-61324#这里我们再启动一个consumer来输出(因为我们不知道前一个消费者的最新消费者组ID)bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--listNote:这不会显示有关旧的基于Zookeeper的消费者的信息。console-consumer-25379console-consumer-73410console-consumer-27127console-consumer-39416#这是我们新的消费者组id。使用这个消费组进行练习console-consumer-61887console-consumer-61324#查看消费组的具体信息bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--groupconsole-consumer-39416注意:这不会显示有关旧的基于Zookeeper的消费者的信息。主题分区CURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest0990consumer-1-94afec29-5042-4108-8619-ba94812f10a8/127.0.0.1consumer-1#Viewofflineconsole-consumer-25379消费者组bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--groupconsole-consumer-25379注意:这不会显示有关旧的基于Zookeeper的消费者的信息。消费者group'console-consumer-25379'hasnoactivemembers.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest0594---#查看离线console-consumer-27127消费者组bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--groupconsole-consumer-27127注意:这不会显示有关旧的基于Zookeeper的消费者的信息。消费者组'console-consumer-27127'没有activemembers.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest0693---#这里我们发现每次都会生成一个消费者组和一个消费者,给我们带来不便makeoneMultipleconsumersinaconsumergrouptest启动一个有多个消费者的消费者组cpconfig/consumer.propertiesconfig/consumer_g1.propertiesvimconfig/consumer_g1.properties#修改消费者组名group.id=test-consumer-group=>group.id=test-g1bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topictest--from-beginning--consumer.configconfig/consumer_g1.propertiesmsg01msg02msg03bin/kafka-console-consumer。sh--bootstrap-serverlocalhost:9092--topictest--from-beginning--consumer.configconfig/consumer_g1.properties#无输出(因为我们只有一个分区用于测试主题,一个分区只能在下面使用同一个消费者组A消费者消费,所以这个是空闲的)#查看消费者组bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--list注意:这里不会显示旧的基于Zookeeper的消费者的信息。console-consumer-25379console-consumer-73410console-consumer-27127console-consumer-39416test-g1console-consumer-61887console-consumer-61324#查看test-g1消费组bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--grouptest-g1注意:这不会显示有关旧的基于Zookeeper的消费者的信息。主题分区CURRENT-OFFSETLOG-END-OFFSETLAG消费者IDHOSTCLIENT-IDtest0990consumer-1-43922b0c-34e0-47fe-b597-984c9e6a2884/127.0.0.1consumer-1#我们启动test02的2个消费组看看情况#我们启动2个test02主题的Consumertest-g1消费者组的bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topictest02--from-beginning--consumer.configconfig/consumer_g1.propertiesbin/kafka-console-consumer。sh--bootstrap-serverlocalhost:9092--topictest02--from-beginning--consumer.configconfig/consumer_g1.properties#查看test-g1消费者组bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--grouptest-g1注意:这不会显示有关旧的基于Zookeeper的消费者的信息。TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest020000consumer-1-90ccc960-557a-46ab-a799-58c35ee67undefined0.1consumer-11test02(0)bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--describe--groupmy-group--state注意:这不会显示有关旧的基于Zookeeper的消费者的信息.COORDINATOR(ID)ASSIGNMENT-STRATEGYSTATE#MEMBERSlocalhost:9092(0)rangeStable3bin/kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--delete--groupconsole-consumer-25379zookeeper(zookeeper-shell).sh)连接bin/zookeeper-shell.sh127.0.0.1:2181Connectingto127.0.0.1:2181WelcometoZooKeeper!JLinesupportisdisabledWATCHER::WatchedEventstate:SyncConnectedtype:Nonepath:null经常使用命令ls/[cluster,controller_epoch,控制器,经纪人,动物园管理员,管理员,isr_change_notification,消费者,log_dir_event_notification,latest_producer_id_block,config]