介绍RabbitMQ是一个高可用的信息中间件,学习和使用RabbitMQ是非常有必要的。异步消息支持java、python、php等多种开发语言,可插拔的认证授权,可以使用RabbitMQ-Manager进行管理和监控。这里安装直接使用docker,安装拉取镜像非常方便dockerpullrabbitmq:3.8.3-management-alpinerundockerrun--namerun-rabbitmq-d-p15672:15672-p5672:5672rabbitmq15672port为RabbitMQWeb管理页面,直接访问:http://localhost:15672/,初始用户密码:guest使用RabbitMQ作为生产者和消费者时,基本有2种场景:一个/多个生产者和多个共享消费或者一个/多个生产者,多个独立消费者共享的消费者可以同时消费一个队列的数据,增加吞吐量自己的队列。以下代码将用于实现各种场景的基本概念。queue数据队列,数据可以推送到队列中,也可以从队列中消费。exchange交换机向交换机推送数据,队列可以绑定交换机,不同类型交换机支持不同的绑定规则fanout没有规则,所有exchange中直接的数据完全匹配,只绑定指定routingkey值的数据topic更灵活的规则,routingkeyroutingkey必须用a隔开。开放词,*(星号)用于表示一个词,#(井号)用于表示任意数字(零个或多个)词封装RabbitMQ的一些常用操作connection=newAMQPStreamConnection($this->host,$this->port,$this->user,$this->password);$this->channel=$this->connection->channel();}/***@param$exchangeName*@param$type*@param$pasive*@param$durable*@param$autoDelete*/publicfunctioncreateExchange($exchangeName,$type,$pasive=false,$durable=false,$autoDelete=false){$this->channel->exchange_declare($exchangeName,$type,$pasive,$durable,$autoDelete);}/***@param$queueName*@param$pasive*@param$durable*@param$exlusive*@param$autoDelete*/publicfunctioncreateQueue($queueName,$pasive=false,$durable=false,$exlusive=false,$autoDelete=false,$nowait=false,$arguments=[]){$this->channel->queue_declare($queueName,$pasive,$durable,$exlusive,$autoDelete,$nowa它,$arguments);}/***生成消息*@param$message*/publicfunctionsendMessage($message,$routeKey,$exchange='',$properties=[]){$data=newAMQPMessage($message,$properties);$this->channel->basic_publish($data,$exchange,$routeKey);}/***消费消息*@param$queueName*@param$callback*@throws\ErrorException*/publicfunctionconsumeMessage($queueName,$callback,$tag='',$noLocal=false,$noAck=false,$exclusive=false,$noWait=false){$this->channel->basic_consume($queueName,$tag,$noLocal,$noAck,$exclusive,$noWait,$callback);while($this->channel->is_consuming()){$this->channel->wait();}}}/***@throws\Exception*/publicfunction__destruct(){$this->channel->close();$this->connection->close();}}多个共享消费者多个消费者可以提高消费速度,提供小二的系统吞吐量,直接在codeproducer代码上();$queueName='test-single-queue';$rabbit->createQueue($queueName,false,true,false,false);for($i=0;$i<10000;$i++){$rabbit->sendMessage($i."thisisatestmessage.",$queueName,'',['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT//消息持久化,重启rabbitmq,消息不会丢失]);}unset($rabbit);//关闭连接,运行producerphpProducer,在manager网页可以看到队列信息消费者代码body);//打印消息sleep(2);//处理耗时任务$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack};$rabbit->consumeMessage($queueName,$callback);unset($rabbit);//关闭连接并运行消费者secondaryphpConsumer.php可以看到两个消费者不会重复消费消息。您还可以看到该队列的消息正在通过managerweb进行消费。多个独立的消费者。RabbitMQ生产者将消息推送到交换器。多个队列绑定一个exchange,实现多个独立的consumer定义一个topic类型的switch。消费规则为:test.ex。加一个词createQueue($queueName,false,true);//绑定开关$rabbit->bindQueue($queueName,$exchangeName,$routingKey);//消费$callback=function($message){var_dump("ReceivedMessage:".$message->body);//打印消息sleep(2);//处理耗时任务$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack};$rabbit->consumeMessage($queueName,$callback);unset($rabbit);//关闭连接并启动消费者phpConsumer.php定义生产者,将推送消息createExchange($exchangeName,AMQPExchangeType::TOPIC,false,true,false);//向exchange推送10000条数据,routingkey=test-ex-queue1for($i=0;$i<10000;$i++){$rabbit->sendMessage($i."thisisaqueue1message.",$routingKey1,$exchangeName,['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT//消息持久化,重启rabbitmq,消息不会丢失]);}//向交换机推送10000条数据routingkey=test-ex-queue2for($i=0;$i<10000;$i++){$rabbit->sendMessage($i."thisisaqueue2message.",$routingKey2,$exchangeName,['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT//消息持久化,重启rabbitmq,消息不会丢失]);}unset($兔子);//关闭连接,运行生产者phpProducer.php,可以看到消费者有20000条消息要消费,包括2个routingkeys中的数据代码代码:https://github.com/jiaoyang3/...
