Pulsar是雅虎2016年开源的消息中间件,2018年成为Apache的顶级项目。图片来自Pexels。开源界已经有这么多的消息队列中间件了。Pulsar作为新势力有哪些优势?Pulsar从诞生之日起就不断被拿来与其他消息队列(Kafka、RocketMQ等)进行比较。但Pulsar的设计思想不同于大多数消息队列中间件,具有高吞吐、低延迟、计算与存储分离、多租户、远程复制等功能。因此,Pulsar也被称为下一代消息队列中间件,下面我将一一详细分析。Pulsar架构原理Pulsar架构原理如下:整体架构与其他消息队列中间件没有太大区别。相信大家已经见过很多耳熟能详的名词了。接下来,我将对这些术语的含义一一进行解释。术语解释:Producer:消息生产者,向Broker发送消息。Consumer:消息消费者,从Broker读取消息到客户端进行消费处理。Broker:可以看作是Pulsar的服务器,Producer和Consumer都看作是客户端消息处理的节点。Pulsar的Broker不同于其他消息中间件。它是无状态的,没有存储,因此可以无限扩展。这将在后面详细解释。Bookie:负责所有消息的持久化,这里使用ApacheBookkeeper。ZK:和Kafka一样,Pulsar也使用ZK来保存一些元数据,比如配置管理、主题分配、租户等等。ServiceDiscovery:可以理解为Pulsar中的Nginx,只用一个URL就可以处理整个Broker。当然,你也可以使用自己的服务发现。客户端读取、更新或删除主题的初始请求将被发送到可能不处理该主题的代理。如果Broker无法处理主题请求,Broker会将请求重定向到可以处理主题请求的Broker。无论是Kafka、RocketMQ还是我们的Pulsar,作为消息队列中间件最重要的大概分为三个部分:Producer如何生产消息,并发送给对应的Broker。Broker如何处理消息,并高效的持久化和查询。Consumer就是如何消费消息。我们稍后会解释这三个部分。Producer生产消息我们简单看一下如何用代码发送消息:PulsarClientclient=PulsarClient.create("pulsar://pulsar.us-west.example.com:6650");Producerproducer=client.createProducer("persistent://sample/standalone/ns1/my-topic");//Publish10messagestothetopicfor(inti=0;i<10;i++){producer.send("my-message".getBytes());}第一步:首先使用我们的URL创建客户端。这个URL就是我们ServiceDiscovery的地址。如果我们使用单机模式,我们可以直接连接。Step2:我们传入了一个类似URL的参数。我们只需要传递这个来指定我们在哪个主题或命名空间下创建。URL格式为:{persistent|non-persistent}://tenant/namespace/topicStep3:调用Send方法发送消息,这里还提供了sendAsync方法支持异步发送。以上三个步骤中,步骤1和步骤2属于我们的准备阶段,用于构建客户端和构建Producer。我们真正的核心逻辑在Send中。先在这里问几个小问题。大家可以想想在其他消息队列中是怎么做的,然后和Pulsar对比一下:我们调用Send之后是不是马上就发送了?如果有多个分区,怎么办?找到我应该发送给哪个经纪人?发送模式我们上面说了Send分为Async和Sync两种模式,但实际上Pulsar中的Sync模式也是Async模式,在Sync模式下模拟回调阻塞,达到同步的效果。这种模式在Kafka中也是采用的,但是在RocketMQ中,所有的Send都是真正同步的,会直接向Broker请求。基于这种模式,Pulsar和Kafka都支持批量发送,RocketMQ支持直接发送。批量发送有什么好处?当我们发送的TPS特别高的时候,如果我们每次发送都直接连接到Broker,它可能会做很多重复性的工作,比如压缩,鉴权,建立链接等等。例如,如果我们发送1,000条消息,我们可能会做1,000次这种重复性工作。如果是分批发送的话,1000条消息会合并成一个请求,比较压缩,认证工作只需要做一次。可能有同学会问,批量发送会不会导致发送时间有一定的延迟?其实这个大可不必担心。在Pulsar中,batch默认每1ms发送一次,或者默认batchsize达到1000时发送。这个发送的频率还是很快的。发送负载均衡通常会将主题在消息队列中横向扩展。在Pulsar和Kafka中称为Partition,在RocketMQ中称为Queue。它本质上是一个分区。我们可以把不同的partition放在不同的Broker上,达到我们的层级。扩展效果。我们在发送的时候,可以自己制定选择Partition的策略,或者默认使用它来训练Partition策略。我们选择Partition后,如何确定哪个Partition对应哪个Broker呢?大家可以先看下图:Step1:我们所有的information分区映射信息都保存在ZK和Broker的缓存中。Step2:通过查询Broker,获取分区与Broker的关系,并定时更新。Step3:在Pulsar中,每个partition在发送端被抽象成一个单独的Producer,这一点不同于Kafka和RocketMQ。在Kafka中,大概是选择了一个Partition之后,然后找到这个Partition对应的Broker地址,然后发送。Pulsar将每个Partition封装成一个Producer。在代码实现上,不用关注它对应的是哪个Broker。所有的逻辑都在Producer代码里,整体来说比较干净。压缩消息压缩消息是优化信息传输的手段之一。我们通常会看到一些大文件会以压缩包的形式下载。我们也可以在我们的消息队列中使用这个想法。我们会有一个Batch消息,比如1000条消息,可能传输大小是1M,但是压缩之后可能只有几十KB,这就增加了我们和Broker的关系。传输效率,但同时也给我们的CPU带来了损耗。Pulsar客户端支持lz4、zlib、zstd、snappy等多种压缩类型client.newProducer().topic(“test-topic”).compressionType(CompressionType.LZ4).create();Broker接下来说说第二个重要的部分,Broker。在Broker的设计上,Pulsar和其他所有的消息队列都有很大的不同,也正是因为这种不同,才成为了他的特点。计算与存储分离首先说说它最大的特点:计算与存储分离。我们一开始就说Pulsar是下一代消息队列,这跟它的架构设计有很大关系。不管是Kafka还是RocketMQ,所有的计算和存储都放在同一台机器上。这种模式有几个缺点:扩容困难:当我们需要扩容集群时,我们通常会受到CPU或磁盘其中之一的影响,但我们必须申请一台可能CPU和磁盘配置都不错的机器,造成浪费资源。而且Kafka的扩容还需要进行数据迁移,这是一个非常复杂的过程。负载不均衡:当某些Partition数据特别大时,会导致Broker负载不均衡,如下图,如果一个Partition数据特别大,会导致一个Broker(ship)承载的数据过多,而另一个Broker可能比较闲。Pulsar的计算分离架构可以很好的解决这个问题:对于计算:也就是我们的Broker,它提供消息队列的读写,不存储任何数据,是无状态的。对我们的扩展非常友好。只要你有足够的机器,你就可以轻松访问它。Broker扩容往往适合增加消费者的吞吐量。当我们有一些流量大的业务或活动时,比如电商促销,可以提前进行经纪商扩容。对于存储:也就是我们的Bookie,它只提供消息队列的存储。如果对消息量有要求,我们可以扩展Bookie,不需要迁移数据,扩展很方便。消息存储名词解析:上图是Bookie读写架构图,需要先介绍一些名词:Entry:是存储在bookkeeper中的一条记录,包括EntryID,记录实体等。Ledger:它可以认为ledger是用来存放Entry的,多个Entry序列组成一个ledger。Journal:其实就是bookkeeper的WAL(writeaheadlog),用来存放bookkeeper的事务日志。日志文件有一个最大大小,达到这个大小后会创建一个新的日志文件。Entry日志:存储Entry的文件。分类帐是一个逻辑概念。entry会先被ledger聚合,然后写入entrylogfile。同样,entrylog也会有一个最大值,达到最大值后会创建一个新的entrylog文件。索引文件:账本索引文件,将账本中的分录写入分录日志文件,索引文件用于对分录日志文件中的每一个账本进行索引,记录分录日志中每一个账本的存储位置和数据在日志文件的条目长度中。MetaDataStorage:元数据存储用于存储bookie相关的元数据,比如哪些账本在bookie上,而bookkeeper目前使用的是zk存储,所以在部署bookkeeper之前,首先要有一个zk集群。整体架构上的写入流程:Step1:Broker发起写入请求,首先将WAL写入Journal磁盘。熟悉MySQL的朋友都知道,redolog、journal、redolog都是用来恢复未持久化数据的。Step2:然后将数据写入索引和账本。这里为了保持性能,不会直接写入磁盘,而是写入pagecache,然后异步刷盘。第三步:确认写入。读取过程如下:Step1:先读取索引,当然也先读取缓存,再到磁盘。Step2:得到索引后,去入口记录器根据索引得到对应的数据。如何高效读写?当我们在Kafka中的topic比较多的时候,因为Kafka是一个topic和一个文件,我们的磁盘IO就会从顺序写变成随机写。在RocketMQ中,虽然将多个topic写入到一个文件中,使得写入变成了顺序写入,但是我们的读取很容易导致我们的Pagecache被各种overlay刷新,对我们的IO影响很大。因此,Pulsar在读写两方面针对这些问题做了很多优化:写流程:顺序写+Pagecache。在写的过程中,我们所有的文件都是独立的磁盘,只同步Journal。journal顺序写入一个journal-wal文件,顺序写入效率很高。虽然ledger和index中都有多个文件,但我们只是异步写入Pagecache和刷盘,所以随机写入不会影响我们的性能。读取流程:brokercache+bookiecache,Pulsar对tailingread非常友好,基本不使用IO。正常情况下,我们的Consumer会马上拿到Producer发来的消息,所以这部分在持久化之后还是以Cache的形式存在于Broker中。当然,即使Broker没有Cache(比如Broker是新建的),我们的Bookie也会在Memtable中有自己的Cache,通过多个Cache减少读取过程中的IO。我们可以发现,在最好的情况下,读写的IO是完全隔离的,所以在Pulsar中支持百万主题很容易,但是在我们的Kafka和RocketMQ中就很难了。无限流存储一个topic其实就是一个账本流(Segment)。通过这样的设计,Pulsar不是一个纯粹的消息队列系统,而是可以替代流式系统,所以又被称为流式原生平台,可以替代Flink等系统。可以看到我们的EventStream(topic/partition)是由多个Segment存储组成的,每个Segment由Entry组成。这可以看作是我们发送的每一批消息通常都被视为一个Entry。段可以看作是我们写文件的一个基本维度。同一个segment的数据会写在同一个文件中,不同的segment会是不同的文件,segment之间的数据会保存在Metadata中。Kafka和RocketMQ消息的分层存储都会有一定的存储时间,因为磁盘会有空间限制。Pulsar中也提供了这个功能,但是如果你想永久存储你的消息,你可以使用分层存储。我们可以定期刷新一些较旧的数据到便宜的存储,比如s3,然后我们可以为我们的消息队列无限存储。Pulsar中的数据复制与Kafka和RocketMQ有很大不同。在其他消息队列中,其他副本通常是主动同步的,通常这个时间变得不可预测。Pulsar中采用类似Qurom的协议,给出一组可用的Bookie池,然后并发写入一部分,只要返回部分成功(通常大于1/2)即可。EnsembleSize(E):确定可用于给定账本的Bookie池的大小。WriteQuorumSize(Qw):指定Pulsar写入Entry的Bookie的数量。AckQuorumSize(Qa):指定必须被Ack写入的Bookie的数量。使用这种并发写入的方式会让数据复制更加高效,尤其是在数据副本很多的时候。Consumer接下来说说Pulsar中最后一个重要的组件Consumer。订阅模式订阅模式用于定义我们的消息如何分发给不同的消费者。不同的消息队列中间件有自己的订阅模式。一般我们常见的订阅模式有:集群模式:一条消息只能被一个集群内的消费者消费。广播模式:一条消息可以被集群中的所有消费者消费。Pulsar中提供了四种订阅模式,分别是:Exclusive:顾名思义,只有一个消费者可以独占。如果同一个集群中有第二个消费者注册,第二个就会失败。这适用于全球订阅。连续新闻。容灾:增强版独享,独享一个挂了会自动切换到另一个好的消费者,但独占只能有一个。共享模式:这种模式看起来有点像集群模式,一条消息只能被集群中的一个消费者消费,但是与RocketMQ不同的是,RocketMQ是基于Partition维度的,同一个Partition的数据会被发送到机器。Pulsar中的消费不走Partition维度,而是训练所有消费者轮流发送消息。这样做有什么好处?如果你有100台机器,但是你只有10个Partition,实际上你只有10个消费者可以运行,但是在Pulsar中,这100台机器都可以用来进行消费处理。Key共享:类似于上面提到的Partition维度发送,在RocketMQ中所有具有相同Key的顺序消息都会发送到一个Partition中。但是这里不会有Partition维度,只会根据Key的Hash分配给一个固定的Consumer,也解决了consumer的能力受限于Partition数量的问题。消息获取方式无论是在Kafka还是RocketMQ中,我们都是使用Client来定时训练我们的Broker来获取消息。这种模式称为长轮询模式。这种方式的缺点是网络开销比较大。我们来计算消费者的消费延迟。我们假设Broker和Consumer之间的网络延迟为R。那么我们的总时间就是:当某条消息A刚到达Broker时,此时long-polling只是打包数据返回,而Broker返回给Consumer的时间为R。Consumer再次发送Request请求,这又是R。这里又把我们的消息A返回给ConsumerR,如果只考虑网络延迟,可以看出我们消息的消费延迟大约是3R,所以我们必须想办法优化一下。有的同学可能第一时间想到,我们的消息会直接推送给我们的消费者。现在我们的延迟只会是一次R。这就是我们常见的push模式。但是简单的推送模式是有问题的。如果我们的生产速度远大于消费速度,那么推送的消息肯定会炸毁我们的内存。这是背压。那么我们如何解决背压呢?我们可以优化push方式,变成动态推送。我们结合Long-polling,在长轮询请求时,通知BrokerBuffer剩余空间,由Broker负责推送数据。这时Broker知道最多可以推送多少条数据,所以它可以控制推送行为,以免压垮Consumer。例如:当Consumer发起请求时,Buffer的剩余容量为100,Broker每次最多可以返回32条消息。然后Consumer的长轮询请求Broker会在执行3次Pushes(共96条消息)后返回一个Response给Consumer(Response包含4条消息)。如果采用长轮询模型,Consumer每发送一次请求,Broker就执行一次响应。本例需要4次长轮询交互(共4次Requests和4次Response,共8次网络操作;在DynamicPush/Pull中,1次Request,3次Pushes和1Response,共5次网络操作)。因此Pulsar采用这种消息获取方式进一步优化了Consumer层的消息到达时间。我觉得这个设计很巧妙,很多中间件的long-polling模式都可以参考这个思路进行改进。总结ApachePulsar的很多设计思想都不同于其他中间件,但无疑更接近未来。大胆预测未来其他一些消息中间件的发展也会向它靠拢。目前,Pulsar在中国的用户越来越多。腾讯云提供了TDMQ,即Pulsar的云版本。当然,还有华为、知乎、虎牙等知名企业也在逐步尝试。我相信Pulsar确实是一种趋势。最后也让我想起了大江大河大结局中的一句话:所有的改变都可能伴随着痛苦和弯路。任何危险的浅滩和礁石都可以被封锁。道在何处,纵有万人,我也必去。作者:CafeLatte编辑:陶佳龙来源:转载自公众号CafeLatte(ID:close_3092860495)
