当前位置: 首页 > 科技观察

下一代消息队列Pulsar到底是什么?

时间:2023-03-15 16:52:28 科技观察

本文转载自微信公众号“咖啡拿铁”,作者咖啡拿铁。转载请联系CaféLatte公众号。想了很久的背景,一直想写一篇pulsar相关的文章,但是知识储备不够,很多细节还是不懂,所以查了很多资料,并且终于可以补一篇文章了。Pulsar是一个消息中间件,2016年被雅虎开源,2018年成为Apache的顶级项目。在我之前的文章中,写过很多其他消息中间件的文章,比如kafka,rocketmq等。开源界已经有这么多的消息队列中间件了。Pulsar作为新势力有哪些优势?Pulsar从诞生之日起就被拿来与其他消息队列(kafka、rocketmq等)进行比较,但是Pulsar的设计思想与大多数消息队列中间件不同。它具有高吞吐量、低延迟、计算与存储分离、多租户、远程复制等功能,因此Pulsar也被称为下一代消息队列中间件。接下来,我将一一详细分析。Pulsar架构原理整体架构与其他消息队列中间件没有太大区别。相信大家已经见过很多耳熟能详的名词了。接下来,我将对这些术语的含义一一进行解释。名词解释生产者:消息生产者将消息发送给代理。Consumer:消息消费者从Broker读取消息到客户端进行消费处理。Broker:可以看作是pulsar的server,Producer和Consumer都看作是client。pulsar的消息处理节点,Broker不同于其他消息中间件,它是无状态的,没有存储,所以可以无限扩容,后面会详细说明。Bookie:负责所有消息的持久化,这里使用ApacheBookkeeper。ZK:和kafka一样,pulsar也使用zk来保存一些元数据,比如配置管理、topic分配、租户等。ServiceDiscovery:可以理解为Pulsar中的nginx。整个broker只用一个url就可以搞定,当然你也可以用自己的服务发现。客户端读取、更新或删除主题的初始请求将被发送到可能不处理该主题的代理。如果此代理无法处理主题请求,代理会将请求重定向到可以处理主题请求的代理。无论是kafka、rocketmq还是我们的pulsar,作为一个消息队列中间件,最重要的大概分为三个部分:Producer如何生产消息,发送给对应的Broker,Broker如何处理消息,以及高效持久化和查询Consumer是如何消费消息的,这三个部分我们后面会讲解。Producer生产消息我们简单看一下如何用代码发送消息:PulsarClientclient=PulsarClient.create("pulsar://pulsar.us-west.example.com:6650");Producerproducer=client.createProducer("persistent://sample/standalone/ns1/my-topic");//Publish10messagestothetopicfor(inti=0;i<10;i++){producer.send("my-message".getBytes());}第一步:首先使用我们的url创建客户端。这个url就是我们服务发现的地址。如果我们使用单机模式,我们可以直接连接。Step2:我们传入一个类似于url的参数,我们只需要通过这个来指定我们在哪个topic或者namespace下创建即可:Compositionmeaningpersistent/non-persistentPulsar提供了两种topic:persistent和non-persistent。如果选择非持久主题,则所有消息都将存储在内存中。如果代理重新启动,所有消息都将丢失。如果选择持久化topic,所有消息都会持久化到磁盘,重启broker,消息就可以正常消费了。Tenant,顾名思义,就是租户。Pulsar最初用作雅虎内部整个公司使用的中间件。需要为主题指定一些层次,租户是其中的一层。比如这个可以是一个大的部门,比如电商中台租户。namespace命名空间可以看做是二级,比如电商平台topic下的订单业务群消息队列名称Step3:调用send方法发送消息。这里还提供了sendAsync方法,支持异步发送。以上三个步骤中,步骤1和步骤2属于我们的准备阶段,用于构建客户端和构建生产者。我们真正的核心逻辑在send中,所以这里提几个小问题。你可以想想其他的消息队列是怎么做的,然后和pulsar对比一下:我们调用send之后会不会马上发送?如果有多个分区,我如何找出我应该发送给哪个Broker?上面我们说的发送模式就是发送点,有async和sync两种模式,但其实async模式在pulsar内部sync模式中也是用的。在sync模式下,模拟回调阻塞,达到同步的效果。这种模式在kafka中也是采用的,但是在rocketmq中,所有发送都是真正同步的,会直接请求给broker。基于这种模式,在pulsar和kafka中都支持批量发送,在rocketmq中支持直接发送。批量发送有什么好处?当我们发送的TPS特别高的时候,如果我们每次发送都直接连接到broker,它可能会做很多重复性的工作,比如压缩,鉴权,创建链接等等。例如,如果我们发送1,000条消息,我们可能会做1,000次这种重复性工作。如果是分批发送,则1000条消息合并为一个请求。相对来说,压缩和鉴权工作只需要做一次。可能有同学会问,批量发送会不会导致发送时间有一定的延迟?其实这个大可不必担心。在pulsar中,batch默认每1ms发送一次,或者默认batchsize达到1000时发送。这个发送的频率还是很快的。发送负载均衡通常会将主题在消息队列中横向扩展。在pulsar和kafka中称为partition,在rocketmq中称为queue。它本质上是一个分区。我们可以在不同的broker上放置不同的分区来达到我们的水平。扩展效果。我们在发送的时候,可以自己制定选择分区的策略,或者默认使用它来训练分区策略。我们选择分区后,如何确定哪个分区对应哪个broker呢?大家可以先看下图:Step1:我们所有的information分区映射信息都保存在zk和broker缓存中。Step2:通过查询broker,获取partition与broker的关系,并定时更新。Step3:在pulsar中,每个partition在发送端被抽象成一个单独的Producer。这一点不同于Kafka和rocketmq。在Kafka中,选择一个partition后,去partition对应的broker地址,然后发送。Pulsar将每个分区封装到一个Producer中。在代码实现上,不用关注它对应的是哪个broker。所有的逻辑都在producer的代码里,整体来说比较干净。压缩消息消息压缩是优化信息传输的手段之一。我们通常会看到一些大文件会以压缩包的形式下载。我们也可以在我们的消息队列中使用这个想法。我们会发送一批消息,比如1000个条目可能传输大小为1M,但压缩后可能只有几十kb,这样增加了我们和broker之间的传输效率,但同时我们的CPU也会带来损失。Pulsar客户端支持多种压缩类型,如lz4、zlib、zstd、snappy等。client.newProducer().topic("test-topic").compressionType(CompressionType.LZ4).create();Broker接下来说说第二个重要的部分Broker,在Broker设计中pulsar和其他的消息队列都有很大的不同,也正是因为这种不同,才成为了他的特点。计算与存储分离首先说说它最大的特点:计算与存储分离。我们一开始就说Pulsar是下一代消息队列,这跟它的架构设计有很大关系。不管是Kafka还是RocketMQ,所有的计算和存储都放在同一台机器上。这种模式有几个缺点:扩容困难:当我们需要对集群进行扩容时,我们通常会受到cpu或磁盘其中之一的影响,但我们必须申请一台可能cpu和磁盘配置都不错的机器,造成浪费的资源。而且kafka的扩容还需要迁移数据,过程非常复杂。负载不均衡:当某些分区数据特别大时,会导致broker负载不均衡,如下图,如果某个分区数据特别大,会导致某个broker(ship)承载的数据过多,但另一个经纪人可能比较闲。pulsar计算分离架构可以很好的解决这个问题:对于计算:也就是我们的broker,提供消息队列的读写,不存储任何数据,对我们无状态的扩展非常友好,只要你机器够多,随便上。Broker扩容往往适合增加消费者的吞吐量。当我们有一些流量大的业务或活动时,比如电商促销,我们可以提前扩展经纪商。对于存储:就是我们的bookie,它只为消息队列提供存储。如果对消息量有要求,我们可以扩展bookie,不需要迁移数据,扩展很方便。消息存储名词解析:上图是bookie的读写架构图。其中有一些名词需要先介绍一下:Entry,Entry是bookkeeper中存储的一条记录,包括EntryID,记录实体等。ledger,可以认为ledger是用来存储Entry的,多个Entry序列形成分类帐。Journal,其实就是bookkeeper的WAL(writeaheadlog),用来存放bookkeeper的事务日志。日志文件有一个最大大小,达到这个大小后会创建一个新的日志文件。Entrylog,存放Entry的文件。分类帐是一个逻辑概念。entry会先被ledger聚合,然后写入entrylogfile。同样,入口日志也会有一个最大值。达到最大值后,将创建一个新的入口日志文件索引文件。分类账的索引文件,分类账中的条目写入条目日志文件,索引文件用于条目日志文件中的每个分类帐作为索引,记录每个分类帐在日志中的存储位置入口日志和入口日志文件中数据的长度。MetaDataStorage,元数据存储,用于存储bookie相关的元数据,比如哪些账本在bookie上,而bookkeeper目前使用的是zk存储,所以在部署bookkeeper之前,首先要有一个zk集群。整体架构的写入流程:Step1:broker发起写入请求,首先将WAL写入Journal磁盘。熟悉mysql的朋友都知道redolog、journal和redolog是用来恢复非持久化数据的。Step2:然后将数据写入索引和账本。这里为了保持性能,不会直接写入磁盘,而是写入pagecache,然后异步刷盘。第三步:确认写入。读的过程是:Step1:先读索引,当然是先读缓存,再去磁盘。Step2:获取索引后,如何根据索引高效的读写entrylogger中相应的数据?当我们在Kafka中的topic比较多的时候,因为Kafka有一个topic和一个文件,我们的磁盘IO就会乱序写入,变成随机写入。在rocketMq中,虽然将多个topic写入一个文件一一对应,这样写就变成了顺序写,但是我们的读很容易导致我们的pagecache被各种overlay刷新,对我们的IO影响很大。因此,pulsar在读写两方面针对这些问题做了很多优化:写流程:顺序写+pagecache。在写的过程中,我们所有的文件都是独立的磁盘,只有Journal是同步刷写的。Journal顺序写一个journal-wal文件,顺序写效率很高。虽然ledger和index都会有多个文件,但我们只会写入pagecache并异步刷盘,所以随机写入不会影响我们的性能。读取过程:broker缓存+bookie缓存。在pulsar中,对tailingread非常友好,基本不走io。一般我们的consumer会马上拿到producer发来的消息,所以这部分是persistent之后,在broker中还是作为缓存存在的。当然,即使broker没有缓存(比如broker是新建的),我们的bookie也会在memtable中有自己的缓存,通过多个缓存减少读取过程的io。我们可以发现读写的io在最好的情况下是完全隔离的,所以在Pulsar中支持百万主题很容易,但是在我们的kafka和rocketmq中就很难了。无限流存储一个topic其实就是一个账本流(Segment)。通过这样的设计,Pulsar不是一个纯粹的消息队列系统,而是可以替代流式系统,所以又被称为流式原生平台,可以替代flink等系统。可以看到我们的EventStream(topic/partition)是由多个Segment存储组成的,每个segment由一个entry组成。这可以看作是我们发送的每批消息的一个条目。段可以看作是我们写文件的一个基本维度。同一个segment的数据会写在同一个文件中,不同的segment会是不同的文件,segment之间的数据会保存在metadata中。kafka和rocketmq中消息的分层存储会有一定的存储时间,因为磁盘会有空间限制,pulsar中也提供了这个功能,但是如果你想让你的消息永久存储,那么可以使用分层存储,我们可以定期将一些较旧的数据刷新到便宜的存储,例如s3,然后我们可以无限地存储我们的消息队列。pulsar中的数据复制与kafka和rocketmq有很大不同。在其他消息队列中,其他副本通常是主动同步的。通常,这个时间变得不可预测。在pulsar中,采用了类似的qurom协议。给出一组可用的bookie池,然后并发写入其中的一部分,只要返回部分成功(通常大于1/2)即可。EnsembleSize(E)决定了可用于给定账本的bookie池的大小。WriteQuorumSize(Qw)指定Pulsar写入条目的bookie数量。AckQuorumSize(Qa)指定必须确认写入的bookie的数量。使用这种并发写入的方式会让数据复制更加高效,尤其是在数据副本很多的时候。Consumer接下来说说Consumer,pulsar中最后一个重要的组件。订阅模式订阅模式用于定义我们的消息如何分发给不同的消费者。不同的消息队列中间件有自己的订阅模式。一般我们常见的订阅模式有:集群模式:一条消息只能分布在一个集群中被消费者消费。广播模式:一条消息可以被集群中的所有消费者消费。pulsar提供独占、容灾、共享、密钥共享四种订阅模式:独占:顾名思义,只有一个消费者可以独占。如果同一个集群中有第二个消费者注册,第二个会失败,适用于全局有序的消息。容灾:增强版独享,独享一个挂了会自动切换到另一个好的消费者,但独占只能有一个。共享模式:这种模式看起来有点像集群模式。一条消息只能被集群中的一个消费者消费。但是和rocketmq不同的是,rocketmq使用partition维度,同一个Partition的数据会发送到一台机器上。.Pulsar中的消费不走分区维度,而是轮流训练所有消费者发送消息。这样做有什么好处?如果你有100台机器,但是你只有10个分区,你只有10个消费者可以运行,但是在pulsar中,所有100台机器都可以处理消费。keysharing:类似于上面说的分区维度发送,在rocketmq中,相同key的顺序消息会发送到一个分区,但是这里不会有分区维度,只是根据的hash分发给固定的消费者钥匙。解决消费者能力受分区数量限制的问题。消息获取方式无论是在kafka还是rocketmq中,我们都是使用客户端定时训练我们的broker获取消息。这种模式称为长轮询模式。这种方式的缺点是网络开销比较大。我们来计算消费者的消费延迟。我们假设broker和consumer之间的网络延迟是R,那么我们的总时间是:当某条消息A刚刚给broker的时候,此时long-polling只是打包数据返回,而broker的时间返回给消费者的是R,消费者再次发送请求,也是R。将我们的消息A返回给消费者,这里又是R。如果只考虑网络延迟,可以看出我们消息的消费延迟大约是3R,所以我们必须想办法优化一下,有的同学可能马上就想到了,我们的消息就直接推送给我们了consumer是不对的,现在我们的延迟只会是一次R,这是我们常见的push方式,但是单纯的push方式是有问题的,如果我们的生产速度远远大于消费速度,那么push的消息肯定会把我们的炸掉内存,这就是背压。那么我们如何解决背压呢?我们可以优化push方式,变成动态推送。我们结合Long-polling,在长轮询请求时,通知BrokerBuffer剩余空间,由Broker负责推送数据。此时Broker知道最多可以推送多少条数据,所以可以控制推送行为,以免压垮Consumer。例如:当Consumer发起请求时,Buffer剩余容量为100,Broker每次最多返回32条消息。然后Consumer的长轮询请求Broker会执行3次push(共96条消息),然后将response返回给Consumer。(响应包含4条消息)。如果采用长轮询模型,每次Consumer向Broker发送请求执行响应,本例需要4次长轮询交互(共4次请求4次响应,8次网络操作;1次在DynamicPush中/Pull)请求,三推一响应,共5次网络操作)。因此,pulsar采用这种消息获取方式,进一步优化了消息从消费者层到达的时间。我觉得这个设计很巧妙,很多中间件的long-polling模式都可以参考这个思路进行改进。综上所述,ApachePulsar的很多设计思想都不同于其他中间件,但无疑更接近未来。我大胆预测,未来其他一些消息中间件的发展也会向它靠拢。目前,Pulsar在中国的用户越来越多。腾讯云提供了TDMQ,pulsar的云版本。当然,华为、知乎、虎牙等其他一些知名公司也在逐步尝试。我相信pulsar真的是一种趋势。最后也让我想起了大江大河大结局中的一句话:所有的改变都可能伴随着痛苦和弯路。任何危险的浅滩和礁石都可以被封锁。道在何处,纵有万人,我也必去。