当前位置: 首页 > 后端技术 > PHP

RabbitMQ+PHP教程6(RPC)

时间:2023-03-29 20:54:59 PHP

(使用php-amqplib)前提本教程假设RabbitMQ已安装并运行在标准端口(5672)。如果您使用不同的主机、端口或凭据,则需要调整您的连接设置。如果您在学习本教程时遇到困难,可以通过邮件列表联系我们。入门在第二个教程中,我们学习了如何使用工作队列将耗时任务分配给多个工作人员。但是如果我们需要在远程计算机上运行一个函数并等待结果怎么办?好吧,那是另一个故事。这种模式通常称为远程过程调用或RPC。在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分配的耗时任务,因此我们将创建一个返回斐波那契数列的模拟RPC服务。客户端接口为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法,该方法发送RPC请求并阻塞直到收到结果:$fibonacci_rpc=newFibonacciRpcClient();$response=$fibonacci_rpc->call(30);echo"[.]Got",$response,"\n";RPC的一些建议尽管RPC是计算中非常常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的,或者它是一个缓慢的RPC时,就会出现问题。这种混淆会导致不可预测的系统,并给调试增加不必要的复杂性。在简化软件的同时,误用会导致难以维护的RPC代码。考虑到这一点,请考虑以下建议:确保很明显哪个函数调用是本地的,哪个是远程的。记录系统。明确组件之间的依赖关系。处理错误情况。RPC服务器长时间宕机,客户端应该如何响应?如有疑问,请避免使用RPC。如果可以,您应该使用异步管道,而不是像阻塞那样的RPC,将结果异步推送到下一阶段的计算。回调队列(Callbackqueue)在RabbitMQ中一般很容易做RPC。客户端发送请求消息,服务器回复响应消息。为了收到响应,我们需要向请求发送一个“回调”队列地址。我们可以使用默认队列。让我们试试看:list($queue_name,,)=$channel->queue_declare("",false,false,true,false);$msg=newAMQPMessage($payload,array('reply_to'=>$queue_name));$channel->basic_publish($msg,'','rpc_queue');#...然后代码从中读取响应消息callback_queue...消息属性AMQP协议(??0-9-1协议)预定义从一组14个属性中,转到一条消息。大多数属性很少使用,除了以下:delivery_mode:将消息标记为持久的。(值为2)或瞬态(1)。您可能还记得第二个教程中的这个属性。content_type:用于描述编码的MIME类型。例如,对于常用的JSON编码,最好将此属性设置为application/JSON。reply_to:回调队列的通用名称。correlation_id:帮助将RPC响应与请求相关联。CorrelationId在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有更好的方法——让我们为每个客户端创建一个回调队列。这就产生了一个新的问题,response是在queue中收到的,并不清楚response是属于哪个request的。那就是使用correlation_id属性的时候。我们会将其设置为每个请求的唯一值。稍后,当我们在回调队列中收到消息时,我们将查看此属性,并基于此,我们将能够将响应与请求相匹配。如果我们看到一个未知的correlation_id值,我们可以安全地忽略该信息——它不属于我们的请求。您可能会问,为什么我们要忽略回调队列中的未知消息而不是因错误而失败?这是由于服务器端可能存在竞争条件。尽管不太可能,但RPC服务器有可能在发送应答之后、但在发送请求的确认消息之前死亡。如果发生这种情况,重新启动的RPC服务器将重新处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,并且理想情况下RPC应该是幂等的。总之,我们的RPC将像这样工作:当客户端启动时,它会创建一个匿名的独占回调队列。一次RPC请求,客户端发送一条消息,两个属性:reply_to,设置回调队列,以及correlation_id,为每个请求设置唯一值。请求被发送到rpc_queue队列。RPC工作人员(又名:服务器)正在等待此队列上的请求。当发出请求时,它会完成自己的工作并使用来自reply_to队列的结果将消息发送回客户端。客户端等待回调队列中的数据。当消息出现时,它会检查correlation_id属性。如果它与请求的值匹配,则向应用程序返回一个响应。斐波那契递归源码总结:functionfib($n){if($n==0)return0;如果($n==1)返回1;returnfib($n-1)+fib($n-2);}我们声明斐波那契(Fibonacci)函数。它仅假定有效的正整数输入。(不要期望这个适用于大量数据,它可能是最慢的递归实现)。我们的RPC服务器rpc_server.php代码看起来像这样:channel();$channel->queue_declare('rpc_queue',false,false,false,false);functionfib($n){如果($n==0)返回0;如果($n==1)返回1;returnfib($n-1)+fib($n-2);}echo"[x]等待RPC请求\n";$callback=function($req){$n=intval($req->body);echo"[.]fib(",$n,")\n";$msg=newAMQPMessage((string)fib($n),array('correlation_id'=>$req->get('correlation_id')));$req->delivery_info['channel']->basic_publish($msg,'',$req->get('reply_to'));$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);};$channel->basic_qos(null,1,null);$channel->basic_consume('rpc_queue','',false,false,false,false,$callback);while(count($channel->callbacks)){$channel->wait();}$channel->close();$connection->close();?>服务器代码相当简单:像往常一样,我们从建立连接、通道和声明队列开始我们可能需要运行多个服务器进程。为了将负载分散到许多服务器需要设置prefetch_count,设置$channel.basic_qos$。我们使用basic_consume访问队列。然后我们进入一个while循环,等待请求消息,完成工作并发送响应。我们rpc_client.phpRPC客户端代码:connection=newAMQPStreamConnection('localhost',5672,'guest','guest');$this->channel=$this->connection->channel();列表($this->callback_queue,,)=$this->channel->queue_declare("",false,false,true,false);$this->channel->basic_consume($this->callback_queue,'',false,false,false,false,数组($this,'on_response'));}publicfunctionon_response($rep){if($rep->get('correlation_id')==$this->corr_id){$this->response=$rep->body;}}publicfunctioncall($n){$this->response=null;$this->corr_id=uniqid();$msg=newAMQPMessage((string)$n,array('correlation_id'=>$this->corr_id,'reply_to'=>$this->callback_queue));$this->channel->basic_publish($msg,'','rpc_queue');while(!$this->response){$this->channel->wait();}returnintval($this->response);}};$fibonacci_rpc=newFibonacciRpcClient();$response=$fibonacci_rpc->call(30);echo"[.]Got",$response,"\n";?>现在是获取全部数据的好时机示例源代码rpc_client.php和rpc_server.php我们的RPC服务现已准备就绪。我们可以启动服务器:phprpc_server.php#=>[x]AwaitingRPCrequestsRequestingFibonaccinumbers运行客户端:phprpc_client.php#=>[x]Requestingfib(30)这里呈现的设计不是RPC唯一服务的实现,但它有一些要点:如果RPC服务器太慢,您可以通过运行另一台服务器来扩展。尝试在新控制台中再次运行第一个:rpc_server.php。在客户端,RPC只需要发送和接收消息。不喜欢queue_declare需要同步调用。因此,对于RPC请求,RPC客户端只需要一次网络往返。我们的代码仍然非常简单,并没有试图解决更复杂(但重要)的问题,例如:如果没有服务器在运行,客户端应该如何反应?客户端是否应该为RPC设置某种超时?如果服务器出现故障并抛出异常,是否应该转发给客户端?在处理之前防止无效的传入消息(例如检查边界、类型)。如果您想进行试验,您可能会发现管理UI对于查看队列很有用。