kafka简介Kafka是一个高吞吐量的分布式发布订阅消息系统kafka角色必须知道生产者:生产者。消费者:消费者。topic:消息被记录在topic类别中,Kafka对消息种子(Feed)进行分类,每一类消息称为一个主题(Topic)。Broker:以集群模式运行,可以由一个或多个服务组成,每个服务称为一个broker;消费者可以订阅一个或多个主题(topics),并从Broker中拉取数据来消费这些发布的消息。经典模型1.一个topic下的partition数量不能小于消费者数量,即一个topic下的消费者数量不能大于partition数量。如果太大,会浪费空闲时间。2、一个topic下的partition可以同时被不同的consumergroup之一使用。一个consumer消费3个,一个topic下的一个partition只能被同一个consumergroup的一个consumer消费。常用参数说明request.required.acksKafkaproducer的ack有3种机制,初始化producer时的producerconfig可以通过request配置。required.acks不同的值来实现。0:表示producer不等待broker确认同步完成,继续发送下一条(batch)消息。该选项提供了最低的延迟但最弱的持久性保证(当服务器发生故障时会丢失一些数据,例如领导者死了,但生产者不知道,无法接收到代理发送的信息)。1:表示生产者在领导者成功收到数据并收到确认后发送下一条消息。当客户端等待服务器确认请求成功时,此选项提供更好的持久性(写入死领导者但尚未复制的唯一消息将丢失)。-1:表示producer在follower副本确认收到数据后才完成发送。此选项提供最佳持久性,我们保证只要至少有一个同步副本保持活动状态,就不会丢失任何信息。三种机制的性能依次递减(生产者吞吐量递减),数据鲁棒性递增。auto.offset.reset1.earliest:自动将偏移量重置为最早的偏移量2.latest:自动将偏移量重置为最新的偏移量(默认)3.none:如果消费者组没有找到之前的偏移量,则抛出异常给消费者。4.其他参数:向消费者抛出异常(无效参数)kafka安装及简单测试安装kafka(无需安装,解压即可)#官方下载地址:http://kafka.apache.org/downloads#wgethttps//www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgztar-xzfkafka_2.12-1.1.1.tgzcdkafka_2.12-1.1。0启动kafkaserver#需要先启动zookeeper#-daemon可以启动后台守护模式bin/zookeeper-server-start.shconfig/zookeeper.propertiesbin/kafka-server-start.shconfig/server.propertiesstartkafkaclienttest#创建一个topic,测试topic2partitionsbin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions2--topictestCreatedtopic"test".#显示所有topicbin/kafka-topics.sh--list--zookeeperlocalhost:2181test#显示主题信息bin/kafka-topics.sh--describe--zookeeperlocalhost:2181--topictestTopic:testPartitionCount:2ReplicationFactor:1Configs:话题:testPartition:0Leader:0Replicas:0Isr:0Topic:testPartition:1Leader:0Replicas:0Isr:0#启动生产者(输入消息)bin/kafka-console-producer.sh--broker-列出本地主机:9092--topictest[Waitforyourowncontenttoappear>justenter]>iamanewmsg!>iamagoodmsg?#startaconsumer(waitforthemessage)#注意这里的--from-beginning,每次都会从头读取,可以试试去掉或者不看效果bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topictest--from-beginning[Wait对于消息]我是一个新的消息!我是一个很好的消息?安装kafka的php扩展#首先安装rdkfka库文件gitclonehttps://github.com/edenhill/librdkafka.gitcdlibrdkafka/./configuremakesudomakeinstallgitclonehttps://github.com/arnaud-lb/php-rdkafka.gitcdphp-rdkafkaphpize./configuremakeall-j5sudomakeinstallvim[php]/php.iniextension=rdkafka.sophpcodepracticeproducersetDrMsgCb(函数($kafka,$message){file_put_contents("./dr_cb.log",var_export($message,true).PHP_EOL,FILE_APPEND);});$conf->setErrorCb(函数($kafka,$err,$reason){file_put_contents("./err_cb.log",sprintf("Kafka错误:%s(原因:%s)",rd_kafka_err2str($err),$reason).PHP_EOL,FILE_APPEND);});$rk=newRdKafka\Producer($conf);$rk->setLogLevel(LOG_DEBUG);$rk->addBrokers("127.0.0.1");$cf=newRdKafka\TopicConf();//-1必须等待所有broker完成同步确认1currentserverConfirm0不确认,这里如果是0,回调中的offset不会返回,如果是1和-1,会返回offset//我们可以利用这个机制来确认消息的产生,但是它不是100%,因为中途可能有kafka服务器挂了$cf->set('request.required.acks',0);$topic=$rk->newTopic("test",$cf);$option='qkl';for($i=0;$i<20;$i++){//RD_KAFKA_PARTITION_UA自动选择分区//$optionoptional$topic->produce(RD_KAFKA_PARTITION_UA,0,"qkl.$i",$选项);}$len=$rk->getOutQLen();while($len>0){$len=$rk->getOutQLen();变量转储($len);$rk->poll(50);}runproducerphpproducer.php#outputint(20)int(20)int(20)int(20)int(0)#你可以检查上面启动的消费者shell是否应该输出消息qkl。0qkl。1qkl。2qkl。3qkl。4qkl。5qkl。7qkl。8qkl。9qkl。10qkl。11qkl。12qkl。13qkl。14qkl。15qkl。16qkl。17qkl。18qkl。kafka,$message){file_put_contents("./c_dr_cb.log",var_export($message,true),FILE_APPEND);});$conf->setErrorCb(function($kafka,$err,$reason){file_put_contents("./err_cb.log",sprintf("Kafkaerror:%s(reason:%s)",rd_kafka_err2str($err),$reason).PHP_EOL,FILE_APPEND);});//设置消费组$conf->set('group.id','myConsumerGroup');$rk=newRdKafka\Consumer($conf);$rk->addBrokers("127.0.0.1");$topicConf=newRdKafka\TopicConf();$topicConf->set('request.required.acks',1);//interval.ms内自动提交确认,建议不启动//$topicConf->set('auto.commit.enable',1);$topicConf->set('auto.commit.enable',0);$topicConf->set('auto.commit.interval.ms',100);//设置offset存储为文件//$topicConf->set('offset.store.method','file');//设置offset的存储方式为broker$topicConf->set('offset.store.method','broker');//$topicConf->set('offset.store.path',__DIR__);//最小:简单理解为从头开始消费,其实相当于最早的//largest:简单理解为从最新消费,其实相当于最新的//$topicConf->set('auto.offset.reset','最小');$topic=$rk->newTopic("test",$topicConf);//参数1消费partition0//RD_KAFKA_OFFSET_BEGINNING从头开始??消费//RD_KAFKA_OFFSET_STORED上次消费的offset记录开始消费//RD_KAFKA_OFFSET_END上次消费$topic->consumeStart(0,RD_KAFKA_OFFSET_BEGINNING);//$topic->consumeStart(0,RD_KAFKA_OFFSET_END);////$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);while(true){//参数1表示消费分区,这里是分区0//参数2表示同步阻塞多长时间$message=$topic->consume(0,12*1000);如果(is_null($message)){睡眠(1);echo"没有更多信息\n";继续;}switch($message->err){caseRD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);休息;caseRD_KAFKA_RESP_ERR__PARTITION_EOF:echo"没有更多消息;将等待更多\n";休息;案例RD_KAFKA_RESP_ERR__TIMED_OUT:\nbre;默认值:抛出新的\Exception($message->errstr(),$message->err);休息;}}高级消费者assign();//$kafka->assign([newRdKafka\TopicPartition("qkl01",0,0)]);休息;caseRD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo"撤销:";var_dump($分区);$kafka->赋值(NULL);休息;默认值:抛出新的\Exception($err);}}//设置重新平衡回调以记录分区分配(可选)$conf->setRebalanceCb(function(\RdKafka\KafkaConsumer$kafka,$err,array$partitions=null){rebalance($kafka,$err,$partitions);});//配置group.id。所有消费者具有相同的group.id将消耗//不同的分区。$conf->set('group.id','test-110-g100');//Kafka代理的初始列表$conf->set('metadata.broker.list','192.168.216.122');$topicConf=new\RdKafka\TopicConf();$topicConf->set('request.required.acks',-1);//在interval.ms自动提交确认,建议不启动$topicConf->set('auto.commit.enable',0);//$topicConf->set('auto.commit.enable',0);$topicConf->set('auto.commit.interval.ms',100);//设置offset存储为文件$topicConf->set('offset.store.method','file');$topicConf->set('offset.store.path',__DIR__);//设置offset存储为broker//$topicConf->set('offset.store.method','broker');//设置当偏移量存储中没有初始偏移量或所需偏移量超出范围时从何处开始消费消息。//'smallest':从头开始$topicConf->set('auto.offset.reset','smallest');//设置用于订阅/分配主题的配置$conf->setDefaultTopicConf($topicConf);$consumer=new\RdKafka\KafkaConsumer($conf);//$KafkaConsumerTopic=$consumer->newTopic('qkl01',$topicConf);//订阅主题'test'$consumer->subscribe(['qkl01']);echo"等待分区分配...$消费者->消费(120*1000);switch($message->err){caseRD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);//$consumer->commit($message);//$KafkaConsumerTopic->offsetStore(0,20);}休息;caseRD_KAFKA_RESP_ERR__PARTITION_EOF:echo"没有更多消息;将等待更多\n";休息;caseRD_KAFKA_RESP_ERR__TIMED_OUT:echo"超时\n";休息;默认值:抛出新的\Exception($message->errstr(),$message->err);休息;}}消费组特别说明特别注意,HighLEVEL消费者设置的消费组,kafka服务器只会记录LowLevel消费者设置的消费组。服务器不会记录具体的消费组信息。可以阅读本文查看服务器元数据(topic/partition/broker)setDrMsgCb(function($kafka,$message){file_put_contents("./xx.log",var_export($message,true),FILE_APPEND);});$conf->setErrorCb(function($kafka,$err,$reason){printf("Kafka错误:%s(原因:%s)\n",rd_kafka_err2str($err),$reason);});$conf->set('group.id','myConsumerGroup');$rk=newRdKafka\Consumer($conf);$rk->addBrokers("127.0.0.1");$allInfo=$rk->元数据(真,空,60e3);$topics=$allInfo->getTopics();echord_kafka_offset_tail(100);echo"--";echocount($topics);echo"--";foreach($topicsas$topic){$topicName=$topic->获取主题();如果($topicName=="__consumer_offsets"){继续;}$partitions=$topic->getPartitions();foreach($partitionsas$partition){//$rf=newReflectionClass(get_class($partition));//foreach($rf->getMmethods()as$f){//var_dump($f);//}//die();$topPartition=newRdKafka\TopicPartition($topicName,$partition->getId());回声“当前主题:”。($topPartition->getTopic())。“-”。$分区->getId()。“-”;回声“偏移量:”。($topPartition->getOffset())。PHP_EOL;}}如果需要远程生产和消费vimconfig/server.propertiesadvertised.listeners=PLAINTEXT://ip:9092#ip没有你kafka的外网ip,可以分享一个打包好的php-rdkafka类库https://github。com/qkl9527/php-rdkafka-class参考Kafka文档
