当前位置: 首页 > 后端技术 > Java

RocketMQCompaction专题的设计与实现

时间:2023-04-02 00:50:53 Java

本文作者:刘涛,阿里云智能技术专家。01CompactionTopic简介一般来说,消息队列提供的数据过期机制有以下几种。比如有一种基于时间的过期机制——数据保存多久就会被清理,还有一种基于数据总量的过期机制——数据分区的数据量达到一个后一定的价值,它会被清理干净。CompactionTopic是一种基于key的数据过期机制,即同一个key的数据只保留最新的value。该特性的应用场景主要是维护状态信息,或者需要KV结构时,可以通过CompactionTopic直接将key-value信息保存到MQ中,从而去除对外部数据库的依赖。比如维护消费位置,可以以消费组加分区为key,以消费位置为offset,以消息的形式发送给MQ。压缩后,消费时可以获得最新的offset信息。此外,CompactionTopic中还可以存储源信息,如connect中的Binlog解析站点或其他源处理站点信息。同时CompactionTopic还支持存储RSQLDB和RStreams的checkpoint信息。02需要解决的问题在compaction过程中,需要解决以下问题:首先,在数据写入过程中,如何将数据从producer发送到broker,最后放到磁盘上,如何保证HA数据主备之间?二、整个compaction过程包括哪些步骤?数据量太大怎么优化?三、数据消费时如何索引消息?找不到消息指定的offset消息怎么办?四、如果机器出现故障,如何恢复旧数据?03方案设计与实现首先,如何写入数据。先写入CommitLog,主要是复用CommitLog本身的HA能力。然后通过reput线程,将CommitLog消息按照topic加上partition的维度拆分到不同的文件中,消息按partition排序,同时生成一个索引。这样最终的消息是按照Topic加上partition的粒度来调节的。在compaction过程中,为什么不对原来的commitLog做regularization,而是按partition做extraregularization呢?原因如下:所有的数据都会写入CommitLog,所以单个Topic的数据是不连续的。如果要遍历单个topic的所有数据,可能需要跳过读,会导致大量的冷读,对磁盘IO影响比较大。CommitLog数据有自动过期机制,会删除旧数据,所以数据不能直接写入CommitLog,而CompactionLog中的旧数据是key过期的,可能不会被删除。Compact是以分区为维度进行的。如果同时压缩多个分区,效率很低。因为很多分区的key同时在同一个结构中,所以同一个分区能压缩的数据比较少,压缩后需要重写。因此,最好在压缩之前通过报表服务重新组织消息。压缩过程如下:第一步是确定需要压缩的数据文件列表。一般大于两个文件,需要排除当前正在写入的文件。第二步,遍历上一步过滤出来的文件,得到key到offset的映射关系。第三步,根据映射关系,将要保留的数据改写到新的文件中。第四步,用新文件替换旧文件,删除旧文件。第二步构建OffsetMap的主要目的是知道哪些文件需要保留,哪些文件需要删除,以及文件的上下文,从而确定写入的布局。确定布局后,可以附加所需的文件。保留的数据被写入新文件。这里记录的不是key到value的信息,而是key到Offset的信息。因为value的数据体可能会更长,占用空间更大,而offset是固定长度的,通过offset信息也可以明确消息的先后顺序。另外,key的长度不固定,直接将原始key存放在map中也不合适。因此,我们使用MD5作为新密钥,如果MD5相同,则认为密钥相同。在做compaction的时候,会遍历所有的message,删除key相同且offset小于OffsetMap的values。最后通过原始数据和map结构得到压缩数据文件。上图显示了目录结构。写入时,上部是数据文件,下部是索引,标红的两个文件是要压缩的。压缩文件存放在子目录下,旧文件需要先标记为删除,同时将子目录文件和CQ移动到旧根目录下。注意,这些文件与CQ文件名是一一对应的,可以一起删除。随着数据量越来越大,构建的OffsetMap也会越来越大,无法容纳。所以不能使用fullbuild方式,不能一次性构建所有待压缩文件的OffsetMap。需要把fullbuild改成incrementalbuild,build逻辑也会有一点小改动。第一轮构建:如上图,先构建上面部分的OffsetMap,然后遍历文件。如果偏移量小于OffsetMap中对应key的偏移量,则删除,如果相等,则保留。下面部分消息的偏移量必须大于OffsetMap中的偏移量,所以也需要保留。第二轮建设:从上一轮结束的地方开始建设。如果上一轮的key在新一轮中不存在,则保留上一轮的值;如果存在,仍然会按照小于删除大于保留的原则进行构建。将一轮构建改为两轮构建后,OffsetMap的体积明显减小,构建的数据量也明显减少。原来的索引是CommitLogPosition、MessageSize和TagHush,但是现在我们复用了bcq结构。由于Compact后数据不连续,所以无法通过之前的方式直接找到数据的物理位置。由于queueOffset还是单调递增的,所以可以通过二分查找的方式找到索引。二分查找需要queueoffset信息,索引结构也会发生变化,但是bcq有queueoffse信息,所以bcq的结构可以复用。compact前后queueoffset不变。如果queueoffset不存在,则获取第一个大于queueoffset的消息,然后从头开始向客户端发送所有全量数据。当由于机器故障导致消息丢失时,需要重建备机。因为CommitLog只能恢复最新的数据,而CompactionLog需要的是旧数据。之前的HA模式,在compact过程中可能会删除数据文件,所以master和backup之间的同步无法基于复制文件的方式来完成。因此,我们实现了基于消息的复制。即模拟消费者请求从master拉取消息。拉取点一般从0开始,大于等于commitLog的最小offset时结束。pull完成后,再做一次forcecompaction,对CommitLog数据和恢复后的数据进行compaction,保证保留的数据是压缩后的数据。后续流程不变。04使用说明producer端使用现有的producer接口,由于需要partition压缩,所以需要将同一个key路由到同一个MessageQueue,相关算法需要自己实现。消费端使用已有的消费接口,消费完消息后,存储到本地的Map结构中使用。我们的大部分场景都是从头拉取数据,所以需要一开始就把消费点重置为0。拉取后,将消息key和value传入本地kv结构体,使用时直接从该结构体中取。