当前位置: 首页 > 后端技术 > PHP

PHP实战RabbitMQ生产者确认

时间:2023-03-30 02:08:40 PHP

介绍为了提高系统的高可用,生产者在发送消息时需要通过MQ回复ACK以保证消息入库成功。注:图片来源微信公众号:架构师之路RabbitMQ官网有PublisherConfirms的相关介绍,可惜没有php版的示例代码,而php-amqplib也是一群志愿者维护的,精力有限,没有完全使用examplephp-Amqblib也有开发者提出了Issues,希望完善examplehttps://github.com/php-amqpli...本文将使用php实现PublisherConfirms。希望对大家有所帮助。代码已经提交到github:https://github.com/jiaoyang3/...transaction事务的几种类型可以保证RabbitMQ的原子操作,使用起来也很简单。$channel->tx_select();//begintrx$channel->tx_commit();//committrx$channel->tx_rollback();//回滚测试例子,运行后发现没有消息提交到queuegetChannel();$channel->tx_select();//开始传输$queueName='test-single-queue2';$rabbit->createQueue($queueName,false,true,false,false);for($i=0;$i<10000;$i++){$rabbit->sendMessage($i."这是一条测试消息。",$queueName,'',['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT//消息持久化,重启rabbitmq,消息不会丢失]);如果($i==10){thrownewException('rollbock');}}$channel->tx_commit();//committrxunset($rabbit);//close}catch(Exception$e){$channel->tx_rollback();//回滚echo$e->getMessage();}每次单个ack生产者发送一条消息,MQ服务会持久化保存输出数据,然后回复ack/nack消息给producer打开confirm$channel->confirm_select();注册ack/nack回调方法//ack回调函数$channel->set_ack_handler(function(AMQPMessage$message){echo'ack'.$message->getBody().PHP_EOL;});//nack回调函数$channel->set_nack_handler(function(AMQPMessage$message){echo'ack'.$message->getBody().PHP_EOL;});setack/nacktimeout$channel->wait_for_pending_acks_returns(5);//设置等待时间测试代码如下,完整代码可以在github上查阅getChannel();$channel->confirm_select();//开启confirm//ack回调函数$channel->set_ack_handler(function(AMQPMessage$message){echo'ack'.$message->getBody().PHP_EOL;});//nack回调函数$channel->set_nack_handler(function(AMQPMessage$message){echo'ack'.$message->getBody().PHP_EOL;});$queueName='test-single-queue1';$兔子->createQueue($queueName,false,true,false,false);for($i=0;$i<10000;$i++){$message=$i.“这是一条测试消息。”;$rabbit->sendMessage($message,$queueName,'',['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);回声$消息。'已发送'。PHP_EOL;$channel->wait_for_pending_acks_returns(5);//设置等待时间sleep(1);}unset($rabbit);//关闭连接多次acks,每条消息acks性能很低这里,进行batchacks,每50messages设置为集中acks,代码也很简单$channel=$rabbit->getChannel();$channel->confirm_select();//开启confirm//ack回调函数$channel->set_ack_handler(function(AMQPMessage$message){echo'ack'.$message->getBody().PHP_EOL;});//nack回调函数$channel->set_nack_handler(function(AMQPMessage$message){echo'ack'.$message->getBody().PHP_EOL;});$队列Name='test-single-queue';$rabbit->createQueue($queueName,false,true,false,false);//每次ack性能都很低,批量ack,每50条消息设置ack$batchSize=50;$outstandingMessageCount=0;for($i=0;$i<10000;$i++){$message=$i.“这是一条测试消息。”;$rabbit->sendMessage($message,$queueName,'',['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT//消息持久化,重启rabbitmq,消息不会丢失]);回声$消息。'发送'。PHP_EOL;如果(++$outstandingMessageCount==$batchSize){echo'------';$channel->wait_for_pending_acks_returns(5);$outstandingMessageCount=0;}sleep(1);}if($outstandingMessageCount>0){$channel->wait_for_pending_acks_returns(5);}unset($rabbit);//关闭连接异步ack毫无疑问,异步的效率是最高的,可惜php-amqplib没有实现,一种语言用的人越多,生态越好,php不再回当年

最新推荐
猜你喜欢