// Kafka consumer bootstrap: configures the consumer, then handles each
// incoming message in its own coroutine (via mgo) and tracks per-run
// progress with a Redis counter.
//
// Runtime coroutine hooks are switched off before the consumer is set up.
Runtime::enableCoroutine(false);

// Disabled alternative setup, kept for reference:
// mgo(function () {
// Create the logger
// $logger = new Logger('my_logger');
// Now add some handlers
// $logger->pushHandler(new StdoutHandler());
// $logger->pushHandler(new \Monolog\Handler\NullHandler());
// \Amp\Loop::set(new \Swoole\Driver\Amp());

$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(500);
$config->setMetadataBrokerList('192.168.3.243:9092');
$config->setGroupId('swoole');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['save_user_travel_data_one']);
$config->setOffsetReset('earliest');

$consumer = new Consumer();
// $consumer->setLogger($logger);

$consumer->start(function ($topic, $part, $message): void {
    // Hand every message off to mgo so the consumer callback returns quickly.
    mgo(function () use ($message) {
        // The payload is decoded twice.
        // NOTE(review): presumably the producer JSON-encodes the message twice - confirm.
        $data = json_decode(json_decode($message['message']['value'], true), true);

        $clearing_start_time = $data['clearing_start_time']; // clearing start time
        $user_id = $data['user_id'];                         // id of the user being cleared
        $total = $data['total'];                             // total number of users in this clearing run

        Console::Debug('开始任务。' . $user_id);
        $this->saveUserTravel($user_id, $clearing_start_time);

        // One counter per clearing run; bump it once this user has been processed.
        $counterKey = 'save_user_travel_log:' . $clearing_start_time;
        Redis::incr($counterKey);

        // Read the counter once so the logged value and the completion check agree.
        $completed = Redis::get($counterKey);
        Console::Debug('完成章节' . $completed);

        if ($completed >= $total) {
            // do
            Console::Debug('完成所有任务');
        }
    }, false);
});

\Swoole\Event::wait();

// WARNING: testing showed that this approach causes memory overflow!!!
// Using the rdkafka extension does not have this problem.
