为什么kafka的性能下降这么快?我用RocketMQ就不会这样了我,作为一个资深的程序员,你从来没有听说过这两种技术。之前在使用这两个消息队列的时候遇到了一个很奇怪的问题,就是在Kafka中获取了很多topic,性能下降的很快。不知道大家有没有遇到过,但是同样的场景切换在消息队列rocketmq中,下降速度没有那么快。不熟悉这两个消息队列结构的朋友听到这里一定不是很清楚。今天小编就来分析一下其中的原因,为大家解开疑惑。rocketmqNameServer的结构:主要是管理元数据,包括Topic和路由信息的管理。底层由netty实现。是一个无状态节点,提供路由管理、路由注册和发现,类似于ZooKeeperBroker:消息中转站,负责消息的发送和接收,持久化消息Producer:消息的生产者,一般是业务系统为消费者生成消息toconsumeConsumer:消息的消费者,一般是业务系统异步消费消息,RocketMQ中的每条消息都有一个Topic,用来区分不同的消息。一个主题通常有多个消息订阅者。当生产者向某个主题发布消息时,订阅该主题的消费者可以收到生产者编写的新消息。Topic中有多个Queue,其实就是我们发送/读取消息通道的最小单位。我们发送消息的时候需要指定写入某个Queue,拉取消息的时候也需要指定某个Queue。队列,因此我们的顺序消息可以根据我们的队列维度保持有序。如果要实现全局有序,需要将Queuesize设置为1,这样Queue中的所有数据都会有序。我们同一批消费者会根据一些策略来选择队列,比如平均分布或者一致性哈希分布。需要注意的是,当消费者下线或上线时,这里需要进行再平衡,即Rebalance。RocketMQ的rebalancing机制是这样的:定时拉取broker和topic的最新信息,每20srebalance一次,随机选择当前Topic的主Broker之一。这里需要注意的是,每次rebalancing时,是否会选择所有的主要Broker,因为会有一个Broker,然后会有多个Broker。获取当前Broker和当前ConsumerGroup的所有机器ID。然后进行策略分配。由于rebalancing是定时进行的,可能会有一个Queue同时被两个Consumer消费,所以消息会重复投递。读写队列数量不一致。在RocketMQ中,Queue分为读和写两种。刚开始接触RocketMQ的时候,一直以为读写队列数不一致就没什么问题。比如当consumer机器很多的时候,我们配置了很多readqueue,但是在实际过程中,发现消费不到消息或者根本消费不到消息。当写队列数大于读队列数时,大于读队列ID的写队列的数据不能被消费,因为不会分配给消费者。当读队列数大于写队列数时,不会有消息投递到如此多的队列。rocketmq中的存储机制RocketMQ以其强大的存储能力和强大的消息索引能力,以及各种消息类型和消息的特点脱颖而出。因此,学习RocketMQ的存储原理对于我们这些怀揣梦想的程序员来说变得尤为重要,而要说这个存储原理,就不得不说说commitLog文件,RocketMQ的消息存储文件。consumer依靠Consumerqueue文件的巧妙设计实现高性能、无乱序的消费,以及RocketMQ的强大支持。消息索引的特性依赖于indexfile索引文件。本文从三个神秘的文件开始:commitLog、Consumerqueue、indexfile。把这三个文件都看懂了,RocketMQ的核心就被你掏空了。一张图是写commitLog文件时commitLog、Consumerqueue、indexfile的关系文件夹下,文件夹下会有一个commitLog文件,但不代表文件就叫这个。文件名根据消息的偏移量确定。该文件有自己的生成规则。每个commitlog文件的大小为1G。一般第一个CommitLog的起始偏移量为0,第二个CommitLog的起始偏移量为1073741824(1G=1073741824byte)。commitLog文件最大的特点就是消息的顺序写入和随机读写。所有主题消息都存储在commitLog文件中。顺序写入可以充分利用磁盘顺序来减少数据存储的IO争用性能。Kafka也是通过硬盘顺序保存的。大家常说硬盘的速度比内存慢。其实这句话也是有歧义的。硬盘顺序读写时,速度不比内存慢,甚至比内存还快。这种存储方式就像一个数组。如果我们知道数组的下标,我们可以直接通过下标计算位置,找到内存地址。众所周知,数组的读取是很快的,但是数组的缺点就是插入数据比较慢,因为如果在中间插入数据需要将后面的数据向后移动。对于数组来说,如果我们只是顺序相加的话,数组的速度也是非常快的,因为数组没有后续的数据移动,这个操作是比较耗时的。回到RocketMQ中的commitLog文件,也是如此。顺序写入文件时,不需要过多考虑写入位置。您可以找到该文件并稍后放置。在取数据的时候,也和数组一样,我们可以通过文件的大小来准确的定位到是哪个文件,进而准确的定位到文件所在的位置。RocketMQ中的consumerqueue文件分为多个topic。消息所属的主题属于消息类型。每个主题有多个队列,每个队列包含不同的消息。同一个消费者组下的消费者可以同时消费同一条消息。一个主题下不同队列的消息。不同消费者下的消费者可以同时消费同一主题下同一队列的消息。同一消费组下的消费者不能同时消费不同主题下的消息。每个topic下的queue队列会对应一个Consumerqueue文件,这个文件中存放的是对应commitLog文件中的index位置,而不是存放真正的消息。consumequeue存放在store文件中,里面的consumequeue文件是按照topic排列的,然后每个topic默认有4个queue,consumequeue文件存放在里面。ConsumeQueue中不需要存储消息的内容,而是存储消息在CommitLog中的偏移量。也就是说ConsumeQueue其实是CommitLog的一个索引文件。consumequeue是一个定长结构,每条记录的大小固定为20字节,单个consumequeue文件默认包含30万个条目,所以单个文件大小约为6M。显然,Consumer在消费一条消息时,需要读取两次:先读取ConsumeQueue得到偏移量,然后根据偏移量找到CommitLog对应的消息内容。IndexFileRocketMQ还支持通过MessageID或MessageKey查询消息。按ID查询时,由于ID是由broker+offset(其中msgId指的是服务器)生成的,所以很容易找到对应的commitLog文件读取消息。对于使用MessageKey查询消息,MessageStore会建立索引来提高读取速度。indexfile文件存放在store目录下的index文件中,里面存放了消息的hashcode和索引内容。该文件由一个文件头组成:40字节长。500w个哈希槽,每个4个字节。2000万个索引条目,每个条目20个字节。所以这里我们可以估算出每个indexfile的大小为:40+500w4+2000w20字节,约400M。每次放入一条新消息的索引时,都会先取MessageKey的HashCode,然后用Hashcode对总槽数取模,确定消息key的位置。默认插槽总数为500W。只要取了hash,就不可避免的会面临hash冲突的问题。Indexfile也使用链表结构来解决散列冲突。这个和HashMap是一样的,只是没有红黑树的转换。个人猜测是冲突次数不够。到一个很高的层次,就不需要设计这方面了,甚至强行增加indexfile的文件结构也变得困难。另外,最新的索引指针放在索引文件中的槽中,因为最近的新闻最有可能在一般查询中被最先查询到。每个槽中放置的指针值是索引在索引文件中的偏移量,即后续索引的位置,索引存储消息在commitlog文件中的偏移量。每个索引的大小为20个字节。因此,根据当前索引在这个文件中的哪个偏移量,就很容易定位到索引的位置。根据之前固定的大小,可以快速计算出真正的坐标,依此类推,形成一个链表结构。kafkaBroker的结构:消息中间件处理节点(server),一个节点就是一个broker,一个Kafka集群由一个或多个broker组成。Topic:Kafka对消息进行分类,每条发送到集群的消息都必须指定一个topic。分区:一个物理概念。每个主题包含一个或多个分区。一个分区对应一个文件夹。分区的数据和索引文件存放在该文件夹中。每个分区都是内部排序的。生产者:生产者负责向代理发布消息。Consumer:消费者,从broker读取消息。ConsumerGroup:每个消费者都属于一个特定的消费者组。可以为每个消费者指定组名。如果不指定,则属于默认组。一条消息可以发送给不同的消费者组,但是一个消费者组中只能有一个消费者可以消费这条消息。在kafka的存储机制中,我们的producer会决定发送到哪个Partition,如果没有Key值,就会轮询发送。如果有Key值,则对Key值进行Hash,然后取分区数的余数,保证相同的Key值会路由到同一个分区。(所有系统的分区具有相同数量的路径)。众所周知,topic在物理层面是按分区分组的。一个主题可以分为几个分区。主题和分区是如何存储的?其实partitions也可以细分为logSegments。一个partition在物理上是由多个logSegments组成的,那么这些segments到底是什么?LogSegment文件由“.index”文件和“.log”文件两部分组成,分别表示为Segment索引文件和数据文件。这两个文件的命令规则是:partitionglobal的第一个segment从0开始,后面的每个segment文件的名字都是前一个segment文件的最后一条消息的offset值。没有数字补0,如下:第一段00000000000000000000.index00000000000000000000.log第二段,文件名由第一段最后一条消息的偏移量组成0000000000000170410.index00000000000000170410的最后一条消息的偏移量。asegment由000000000000239430.index00000000000000239430.log组成“.index”索引文件存放大量元数据,“.log”数据文件存放大量消息,索引文件中的元数据指向对应的消息中数据文件物理偏移地址。kafka和rocketmq的比较RocketMQ和Kafka的存储核心设计有很大的不同,所以在写性能上也有很大的差异。这是2016年阿里中间件团队在RocketMQ和Kafka上做的不同主题下的性能测试。:从图中可以看出,当Kafka将主题数从64个增加到256个时,吞吐量下降了98.37%。当RocketMQ的topic数量从64个增加到256个时,吞吐量只下降了16%。为什么是这样?Kafka一个topic下的所有消息都以partition的形式分布存储在多个节点上。同时在kafka机器上,每个Partition其实对应一个日志目录,目录下有多个日志段。所以,如果topic很多,kafka是顺序写文件的,但实际上文件太多了,会造成磁盘IO的激烈竞争。那么为什么RocketMQ在多topic的情况下还能保持不错的吞吐量呢?我们先看看RocketMQ中比较关键的文件:RocketMQ中的消息体数据并没有像Kafka一样写入多个文件,而是写入一个文件,这样我们的写IO竞争很小,可以在很多Topic中仍然保持高吞吐量。可能有人会说,这里的ConsumeQueue是连续写的,ConsumeQueue是在Queue维度创建文件的,所以文件数量还是很大的。这里ConsumeQueue写入的数据量很小,每条消息只有20Bytes,30W条数据也就6M左右,所以其实对我们的影响比对Kafka的Topic的影响要小很多。对了,一个topic分为10000个分区,10000个topic。每个主题都是一个单独的分区,这对于Kafka的负载来说是一样的。
