首先安装 PHP 对应的 RabbitMQ 扩展。这里我们使用 php_amqp。不同的扩展实现方式会略有不同。

- PHP 扩展地址: http://pecl.php.net/package/amqp
- 详情以官网为准: http://www.rabbitmq.com/getstarted.html

## 简介

| 文件 | 说明 |
| --- | --- |
| config.php | 配置信息 |
| BaseMQ.php | MQ 基类 |
| ProductMQ.php | 生产者类 |
| ConsumerMQ.php | 消费者类 |
| Consumer2MQ.php | 消费者2(可以有多个) |

## config.php

```php
// ……(原文此处的开头在抓取时丢失,以下为残留内容)
    [
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
    ],
    // 交换机
    'exchange' => 'word',
    // 路由
    'routes' => [],
];
```

## BaseMQ.php

```php
// ……(原文此处的类声明、属性定义与构造函数开头在抓取时丢失)
        $this->conf = $conf['host'];
        $this->exchange = $conf['exchange'];
        $this->AMQPConnection = new \AMQPConnection($this->conf);
        if (!$this->AMQPConnection->connect())
            throw new \AMQPConnectionException("无法连接到代理!\n");
    }

    /**
     * 关闭链接
     */
    public function close()
    {
        $this->AMQPConnection->disconnect();
    }

    /** Channel
     * @return \AMQPChannel
     * @throws \AMQPConnectionException
     */
    public function channel()
    {
        if (!$this->AMQPChannel) {
            $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
        }
        return $this->AMQPChannel;
    }

    /** 交换机
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function exchange()
    {
        if (!$this->AMQPExchange) {
            $this->AMQPExchange = new \AMQPExchange($this->channel());
            $this->AMQPExchange->setName($this->exchange);
        }
        return $this->AMQPExchange;
    }

    /** 队列
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function queue()
    {
        if (!$this->AMQPQueue) {
            $this->AMQPQueue = new \AMQPQueue($this->channel());
        }
        return $this->AMQPQueue;
    }

    /** 信封
     * @return \AMQPEnvelope
     */
    public function envelope()
    {
        if (!$this->AMQPEnvelope) {
            $this->AMQPEnvelope = new \AMQPEnvelope();
        }
        return $this->AMQPEnvelope;
    }
}
```

## ProductMQ.php

```php
// ……(原文此处的类声明与 run() 方法开头在抓取时丢失)
        $channel = $this->channel();
        // 创建交换机对象
        $ex = $this->exchange();
        // 消息内容
        $message = 'productmessage' . rand(1, 99999);
        // 开启事务
        $channel->startTransaction();
        $sendEd = true;
        foreach ($this->routes as $route) {
            $sendEd = $ex->publish($message, $route);
            echo "发送消息:" . $sendEd . "\n";
        }
        if (!$sendEd) {
            $channel->rollbackTransaction();
        }
        $channel->commitTransaction(); // 提交事务
        $this->close();
        die;
    }
}

try {
    (new ProductMQ())->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}
```

## ConsumerMQ.php

```php
// ……(原文此处的类声明与 run() 方法开头在抓取时丢失)
        $ex = $this->exchange();
        $ex->setType(AMQP_EX_TYPE_DIRECT); // direct 类型
        $ex->setFlags(AMQP_DURABLE); // 持久化
        //echo "ExchangeStatus:" . $ex->declare() . "\n";

        // 创建队列
        $q = $this->queue();
        //var_dump($q->declare());exit();
        $q->setName($this->q_name);
        $q->setFlags(AMQP_DURABLE); // 持久化
        //echo "消息总数:" . $q->declareQueue() . "\n";

        // 绑定 exchange 和 queue,指定路由 key
        echo 'QueueBind:' . $q->bind($this->exchange, $this->route) . "\n";

        // 阻塞方式接收消息
        echo "Message:\n";
        while (True) {
            $q->consume(function ($envelope, $queue) {
                $msg = $envelope->getBody();
                echo $msg . "\n"; // 处理消息
                $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
            });
            //$q->consume('processMessage', AMQP_AUTOACK); // 自动ACK应答
        }
        $this->close();
    }
}

try {
    (new ConsumerMQ)->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}
```
