当前位置: 首页 > 科技观察

开源消息中间件RocketMQ详解系列

时间:2023-03-15 18:59:46 科技观察

什么是RocketMQRocketMQ是一个纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批处理消息、定时消息、消息回溯等,主要功能有异步解耦和流量削峰。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ。四种消息中间件基本介绍:特点ActiveMQRabbitMQKafkaRocketMQ单机吞吐量10000级,比RocketMQ和Kafka高10万级,支持10万级高吞吐量。高吞吐量,一般配合大数据系统进行实时数据计算、日志采集等场景中topic个数对吞吐量的影响可达百/千,吞吐量会略有下降。这是RocketMQ的一大优势。同一台机器下,可以支持大量topic主题,从几十到几百,吞吐量会明显下降。同一台机器下,Kafka尽量保证topic数量不会太大。如果要支持大型主题,则需要添加更多的机器资源。第二层,RabbitMQ的特点,ms级别延迟最低,延迟在ms级别以内,高可用,基于主从架构实现高可用,同样ActiveMQ非常高,分布式架构非常高,一份数据分布式多副本,少数机器宕机,不会丢失数据,不会导致消息不可用可靠性数据丢失概率低基本不丢失数据参数优化配置后,可以做到0丢失完全开发基于erlang,并发性强,性能优异,延迟低。MQ功能比较完善,基本是分布式的,扩展性好。功能比较简单。主要支持大数据领域的简单MQ功能,实时计算和日志采集,大规模使用其他Apache开发。起步早,没有在高吞吐场景下验证。社区不活跃。开源、稳定、高活跃。社区活跃度高的消息中间件使用场景:异步解耦:当我们下单的时候,订单系统会进行RPC同步调用支付系统、库存系统、物流系统等,那么它们之间就会产生耦合。系统,耦合度越高,容错性越低。比如我们的支付系统宕机了,就会导致我们整个交易出现异常,影响用户体验。如果我们加上消息中间件,不管是支付系统还是库存系统,都是异步调用的。如果其中一个系统出现故障,不会影响我们用户下订单的使用。MQ本质上第一步是异步,第二步是解耦。那么系统的容错性就更高了。流量调峰:流量调峰也称为削峰填谷。比如一些互联网公司的大促场景,双十一,门店庆典,或者闪购活动,都会用到消息中间件。如果没有消息中间件,没有流量调峰,每秒的并发量是很高的。这时候如果我们的A系统要往我们的MYSQL中写入数据,是受限于MYSQL自身服务的上限。最多,我们每秒只能处理200个请求。这时候就会积累大量的消息,导致系统A崩溃。这时候我们就可以通过MQ把用户的请求消息写出来,因为消息中间件本身就是一个处理数据量比较大的系统,所以对于每秒2000个请求,消息中间件可以处理,然后系统A动作作为一个消息中间件,一个软件的消费者以固定的速度从MQ中拉取200条消息来完成我们的业务操作,以时间换空间来保证我们A系统的稳定性。数据分发:如果S系统在开发系统的时候需要连接多个(A、B、C、D)系统,使用传统的接口调用,如果中间有变化我们就需要修改我们的代码。对于系统A,我们需要修改代码来调用系统A来完成相应的业务逻辑。如果我们当中的系统D需要去掉,我们也需要修改代码,删除相应的接口调用。如果S系统使用了消息中间件,我们的S系统只需要将消息交给MQ,剩下的不管是新建的还是移除的,还是原来的,都只是消息中间件的一个消费者。这个时候,我们会方便数据的分发。比如我们新增一个系统,只需要添加一个MQ消费者,直接从MQ中获取消息即可。当我们需要移除一个系统时,只需要取消对MQ消息的监听即可。我们原来的S系统不需要额外的修改。如果使用MQ作为数据分发,减少了数据修改,提高了开发效率。RocketMQ基本概念RocketMQ主要有四个核心组件:NameServer、Broker、Producer和Consumer。这些角色通常以集群的形式存在。RocketMQ基于纯Java开发,具有高吞吐、高可用等特点,适用于大型分布式系统应用。对于RockerMQ,我们要想启动它,首先要启动NameServer。Brober主机启动后,Brober会向NameServer注册相应的路由和服务(Broker地址,subject和),Producer会发现路由并上报给NameServer请求Broker路由信息发送消息。作为消费者,需要连接NameServer获取相关的路由信息??,方便我们订阅消息。Broker也是一个很重要的角色。它主要负责消息的存储。无论是生产消息还是订阅消息,消息的来源都是Broker。一般来说,消息的发送(Producer)只会发送给master节点,然后由Broker发送消息。Synchronization,同步到从节点,作为消费者(Consumer)只会先从Master节点获取消息,并进行消费,除非主节点不可用或者很忙,才会从从节点消费,Broker除外消息的传递,也负责消息的持久化和主从数据的复制NameServer:NameServer是一个服务和注册发现中心。它也是整个RocketMQ的“大脑”,所以RocketMQ需要先启动NameServer,然后再启动BrokerNameServer,在RocketMQ中是一个几乎无状态的节点,可以部署在集群中,节点之间不需要任何信息同步。NameServer底层是Netty实现的,是内存存储,所以不会持久化NameServer中的broker和topic。NameServer的作用类似于Dubbo和zookeeper,主要负责Broker的动态注册和发现。为什么不使用动物园管理员?Rocketmq主要用在分布式的情况下追求性能,因为zookeeper最追求的是最终一致性,所以在性能上会有一定的折扣。Broker:消息服务器(Broker)是一个消息存储中心。它的主要功能是接收和存储来自Producer的消息,Consumers从这里获取消息。存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出有两种类型的Broker,Master和Slave。Master可以写也可以读,Slave不能写只能读。Producer:Producer也称为消息发布者(producer),负责生产消息并发送给Topic。生产者向业务应用系统生成的代理发送消息。RocketMQ提供了多种发送范例:同步、异步和单向。Consumer:也称为消息订阅者,负责接收和消费来自Topic的消息。消费者从经纪人那里提取信息并将其提供给应用程序。收到Master发来的消息,执行完成后,会发消息给Broker确认,就是ACK确认。RocketMQ的基本概念是分组(Group)Group分为Producer和Consumer两部分:Producer:表示发送同类型消息的Producer,通常发送逻辑是一致的。发送普通消息时,用于身份识别,无特殊用途。它主要用于作用于事务消息。当一个事务消息中的一条消息一直处于waiting状态超时,Broker会检查同一个Group下的其他producers来判断这条消息是commit还是rollback。消费者:消费者的分组是很有意义的。消费者是标识一种消费者类型的集合名称。这种类型的Consumer通常会消费一种类型的消息,消费逻辑是一致的。同一个ConsumerGroup下的各个实例会共同消费topic消息,起到负载均衡的作用。消费进度以ConsumerGroup粒度进行管理,不同ConsumerGroup之间的消费进度互不影响。也就是说,如果消息A已经被ConsumerGroup1消费过,那么它就会被ConsumerGroup2消费。主题(Topic)用于区分消息的类型,表示一类消息的逻辑名称,是消息的逻辑管理单元。无论消息是生产还是消费,都需要执行Topic。发件人可以向一个或多个主题发送消息。消息接收者可以订阅一个或多个主题消息。消息队列(MessageQueue)消息队列简称Queue,消息物理管理单元。用于并行发送和接收消息,相当于Topic的分区。一个Topic有多个Queue,消息生产一般比消息消费快。当消息被消费时,会有相应的业务逻辑进行处理,此时消息消费的速度会降低。所有一般主题都会有多个队列。主要用于解决生产快消费慢的问题。如果同一个Topic创建在不同的Broker上,那么不同的Broker有不同的Queue,这些Queue物理上存储在不同的Broker节点上,具有横向扩展的能力。无论是生产者还是消费者,实际操作都是针对Queue层面的。标签(Tag)RocketMQ支持在发送主题消息时为主题消息设置标签,用于区分同一主题下不同类型的消息。对于来自同一业务单元的消息,可以根据不同的业务目的,在同一主题下设置不同的标签。比如一个主题消息是水果,那么这个水果可以有香蕉、西瓜、草莓等其他标签,我们可以给相应的消息打上相应的标签(Tag),方便我们在时进行对应消耗过滤器。标签可以有效的保持代码的清晰度和连贯性,优化RocketMQ提供的查询系统。消费者可以基于Tags对不同的subtopic实现不同的消费逻辑,以达到更好的扩展性。偏移量(Offset)在RocketMQ中,有很多偏移量的概念。一般我们只关心暴露给客户端的offset。如果不指定,一般指消费者消息的偏移量(ConsumerOffset)。消息队列是一个无限长的数组。当消息进来时,下标会加1,这个数组的下标是偏移的。Messagequeue中的maxoffset表示消息的最大偏移量,Consumeroffset可以理解为在一个逻辑MessageQueue上标记ConsumerGroup,消息被消费的地方就是消费进度。RocketMQ下载安装下载地址:https://rocketmq.apache.org/dowloading/releases/。环境要求:Windows/Linux64位系统。JDK1.8(64位)。源代码安装需要安装Maven3.2.x。这里我们使用rocketmq-4.9.2作为演示案例。设置环境变量:变量名:ROCKETMQ_HOME。变量值:MQ解压路径\MQ文件夹名。在rocketmq-4.9.2\bin目录下启动,打开cmd窗口。先启动nameServer,启动命令:startmqnamesrv.cmd。然后启动Broker,启动命令:startmqbroker.cmd-n127.0.0.1:7906autoCreateTopicEnable=true。管理插件安装:旧版本下载地址:https://codeload.github.com/apache/rocketmq-externals/zip/master。新版本地址:https://github.com/apache/rocketmq-dashboard。启动完成后,在浏览器中输入'127.0.0.1:8089',成功后可以在管理端查看。消息发送RocketMQ提供的nativeclientAPI,当然除了nativeclient之外,还集成了SpringBoot和SpringCloudStream,但本质上这些也是基于nativeAPI的封装,所以你只需要掌握native即可API,其他的东西自然就来了。导入MQ客户端依赖org.apache.rocketmqrocketmq-client4.9.2消息发送:/***同步发送*/publicclassSyncProducer{publicstaticvoidmain(String[]args)throwsException{//实例化消息生产者ProducerDefaultMQProducerproducer=newDefaultMQProducer("group_test");//设置NameServer的地址producer.setNamesrvAddr("127.0.0.1:9876");//producer.setSendLatencyFaultEnable(true);//启动Producer实例producer.start();for(inti=0;i<10;i++){//创建消息,并指定主题、标签和消息正文Messagemsg=newMessage("TopicTest"/*Topic*/,"TagA"/*Tag*/,("HelloRocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)/*消息体*/);//向Broker发送消息SendResultsendResult=producer.send(msg);西stem.out.printf("%s%n",sendResult);}//如果不再发送消息,关闭Producer实例producer.shutdown();}}总结本文主要是让大家了解RocketMQ的基本原理和介绍,在后面的章节中,我们将带大家深入了解和使用RocketMQ。

猜你喜欢