当前位置: 首页 > 科技观察

如何优雅地使用RabbitMQ_0

时间:2023-03-20 21:04:53 科技观察

RabbitMQ无疑是目前最流行的消息队列之一,同时也支持各种语言环境。作为.NET开发人员,学习和理解这个工具是很有必要的。消息队列大致有三种使用场景:1.系统集成和分布式系统设计。各种子系统通过消息连接起来,这种解决方案逐渐发展成为一种架构风格,即“通过消息传递的架构”。2.当系统中的同步处理方式严重影响吞吐量时,比如日志记录。如果需要在系统中记录所有的用户行为日志,如果采用同步的方式记录日志,势必会影响系统的响应速度。当我们将日志消息发送到消息队列时,日志子系统会以异步的方式消费日志信息。3、系统的高可用,比如电商秒杀场景。当应用服务器或数据库服务器在某个时刻收到大量请求时,系统就会宕机。如果能够将请求转发到消息队列,然后服务器就可以消费这些消息,请求就会变得顺畅,系统的可用性也会提高。1.开始使用RabbitMQRabbitMQ官网提供了详细的安装步骤。此外,官网还提供了六种场景使用RabbitMQ的教程。其中教程1、3、6会覆盖99%的使用场景,所以一般情况下你只需要搞清楚这3个教程就可以快速上手。2.简单分析我们以官方教程1做一个简单的总结:本教程展示了Producer如何向消息队列发送消息,消息消费者收到消息后如何消费消息。信息。1、生产者端:varfactory=newConnectionFactory(){HostName="localhost"};using(varconnection=factory.CreateConnection()){while(Console.ReadLine()!=null){using(varchannel=connection.CreateModel())){//创建一个名为“hello”的消息队列channel.QueueDeclare(queue:"hello",durable:false,exclusive:false,autoDelete:false,arguments:null);varmessage="HelloWorld!";varbody=Encoding.UTF8.GetBytes(message);//发送消息到消息队列messagechannel.BasicPublish(exchange:"",routingKey:"hello",basicProperties:null,body:body);Console.WriteLine("[x]Sent{0}",message);}}}这段代码很简单,几乎到了无法简化的地步:创建通道(channel)->创建队列->向队列发送消息。2.消费者端:varfactory=newConnectionFactory(){HostName="localhost"};using(varconnection=factory.CreateConnection()){using(varchannel=connection.CreateModel()){//创建一个“hello”Queue,防止生产者不创建队列channel.QueueDeclare(queue:"hello",durable:false,exclusive:false,autoDelete:false,arguments:null);//回调,当消费者收到消息后,会执行函数varconsumer=newEventingBasicConsumer(channel);consumer.Received+=(model,ea)=>{varbody=ea.Body;varmessage=Encoding.UTF8.GetString(body);Console.WriteLine("[x]Received{0}",message);};//消费队列"hello"channel.BasicConsume中的消息(queue:"hello",noAck:true,consumer:consumer);Console.WriteLine("Press[enter]toexit.");控制台。ReadLine();}}这段代码可以理解为:创建通道->创建队列->定义回调函数->消费消息。本例描述的是Send/Receive模式,可以简单理解为1(生产者)VS1(消费者)的场景;例3描述的是Publish/Subscriber模式,即1(生产者)VS多(消费者);在这个例子中,生产者只需要发送消息,并不关心消费者返回的结果。示例6描述了一个RPC调用场景。生产者发送消息后,还需要接收消费者的返回结果。这种场景似乎有点违背使用消息队列的目的。因为使用消息队列的目的之一就是为了异步,但是这个场景好像把异步变成了同步,但是这个场景也很好用,比如用户操作产生消息,应用服务收到消息后执行message添加了一些逻辑,数据库发生了变化,UI会等待应用服务的返回结果,然后再刷新页面。3.我在桌上找到了一份RabbitMQinAction。另外官网提供的文档也很详细。感觉一个月内就可以熟练掌握RabbitMQ了。那个时候,我可以在简历上写上“精通……”。有点自鸣得意......,但我知道这不是使用RabbitMQ的最佳方式。我们知道合理的抽象可以帮助我们隐藏一些技术细节,让我们专注于核心业务。比如,如果有人问你:“大雁塔怎么去?”你的回答可能是“小寨往东,直走两站,在右边”,如果你回答:“右转45度,向前走100米,然后转90度……”,对方就会迷路在这些细节中。消息队列的使用其实隐藏了一个抽象——ServiceBus。让我们回顾一下第一个例子。本例隐含业务为:ClientA发送指令,ClientB收到指令后响应。如果是这样,我们为什么要关心如何创建通道,如何创建队列?我只是想发个信息。另外,这个例子不够健壮:没有重试机制:如果ClientB第一次执行失败,如何处理消息?没有错误处理机制:如果重试N次后ClientB仍然异常,如何处理消息?没有熔断机制;如何为ClientA制定时间表,比如定期发送等;没有消息审计机制;无法跟踪消息的各种状态;事务处理等,服务总线就是这个场景的抽象,为我们提供了这些机制。让我们来看看吧。4.认识MassTransitMassTransit是.NET平台下的开源免费ESB产品。官网:http://masstransit-project.com/,GitHub700stars,500Fork,同类产品还有NServiceBus,之所以选择MassTransit是因为它比NServiceBus更轻量。另外,MassTransit开发之初选择了RabbitMQ作为消息传输组件。同时想和NServiceBus对比一下,看看他们的侧重点是什么。1.创建一个新的控制台应用程序:Masstransit.RabbitMQ.GreetingClient。使用MassTransit,您可以从Nuget安装它:Install-PackageMassTransit.RabbitMQ2。创建服务总线并发送命令:staticvoidMain(string[]args){Console.WriteLine("Press'Enter'tosendamessage.Toexit,Ctrl+C");varbus=BusCreator.CreateBus();varsendToUri=newUri($“{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}”);while(Console.ReadLine()!=null){Task.Run(()=>SendCommand(bus,sendToUri)).Wait();}Console.ReadLine();}privatestaticasyncvoidSendCommand(IBusControlbus,UrisendToUri){varendPoint=awaitbus.GetSendEndpoint(sendToUri);varcommand=newGreetingCommand(){Id=Guid.NewGuid(),DateTime=DateTime.Now};awaitendPoint.Send(command);控制台。WriteLine($"sendcommand:id={command.Id},{command.DateTime}");}这段代码隐藏了很多关于消息队列的细节,将我们的注意力集中在发送消息上。同时,ServiceBus提供的API更贴近业务。虽然我们发送的是一条消息,但是在这个场景中体现为一个命令。,Send(command)API描述了我们的意图。3、服务端收到这条命令,创建命令控制台控制程序:Masstransit.RabbitMQ.GreetingServer:varbus=BusCreator.CreateBus((cfg,host)=>{cfg.ReceiveEndpoint(host,RabbitMqConstants.GreetingQueue,e=>{e.Consumer();});});这段代码可以理解为服务端正在监听消息。我们在服务器上注册了一个名为“GreetingConsumer”的消费者。GreetingConsumer的定义:publicclassGreetingConsumer:IConsumer{publicasyncTaskConsume(ConsumeContextcontext){awaitConsole.Out.WriteLineAsync($"receivegreetingcommmand:{context.Message.Id},{context.Message.DateTime}");}消费者可以消费GreetingCommand信息。这个例子几乎隐藏了关于RabbitMQ的技术细节,将代码中心放在业务中,并尝试运行这两个控制台应用程序:5.实现Publish/Subscribe模式Publish/Subscribe模式使得基于消息的软件架构成为可能,而这种能力表现为ClientA发送消息X,ClientB和ClientC都可以订阅消息X。1、我们修改一下上面的例子。当GreetingConsumer收到GreetingCommand时,发送GreetingEvent:vargreetingEvent=newGreetingEvent(){Id=context.Message.Id,DateTime=DateTime.Now};awaitcontext.Publish(greetingEvent);2、新建控制台程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA订阅GreetingEvent消息:varbus=BusCreator.CreateBus((cfg,host)=>{cfg.ReceiveEndpoint(host,RabbitMqConstants.GreetingEventSubscriberAQueue,e=>{e.Consumer();});});bus.Start();定义GreetingEventConsumer:publicclassGreetingEventConsumer:IConsumer{publicasyncTaskConsume(ConsumeContextconsume){awaitConsole.Assole.Ascrite($"receivegreetingevent:id{context.Message.Id}");}}此代码几乎与接受命令的Masstransit.RabbitMQ.GreetingServer完全相同。),将命令直接发送到端点。服务器侦听自己的端点并使用命令。在Publish/Subscribe模式下,Client发布一个事件,SubscriberA在自己的端点(endpointA)监听事件,SubscriberB在自己的端点(endpointB)监听事件。3.根据上面的分析,定义一个Masstransit.RabbitMQ.GreetingEvent.SubscriberB4。运行4个控制台应用程序看看。6.实现RPC模式。这种模式在Masstransit中称为请求/响应模式。通过IRequestClient接口实现相关操作。官方github上有一个相关的例子。结语:本文分析了如何使用Masstransit来抽象服务,避免直接使用特定的消息队列。当然还有本文提到的很多服务总线机制,比如“重试、熔断等”。未在本文中出现,有待进一步研究。了解该项目。通过对Masstransit和NServiceBus的一些试用对比,Masstransit在实际项目中使用方便且免费,各种API的定义也很清晰,但官方文档有点过于简单,需要深入研究在实际使用中。作为.NET平台下为数不多的ESB开源产品之一,它的关注度还是不够,期待大家为开源项目贡献力量。作者:RichieZhang来源:http://www.cnblogs.com/richieyang/