当前位置: 首页 > 后端技术 > PHP

简单可靠的rabbitmq组件包-php版

时间:2023-03-30 03:26:31 PHP

当rabbitmq已经在项目中得到广泛应用时,这里简单总结一下rabbitmq的常规功能,并封装成composer包,composer包地址,github地址,欢迎Fork,由于水平有限,难免会有bug。欢迎提供宝贵意见。easy-rabbitmq包是php-amqplib/php-amqplib包的二次包,为常用功能提供了一套开箱即用的生产方案。目前支持的功能列表如下:直连交换机推送消息(含延时消息)扇形交换机推送消息(含延时消息)主题交换机推送消息(含延时消息)订阅模式下可靠消费,后consumer消费失败会尝试继续消费,最多尝试5次。拉取模式下的可靠消费,消费者消费失败后,会尝试继续消费,最多尝试5次。如果还有其他场景,欢迎继续补充,再迭代!!对PHP版本安装包的要求主要取决于php-amqplib/php-amqplib包本身的要求。这里为了兼顾php5.0的用户,我们使用php-amqplib/php-amqplib包V2.9.0版本。具体要求见此处。不过笔者推荐使用php7.0及以上版本,本开发包也是在7.0版本上开发的!安装composer需要maweibinguo/easyrabbitmq才能使用这里推荐php脚本+supervisor的组合,保证消费过程的可靠性,增强worker的消费能力!如果你还没有听说过supervisor,你可以点击这里了解它。1.推送消息1-1。将消息推送到直接交换机$config=["host"=>"127.0.0.1","port"=>"5672","user"=>"guest","password"=>"guest","vhost"=>"/","channel_max_num"=>10,];$instance=RabbitMq::getInstance($config);//延时消息,30秒后到达指定交换机$instance->pushToDirect($msg=time(),//消息体内容$exchange="easy_direct_exchange",//交换机名称$routingKey="direct_test_queue",//消息的routingKey,对bingdingKey的consume(get)方法必须与routingKey保持一致$delaySec=30//延时秒数);//不延迟,推送到指定的直连开关$instance->pushToDirect($msg=time(),//消息体内容$exchange="easy_direct_exchange",//交换名$routingKey="direct_test_queue",//消息的routingKey,consume(get)方法到bingdingKey必须和routingKey一致);1-2、向扇区交换机推送消息$config=["host"=>"127.0.0.1","port"=>"5672","user"=>"guest","password"=>"guest","vhost"=>"/","channel_max_num"=>10,];$instance=RabbitMq::getInstance($config);//延迟消息,30秒后到达指定交换机$instance->pushToFanout($msg=time(),//消息体内容$exchange="easy_fanout_exchange",//交易所名称$delaySec=30//延迟秒);//不延迟,推送到指定的直链交易所$instance->pushToFanout($msg=time(),//消息体内容$exchange="easy_fanout_exchange"//交易所名称);1-3、推送消息到topicexchange$config=["host"=>"127.0.0.1","port"=>"5672","user"=>"guest","password"=>"guest","vhost"=>"/","channel_max_num"=>10,];$实例=RabbitMq::getInstance($config);//延迟消息,30秒后到达指定exchange$instance->pushToTopic($msg=time(),//消息体内容$exchange="easy_topic_exchange",//ExchangeName/***routingKey必须匹配consume(get)方法的bindingKey*bindingKey支持两个特殊字符“*”和“#”进行模糊匹配,其中“*”用于匹配一个词,“#”用于匹配多个词(也可以为0)*无论是bindingKey还是routingKey,每一个独立的字符串,用“.”分隔都是一个词,easy.topic.queue,包含三个词easy,topic,queue*/$routingKey="easy.topic.queue",$delaySec=30//延迟秒数);//不延迟,推送到指定直连开关$instance->pushToTopic($msg=time(),//消息体内容$exchange="easy_topic_exchange",//交换名称$routingKey="easy.topic.queue");2.Consumption消息消费支持自动重试,最多重试5次,每次消费失败后删除消息放回消费队列重新时间会随着失败次数的增加而逐渐过去。本客户端支持的策略如下:1次失败(1秒后重发),2次失败(2秒后重发),失败3次(4秒后重发),失败4次(重发)8秒后投递),失败5次(16秒后会重新投递)2-1、订阅模式订阅模式$config=["host"=>"127.0.0.1","port"=>下可靠消费"5672","user"=>"guest","password"=>"guest","vhost"=>"/","channel_max_num"=>10,];$instance=RabbitMq::getInstance($config);$instance->consume($queueName="direct_test_queue",//订阅队列名$consumerTag="c1",//消费标记$exchange="easy_direct_exchange",//交换名$bindingKey="direct_test_queue",//bindingkey,如果是直链交换,需要和routingKey保持一致$callback=function($msg){$body=$msg->body;file_put_contents("./test.log","time=>"。time()."\t"."body=>".$body.PHP_EOL,FILE_APPEND);//如果返回的结果不是绝对等于(===)true,那么会触发消息重试机制return错误的硒;},//5次消费失败后,发送失败消息的队列名称$failedQueue="easymq@failed");2-2。Pull模式拉模式下的可靠消费$config=["host"=>"127.0.0.1","port"=>"5672","user"=>"guest","password"=>"guest","vhost"=>"/","channel_max_num"=>10,];$instance=RabbitMq::getInstance($config);$instance->get($queue="get_queue",$exchange="easy_fanout_exchange",$bindingKey="",$callback=function($msg){$body=$msg->body;file_put_contents(./test)}.log","time=>".time()."\t"."body=>".$body.PHP_EOL,FILE_APPEND);//如果返回结果不绝对等于(===)true,则触发消息重试机制returnfalse;},//5次消费失败后,失败消息投递到的队列名称$failedQueue='easymq@failed');