当前位置: 首页 > 后端技术 > PHP

Kafka-PHP使用Rdkafka生产-消费数据

时间:2023-03-29 13:51:32 PHP

Kafka集群部署安装rdkafkardkafka依赖libkafkayuminstallrdkafkardkafka-develpeclinstallrdkafkaphp--rirdkafkahttp://pecl.php.net/package/r...查看支持kafka客户端版本producer连接集群,创建topic,生产数据。setLogLevel(LOG_DEBUG);//链接kafka集群$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");//创建主题$topic=$rk->newTopic("topic_1");while(true){$message="你好卡夫卡".date("Y-m-dH:i:s");回声“你好卡夫卡”。日期(“Y-m-dH:i:s”)。PHP_EOL;尝试{$topic->produce(RD_KAFKA_PARTITION_UA,0,$message);睡觉(2);}catch(\Exception$e){echo$e->getMessage().PHP_EOL;}}Consumer-HighLevel自动分配partition、rebalance、consumergroup。setRebalanceCb(function(RdKafka\KafkaConsumer$kafka,$err,array$partitions=null){开关($err){caseRD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo"Assign:";var_dump($partitions);$kafka->assign($partitions);break;caseRD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo"Revoke:";var_dump($partitions);$kafka->assign(null);break;default:thrownew\Exception($err);}});//配置group.id。所有具有相同group.id的消费者将消费//不同的分区。$conf->set('group.id','group_1');//Kafka代理的初始列表$conf->set('metadata.broker.list','192.168.20.6:9092,192.168.20.6:9093');$topicConf=newRdKafka\TopicConf();//设置没有消息时从哪里开始消费消息//偏移量存储中的初始偏移量或所需偏移量超出范围。//'smallest':从头开始??$topicConf->set('auto.offset.reset','smallest');//设置配置用于订阅/分配的主题$conf->setDefaultTopicConf($topicConf);$consumer=newRdKafka\KafkaConsumer($conf);//订阅主题'topic_1'$consumer->subscribe(['topic_1']);echo"Waitingforpartitionassignment...(maketakesometimewhen\n";echo"离开后迅速重新加入群组。)\n";while(true){$message=$consumer->consume(3e3);开关($message->err){caseRD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);休息;案例RD_KAFKA_RESP_ERR__PARTITION_EOF:睡眠(2);案例RD_KAFKA_RESP_ERR__TIMED_OUT:echo$message->errstr()。PHP_EOL;休息;默认值:抛出新的\Exception($message->errstr(),$message->err);休息;}}Consumer-LowLevel指定分区消费phpconsumer_lowlevel.php[partitonNuo]LowLevel没有消费组的概念,也可以认为每个消费者属于一个独立的消费组。set('group.id','group_2');$rk=newRdKafka\Consumer($conf);$rk->addBrokers('192.168.20.6:9092,192.168.20.6:9093');$topicConf=newRdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms',2000);//设置偏移量存储方式为'file'//$topicConf->set('offset.store.method','file');//$topicConf->set('offset.store.path',sys_get_temp_dir());//或者,设置偏移存储方法to'broker'$topicConf->set('offset.store.method','broker');//设置在偏移存储中没有初始偏移量或所需偏移量超出范围时从何处开始消费消息.//'smallest':从头开始??$topicConf->set('auto.offset.reset','smallest');$topic=$rk->newTopic($topic,$topicConf);//开始消费分区0$topic->consumeStart($partition,RD_KAFKA_OFFSET_STORED);while(true){$message=$topic->消耗($分区,3*1000);开关($message->err){caseRD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);休息;案例RD_KAFKA_RESP_ERR__PARTITION_EOF:案例RD_KAFKA_RESP_ERR__TIMED_OUT:echo$message->errstr()。PHP_EOL;休息;默认值:抛出新的\Exception($message->errstr(),$message->err);休息;}}