当前位置: 首页 > 科技观察

我和同事曾经聊过“架构设计”,来听听吧...

时间:2023-03-18 02:27:39 科技观察

1.上篇文章《??百亿级流量的系统架构该怎么设计,今天就来教会你!??》分析了如何使用消息中间件来解耦系统处理。同时我们也提到了使用消息中间件还有利于一条数据同时被多个系统订阅,可以被多个系统用于不同的用途。当前架构如下图所示。在这张图中,我们可以清楚的看到,实时计算平台发布的一条数据被发送到消息中间件,然后会进行以下步骤:数据查询平台会订阅这条数据,落入在自己的本地数据库集群和缓存集群中,对外提供数据查询的服务数据质量监控系统会按照一定的业务规则监控计算结果。如果发现任何数据计算错误,会立即报警数据链路跟踪系统,数据链路跟踪系统会收集计算结果作为链路节点,同时收集一条数据的整个完整计算链路,并且一系列数据计算环节在地面汇集存储。最后,如果某个数据计算错误,可以立即通过计算环节。执行回溯以解决问题。因此,在上述场景中,使用消息中间件可以解耦,其次可以实现消息“Pub/Sub”模型,实现消息的发布和订阅。这篇文章我们就来看看,如果使用RabbitMQ作为消息中间件,如何实现一个数据被多个系统同时订阅的“Pub/Sub”模型。2、基于消息中间件的队列消费模型上图其实就是RabbitMQ最基本的队列消费模型的支持。也就是说,你可以理解为RabbitMQ内部有一个队列,生产者不断向队列发送数据,消息按顺序进入队列。接下来假设队列中有4条数据,那么我们有2个消费者一起消费这个队列的数据。这时候每个消费者会平均分到2条数据,也就是说4条数据会平均分到每个消费者。每个消费者只处理一部分数据。这是一个典型的队列消费模型。.前面几篇基本给出了上面提到的最基本的队列消费模型的RabbitMQ代码实现,如何保证消费者宕机时数据不丢失,如何让RabbitMQ集群同时持久化队列和消息。基本上整体代码实现比较完整,大家可以参考一下。3.基于消息中间件的“Pub/Sub”模型但是消息中间件也可以实现一个“Pub/Sub”模型,即“发布/订阅”模型,Pub就是发布,Sub就是订阅。该模型可以支持多个系统同时消费一条数据。也就是说,你发布的每一条数据都会广播到每一个系统。下面放一张图大家一起来感受一下。如图所示也就是说,我们要实现上图中的效果,实时计算平台向消息中间件发布一系列数据。那么数据查询平台、数据质量监控系统、数据链路跟踪系统都会订阅数据,消费相同的完整数据,各个系统可以根据自己的需要使用数据。这就是所谓的“Pub/Sub”模型。一个系统发布一条数据,多个系统订阅消费完全相同的一条数据。那么如果想要实现以上效果,基于RabbitMQ应该如何处理呢?4、RabbitMQ中的交换是什么?实际上,在RabbitMQ中,生产者是不允许直接向一个队列(queue)投递消息的,而只允许生产者向RabbitMQ内部一个叫做“exchange”的特殊组件投递消息。关于这个exchange,你大概可以把这个组件理解为一个消息路由组件。也就是说,实时计算平台发送给RabbitMQ的消息都是由一个exchange接收的。然后交换机会根据一定的规则决定将消息路由转发到哪个队列。这实际上是RabbitMQ中的一个核心消息模型。看下图一起理解吧。5、默认的exchange在上一篇文章中,我们在向RabbitMQ投递消息的时候,并没有使用任何exchange,但是为什么还是将消息投递到了队列中呢?那是因为我们使用的是默认的exchange,它会直接将消息路由到你指定的queue中,那么如果单纯的使用queue消费模型,exchange的概念是不是就省略了呢?上面的段落是我们之前向您展示的一种传递消息的方法,可以使消息持久化。请注意,里面的第一个参数是一个空字符串。这个空字符串的意思是把消息投递到默认的exchange,然后它会把消息路由到我们指定的queue。6.将消息投递到fanoutexchange在RabbitMQ中,exchange组件有很多类型,例如direct、topic、headers和fanout。这里我们就来看看最后一个,fanout类型的exchange组件。这种exchange组件其实很简单,可以创建一个fanout类型的exchange,然后在这个exchange上绑定多个队列。那么只要你向这个exchange发布消息,他就会将消息路由到他绑定的所有队列。使用以下代码创建交换。比如在实时计算平台(producer)的代码中,可以添加如下一段,创建一个fanout类型的exchange。我们称第一个参数为“rt_compute_data”,这是交易所的名称,rt是“RealTime”的缩写,意思是实时计算系统的计算结果数据。第二个参数将交换类型定义为“fanout”。channel.exchangeDeclare("rt_compute_data","fanout");然后我们使用下面的代码将消息投递到我们创建的exchange组件:你会注意到此时消息投递到了指定的exchange,但是路由到哪个queue呢?在这一点上,我们还没有决定,我们要让消费者把自己的队列绑定到这个exchange上。7、将自己的queue绑定到exchange上消费。我们还修改了消费者的代码。在此之前,我们这里关闭了autoAck机制,然后每次都手动ack。在上面的代码中,每个消费者系统都会有一些差异,即每个消费者都需要定义自己的队列,然后绑定到交换器上。例如:数据查询平台的队列是“rt_compute_data_query”数据质量监控平台的队列是“rt_compute_data_monitor”数据链路跟踪系统的队列是“rt_compute_data_link”这样,每个订阅这个数据的系统实际上有自己的队列,然后队列会被交易所路由到实时计算平台产生的所有数据中。并且因为多队列的模式,每个系统都可以部署消费者集群进行数据消费和处理,非常方便。八、整体结构图最后给大家看一张大图,我们就按照图来勾勒一下整个流程。如上图所示,首先,实时计算平台会将消息投递到“rt_compute_data”的“exchange”,但是他并没有指定exchange会将消息路由到哪个队列,因为他不知道这个.然后数据查询平台、数据质量监控系统、数据链路跟踪系统可以声明自己的队列并绑定到交易所。因为queue和exchange的绑定是由订阅数据的平台指定的。并且因为这个exchange是fanout类型的,所以只要接收到数据,它就会把数据路由到它绑定的所有queue中,这样每个queue都有相同的数据供对应的平台消费。并且对于每个平台自己的队列,也可以部署一个消费服务集群来消费自己的一个队列。自己队列中的数据还是会平均分配给各个消费者服务实例去处理,每个消费者服务实例都会得到一部分数据。大家想一想,这是否实现了不同系统订阅一条数据的“Pub/Sub”模型?当然,其实RabbitMQ也支持各种类型的交换,可以实现各种复杂的功能。后续我们将通过实际的线上系统架构案例来讲解消息中间件技术的各种用法。

猜你喜欢