发现网上没有更详细的RabbitMQPHP客户端的使用文档(当然也可能是我不精通usingsearchengines?)总之,一个简单的总结看完这篇文章,附上参数和常用的工作队列,死信队列,以及不同类型交换机的例子。毕竟消息队列中间件在日志采集/异步任务/程序解耦/流量填峰方面做得非常好所以,任何语言的后端都应该掌握最低限度的使用;本人水平有限,难免有错误,欢迎指正~服务器环境Ubuntu18.04.5LTSPHP7.2.24RabbitMQ3.6.10php-amqplib2.71。安装(方便补上安装过程。。。友好)apt-getinstallerlang-noxapt-getinstallrabbitmq-serverrabbitmqctladd_useradminadminrabbitmqctlset_user_tagsadminadministratorrabbitmqctlset_permissions-p/admin'.*''.*''.*'打开web管理页面,cd到安装目录我这里是/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10cd/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10rabbitmq-pluginsenablerabbitmq_managementcd到项目目录(PHP语言)composerrequirephp-amqplib/php-amqplib在需要用到的地方(当然在制作过程中最好抽象出一个Lib单例出来)usePhpAmqpLib\Connection\AMQPStreamConnection;usePhpAmqpLib\Message\AMQPMessage;使用PhpAmqpLib\Wire\AMQPTable;2.各方法参数2.1建立连接$conn=newAMQPStreamConnection($host,//RabbitMQ服务器主机IP地址$port,//RabbitMQ服务器端口$user,//连接RabbitMQ服务器用户名$password,//连接RabbitMQ服务器的用户密码$vhost,//连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似于nginx的vhost)$insist=false,$login_method='AMQPLAIN',$login_response=null,$locale='en_US',$connection_timeout=3.0,//连接超时$read_write_timeout=3.0,//读写超时$context=null,//context$keepalive=false,//Consumers是否需要$heartbeat=0开启长连接常驻进程,//心跳检测间隔单位秒0不检测,根据情况填写$channel_rpc_timeout=0.0,$ssl_protocol=null;2.2建立通道$channel=$conn->channel($channel_id);参数:$channel_id频道id,不传则获取$channel[""]频道,没有则循环$this->channel数组,从1到最大频道数下标,找到第一个就是不是一个AMQPChannel对象的下标,实例化并返回AMQPChannel对象,如果没有异常就会抛出异常。Nofreechannelids2.3在声明开关的实际制作过程中,这是一个很低频的行为,不应该使用,因为你直接在web后台手动添加$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete);参数:$exhcange_name交换器名称$type交换器类型$passive是否检测同名队列$durable交换器是否启用持久化$auto_detlete通道关闭后是否删除队列(1)交换器类型枚举[direct:(default)directexchange,工作方式类似于unicast,Exchange会将消息发送到完全匹配ROUTING_KEY的Queue,fanout:broadcast是exchangetor,不管消息的ROUTING_KEY设置是什么,Exchange会将消息转发给所有绑定的Queue,topic:topicswitcher,工作方式类似于多播,Exchange会将消息转发给所有与ROUTING_KEY匹配模式相同的队列,例如ROUTING_KEY是user.stock消息将被转发到绑定匹配模式为*.stock,user.stock,*的队列。*和#.user.stock.#(*表是匹配任意词组,#表示匹配0个或多个词组),headers:根据消息体的头部进行匹配]2.4申报队列在实际生产过程中,这是一个很低频的行为,不应使用。因为你在web后台手动添加更方便$channel->queue_declare($queue_name,$passive,$durable,$exclusive,$auto_delete);参数:$queue_name队列名$passive是否检测同名队列$durable是否启用队列持久化$exclusive队列是否可以被其他队列访问$auto_delete通道关闭后是否删除队列属性设置通过Array,比如设置消息持久化['delivery_mode'=>2]//单次发送$channel->basic_publish($msg,$exchange='',$routing_key='',$mandatory=false,$immediate=假的,$ticket=null);参数:$msg消息内容$exchangeexchange$routing_keyrouting_key$mandatory队列无法匹配时是否立即丢弃消息$immediate当队列没有消费者时,是否立即丢弃消息$ticket这个不知道怎么弄等待老板//多次发送1.调用$channel->batch_basic_publish($msg,$exchange='',$routing_key='',$mandatory=false,$immediate=false,$ticketmultipletimes=null)内部实现:插入$this->batch_messages[]2.再次调用$channel->publish_batch(),完成发送2.6队列绑定路由在实际生产过程中,这是一个很低频的行为,不应该使用,因为你直接在web后台手动绑定更方便$channel->queue_bind($queue,$exchange,$routing_key='',$nowait=false,$arguments=array(),$ticket=null)参数:$queue队列名$exchange交换名$routing_keyrouting_key$nowait不知道$arguments$ticket2.7消费消息$channel->basic_consume($queue='',$consumer_tag='',$no_local=false,$no_ack=false,$exclusive=false,$nowait=false,$callback=null,$ticket=null,$arguments=array())参数:$queue队列名$consumer_tag$no_local$no_ack是否不需要手动ack:true表示不需要ack|false需要手动ack$exclusive$nowait$callback消息回调函数$ticket$arguments2.8手动确认示例$callback=function($msg){sleep($msg->body);echo"[x]Receivedsleep",$msg->body,"\n";$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);echo"[x]Ack"."\n";};2.9限制分发示例限制RabbitMQ在处理消息时向同一个消费者发送不超过1条消息,有了反馈,会进行第二次发送。$channel->basic_qos(null,1,null);3.常用场景3.1不带交换器直接排队。这里也是最基本的使用。连接参数直接写成demo~publicfunctionsend(){$connection=newAMQPStreamConnection('localhost',5672,'guest','guest');$channel=$connection->channel();$channel->queue_declare('你好',false,false,false,false);$msg=newAMQPMessage('HelloWorld!');$channel->basic_publish($msg,'','你好');echo"[x]Sent'HelloWorld!'\n";$通道->关闭();$连接->关闭();}publicfunctionconsume(){$connection=newAMQPStreamConnection('localhost',5672,'guest','guest');$channel=$connection->channel();$channel->queue_declare('你好',false,false,false,false);echo'[*]等待消息。要退出,请按CTRL+C',"\n";$callback=function($msg){echo"[x]Received",$msg->body,"\n&qu其他;;};$channel->basic_consume('hello','',false,true,false,false,$callback);while(count($channel->callbacks)){$channel->wait();}}3.2工作队列按照消费容量分配,消费端可以添加$channel->basic_qos(null,1,null);3.3Fanout广播示例注册行为例如注册后需要发送欢迎短信和邮件,将注册行为广播给短信和邮件生产者//定义交换$channel->exchange_declare('register','fanout',false,false,false);$msg=newAMQPMessage('注册事件');$channel->basic_publish($msg,'注册');注册短信消费者$channel->exchange_declare('register','fanout',false,false,false);$channel->queue_declare('register.sms',false,false,false,false);$channel->queue_bind('register.sms','register');注册邮件消费者$channel->exchange_declare('register','fanout',false,false,false);$channel->queue_declare('register.mail',false,false,false,false);$channel->queue_bind('register.mail','注册');3.4主题类型模糊匹配实例日志分类比如我想让一个消费者接受所有的日志,一个消费者只接受Error级别的日志。Producer//定义交换$channel->exchange_declare('log','topic',false,false,false);$num=rand(0,10);if($num%3==0){$level='error';}elseif($num%3==1){$level='warning';}else{$level='common';}$msg=newAMQPMessage('logevent'.$level);$channel->basic_publish($msg,'log','log.'.$level);全日志消费者$channel->exchange_declare('log','topic',false,false,false);$channel->queue_declare('log.all',false,false,false,false);$channel->queue_bind('log.all','log','log.*');错误日志消费者$channel->exchange_declare('log','topic',false,false,false);$channel->queue_declare('log.error',false,false,false,false);$channel->queue_bind('log.error','log','log.error');3.5headers类型匹配实例日志分类例如我想让一个消费者接受所有的日志,而一个消费者只接受Error级别的日志生产者//定义exchange$channel->exchange_declare('log2','headers',false,假,假);$num=rand(0,10);if($num%3==0){$level='error';}elseif($num%3==1){$level='警告';}else{$level='common';}$msg=newAMQPMessage('log2event'.$level);$bindArguments=['level'=>$level,'type'=>'log'];$headers=newAMQPTable($bindArguments);$msg->set('application_headers',$bindArguments);$channel->basic_publish($msg,'log2');完整日志消费者$channel->exchange_declare('log2','headers',假的,假的,假的);$channel->queue_declare('log2.all',false,false,false,false);$bindArguments=['type'=>'log',//'x-match'=>'any'//默认任意];$headers=newAMQPTable($bindArguments);$channel->queue_bind('log2.all','log2','',false,$headers);错误日志消费$channel->exchange_declare('log2','headers',false,false,false);$channel->queue_declare('log2.error',false,false,false,false);$bindArguments=['type'=>'log','level'=>'error','x-match'=>'all'//默认任意];$headers=newAMQPTable($bindArguments);$channel->queue_bind('log2.error','log2','',false,$headers);3.6死信队列//2.5.1定义一个没有消费者且消息在5s后过期的队列//Producer$arguments=newAMQPTable(['x-dead-letter-exchange'=>'dead','x-message-ttl'=>5000,//消息生命周期毫秒'x-dead-letter-routing-key'=>'dead']);//定义队列不交换$channel->queue_declare('no_consume',false,假的,假的,假的,假的,$arguments);$现在=时间();$msg=newAMQPMessage($now);$channel->basic_publish($msg,'','no_consume');echo"[x]已发送no_consume:".date('Y-m-dH:i:s',$now)."\n";$channel->close();$connection->close();//消费者$channel->exchange_declare('dead','topic',false,false,false);$channel->queue_declare('dead.all',false,false,false,false);$channel->queue_bind('dead.all','dead','dead');$channel->basic_qos(null,1,null);echo'[*]等待消息。要退出,请按CTRL+C',"\n";$callback=function($msg){var_dump('msg:'.date('Y-m-dH:i:s',$msg->body));var_dump('now:'.date('Y-m-dH:i:s'));echo"[x]收到日志错误",$msg->body,"\n";};$channel->basic_consume('dead.all','',false,true,false,false,$callback);while(count($channel->callbacks)){$channel->wait();}
