当前位置: 首页 > 科技观察

饿了么轻量级分布式时序数据库的设计与探索!

时间:2023-03-19 22:35:58 科技观察

笔者介绍黄杰,2015年加入饿了么,现任框架工具部高级开发经理,主要负责饿了么的监控系统及监控系统周边工具。1.背景饿了么对时序数据库的需求主要来自于各种监控系统,主要用于存储监控指标。最初使用的是Graphite,后来逐渐对指标有了多维度的需求,主要体现在一个指标中加入多个标签组成一个Series,然后对标签进行过滤和分组计算。这时,石墨基本难以满足需要。业界广泛使用的TSDB有以下几类:InfluxDB:InfluxDB被很多公司使用,包括饿了么的一些监控系统。它的优点是支持多维多字段,存储方面也根据TSDB的特点进行了优化,但开源部分不支持。很多公司自??己做聚类,但是大部分都是根据指??标名称来做的,所以单指会有热点问题。现在饿了么也在做同样的事情,但是热点问题很严重。大指标已经用上了最新的服务器,查询性能还是不理想。如果按照SeriesSharding做,成本还是有点高;Graphite:根据索引编写和查询,计算功能较多,但难以支持多维,包括机房或多集群查询。原来饿了么将业务层的监控指标存储在Graphite中,效果很好。但是,有些需求在活动之后基本很难满足。由于其存储结构的特点,占用了大量的IO。根据目前网上的数据写放大差不多是几十倍;OpenTSDB:基于HBase,优点是存储层不需要自己考虑,可以很好的做查询聚合,还有HBase的热点问题。之前公司也使用过基于HBase的TSDB来解决OpenTSDB的一些问题,比如热点、部分查询聚合和委托给HBase等,目的是优化其查询性能,但仍然严重依赖HBase/硬盘文件系统;HiTSDB:阿里提供的TSDB,存储也是使用HBase,在数据结构和Index上做了很多优化。没有具体的研究。感兴趣的同学可以在阿里云上试用;Druid:Druid其实是一个OLAP系统,但是也可以用来存储时序数据,但是看到它的架构图就放弃了;ElasticSearch(ES):也有公司没有实际测试直接用ES存储,但总感觉ES不是真正的TSDB;atlas:Netflix出品,全内存TSDB,最近几个小时的数据都在内存中,历史数据需要外存,所以没有详细研究;beringei:Facebook出品,也是全内存TSDB。最近的数据也在内存里,应该还在潜伏期。最后,我们决定自己实现一个分布式时序数据库。具体需要解决以下几个问题:轻量级,目前只依赖Zookeeper;用于监控系统,查询性能更好;由于饿了么目前是多机房,监控系统也是多机房,所以必须支持单机房写入,多机房聚合查询等;必须要有自动Rollup的功能,比如用户可以写成10s的精度,系统自动roll到分钟、小时、天的级别,支持大时间范围内的查询,比如报表等;支持类SQL查询方式;支持多副本,提高整个系统的可靠性,只要有一个副本存活,就可以正常提供服务,指定副本数量。2、整体设计采用计算存储分离架构,分为计算层LinProxy和存储层LinStorage。注:LinProxy主要是做一些SQL解析和一些中间组合的重新聚合计算。如果不是跨集群,LinProxy就不用了。对单个集群的每个节点,嵌入一个LinProxy,提供查询服务;LinDBClient主要用于数据写入,也有一些查询API;LinStorage的每个节点组成一个集群,节点之间进行复制,有一个副本的leader节点提供读写服务。本次设计主要参考了Kafka的设计,可以将LinDB理解为类Kafka数据写复制+底层时序存储层;LinMaster主要负责database、shard、replica的分配,因此LinStorage存储的调度和MetaData(目前存储在Zookeeper中)的管理;因为LinStorageNode是Equivalent的,所以我们基于Zookeeper选择集群中的一个节点成为Master。每个Node以心跳的形式向Master报告自己的状态。Master根据这些状态进行调度。如果Master挂了,它会自动选择另一个Master。出来了,这个过程对整个服务来说基本是无损的,所以用户基本是没有察觉的。1.写入整个写入过程分为以下两部分:WAL复制,这部分是参考Kafka设计的,只要用户的写入成功写入WAL就认为成功(因为主要用于监控系统,所以数据的一致性不做太多保证),这样才能提供系统的写吞吐量;localwrite,这个过程就是把WAL数据解析写入自己的存储结构中,只有写入本地存储的数据才能查出来。整个过程不像某些系统那样完成每一个写入过程。我们把这个过程分成两步,让它异步。WAL复制目前LinDB的副本复制协议采用的是多通道复制协议,主要是基于WAL在多个节点之间的复制。每个节点上WAL的写入都是通过独立的写入操作完成的,所以client写入的WAL对应于leader。如果成功,则认为写入操作成功。Leader所在的节点负责将对应的WAL复制给对应的Follower。同样,如果WAL写入成功,则认为副本成功,如下图:多通道复制协议成功写入Leader副本,提高了写入速率,但也带来了以下问题:数据的问题一致性;数据丢失的问题。上图中Server1为Leader,使用3个Replications复制1-WAL为例:当前Server1为分片的Leader接受Client的写入,Server2和Server3均为Follower接受来自Server1的复制请求。此时,1-WAL通道作为当前的数据写入通道,Server2和Server3可能会滞后于Server1。注意:整个过程中需要注意以下索引:客户端写入时AppendIndex,表示当前客户端写入到哪里;每个follower对应,都会有一个ReplicaIndex,表示对应的follower在哪里消费Leader,同步到哪里;FollowerIndex的Ack,表示Follower已经成功复制到本地WAL;Follower的复制请求其实相当于一个特殊Client的写入,所以也有相应的AppendIndex。只有已确认的索引才被标记为已处理。对于Leader,小于最小AckIndex的WAL数据可以被删除。在这个过程中,如果Server2或者Server3其中一个出现问题,此时对应的ConsumeIndex是不会动的,只有对应的服务恢复后才会继续处理。整个过程中可能会出现以下情况:LeaderReplicaIndex>FollowerAppendIndex,此时需要根据FollowerAppendIndex重新设置LeaderReplicaIndex。可能有两种情况,具体情况在ReplicationSequence中有描述;LeaderReplicaIndex<>如果此时Server1挂了,从Server2和Server3中选出新的Leader,此时选择Server2作为Leader:Server2会开启2-WAL复制通道,复制到Server1和Server3。由于Server1目前宕机,所以暂时只复制到Server3。此时数据写入通道为2-WAL;Server1开始恢复后,Server2会打开2-WAL复制通道给Server1,Server1会将剩余的1-WAL发送给Server2,Server3复制的数据复制给他们。对于异常情况,WAL中的数据不可能正常。因为ACK后的删除导致WAL占用磁盘过多,所以需要对WAL有一个SIZE和TTL的清理过程。一旦WAL因为SIZE和TTL被清理,几个Index就会被混淆,具体病症如上所述。多通道复制协议带来的问题:每个通道都有对应的Index序列,保存每个通道的LastIndex。对于单通道拷贝,只需要保存一个LastIndex。价格其实很不错。本地写后台实现了shard级别的写隔离,即每个shard都会有一个独立的线程负责写,不会因为某个数据库或某个数据库的写量激增而导致对其他数据库的写shard,但由于单机托管的分片过多,线程数可能会过大。如果遇到这种情况,应该通过扩容机器来解决,或者在新建数据库时合理分配分片数量。由于是单线程写操作,很多时候不需要考虑多线程写导致的锁竞争问题。数据存储结构说明,以单节点单数据库的数据结构为例:一个数据库在单节点上会有多个分片,所有分片共享一个索引数据;所有的数据都是按照数据库的间隔和时间片来计算的,存储具体的数据,包括数据文件和索引文件。这样设计主要是为了方便处理TTL。如果数据过期,直接删除相应目录即可。每个shard下面都会有一个segment,segment按照interval存储对应时间片的数据。那么为什么每个segment下会间隔存储很多数据族呢?这主要是因为LinDB主要解决海量监控数据的存储问题。一般监控数据基本都是第一次写入,历史数据不会写入。但是整个LinDB的数据存储方式和LSM方式类似。因此,为了减少数据文件之间的间隙,它们之间的merge操作导致写放大,最终被测量,然后被分割成segment时间片。下面以间隔10s为例:段按天存储;每个segment按小时划分为数据族,每小时一个族,每个族中的文件按列存储具体的数据。写流程描述:系统会为每个分片启动一个写线程,该线程负责本分片的所有写操作。首先将measurement、tags、fields对应的数据写入数据库的索引文件,生成对应的MeasurementID、TimeSeriesID、FieldID,主要完成string->int的转换。这样做的好处是所有的数据存储都是按数据类型存储的,可以减少整体的存储大小。因为对于每个数据点来说,Measurement、Tags、Field等元数据都被占用了,比如cpu{host=1.1.1.1}load=11514214168614。其实转换成ID后,cpu=>1(measurementid),主机=1.1。1.1=>1(timeseriesid),load=>1(fieldid),所以最终数据存储为111514214168614=>1,这个考虑了OpenTSDB的设计。如果索引写入失败,则认为写入失败。有两种类型的失败。一是数据写入格式有问题,这种类型的失败直接标记为失败;另一种是由于内部问题,此时写入失败,需要重试。利用索引得到的ID,结合写入时间和数据库Interval计算,得到需要写入哪个segment下的family,写入family的过程,直接写入内存,满足高吞吐需求,并且内存数据达到内存限制后,触发Flush操作。整个写入过程先写入内存,然后Flusher线程将内存中的数据dump到对应的文件中,这样一批数据是顺序写入的,根据FieldTyperollup最新的数据,从而进一步减少磁盘IO操作。2、查询引擎LinDB查询需要解决以下问题:解决多个机房之间的查询;高效的流式查询计算。说明:引入LinProxy,支持多房间或多集群查询。LinProxy主要负责面向用户的查询请求;SQLPlan负责解析具体的SQL,生成最终的执行计划和需要计算的中间结果的函数;通过Zookeeper中的元数据将请求路由到特定LinDB集群中的相应服务;每个LinConnect负责与一个LinDB集群通信,每个LinConnect内部保存一份对应集群的Metadata,Metadata信息在每个Metadata发生变化时,服务器会推送给LinConnect,这样LinConnect基本上可以近乎实时地更新元数据;AggregatorStream主要负责对各个LinConnect的中间结果进行最终的合并计算操作;整个LinProxy的处理过程是异步的,这样在IO等待的时候使用线程来做计算。每个节点接收来自LinConnect的请求,计算内部查询并将其作为中间结果返回给LinConnect。详细过程将在后面介绍。节点查询说明:如图所示,Client的一个查询请求会产生很多小的查询任务。每个任务都有单一的职责,只做自己的任务,然后将结果交给下一个任务。因此,所有的查询计算任务都需要异常非阻塞的处理,IO/CPU任务分离;整个服务端查询使用Actor模式,简化了整个Pipeline的处理;任何任务完成,如果没有结果产生,则不会产生下游任务,所有下游任务根据上游任务是否有结果来判断;最后通过ReduceAggregate将底层结果聚合成最终结果。3.存储结构倒排索引倒排索引分为两部分。目前,与索引相关的数据仍然存储在RocksDB中。根据TimeSeries的Measurement+Tags,生成对应的唯一ID(类似于Luence中的docID);根据Tags倒排索引,指向一个ID列表。TSID列表以BitMap的形式存储,以便在查询时通过BitMap操作过滤出需要的数据。BitMap使用RoaringBitMap;每种类型的数据都存储在一个独立的RocksDBFamily中。内存结构为了提高写入性能,将当前周期的数据写入内存,当内存达到一定的限度或时间后,将内存中的数据转储到文件中。内存存储分为当前可写和不可写。目前writable用于接收正常的数据写入,non-writable用于dump到文件。如果转储成功,不可写部分将被清除。如果可写的MemoryTable也达到了内存容量的限制,但是不可写的部分还没有被转储,那么写入就会被阻塞,直到有可用的内存可以写入数据。更多的内存会导致OOM。MemoryTable内部使用一个Map来存储MeasurementID→MeasurementStore的关系,即每个Measurement都存储在一个独立的Store中。在MeasurementStore中存储Measurement下每个TSID对应的数据。每个TSID对应的数据存储在一个MemoryBlock中。每个MemoryBlock按照TSID的顺序存储在ArrayList中,TSID存储在一个BitMap中。通过TSID在Bitmap中的位置来定位MemoryBlock在ArrayList中的具体位置。这里解释一下为什么不直接用Map来存储,因为整个系统都是Java实现的。Java中的Map结构不适合存储小对象的数据,在内存中存在多次存储。由于每个TSID对应一条时间线,因此每条时间线中可能有多个数据点。比如count只有一个计数值,timer有count、sum、min、max等多个值。每种数据类型都存储在Chunk中。Chunk存放在内存的两部分,堆内和堆外。最近一段时间的数据放在堆中,历史数据压缩后放在堆外。最近的数据尽量放在内存中,因为LinDB的目的主要是存放一些监控数据,而监控数据主要关注的是最近一段时间的数据。文件存储结构文件存储类似于内存存储。同一测量的数据以块的形式存储在一起。查询时,通过measurementID定位测量数据存放在哪个block中。在MeasurementBlock之后存放一个OffsetBlock,即每个MeasurementBlock存放的Offset,每个Offset存储4个字节。OffsetBlock存储一个MeasurementIndexBlock,依次存储每个MeasurementID,以Bitmap的形式存储。FooterBlock存放在文件末尾,主要存放Version(2bytes)+MeasurementIndexOffset(4bytes)+MeasurementIndexLength(4bytes)。data数据块都是数值,所以使用xor压缩,参考facebook的大猩猩论文;MeasurementBlock:每个MeasurementBlock的存储方式与Measurement类似,只是Measurement中的MeasurementID换成了TSID。TSEntry存储TSID每一列对应的数据,一列数据对应存储一段时间的数据点。查询逻辑:第一次加载DataFile时,会将MeasurementIndex放入内存,查询并输入MeasurementID传递MeasurementIndex中的位置,再传递这个位置N来查询具体的MeasurementBlock在OffsetBlockOffset中,由于每个Offset是4个字节,所以偏移位置=(N-1)*4,然后读取4个字节得到真正的Offset。同理,可以通过TSID找到具体的TSEntry,然后根据条件过滤具体的列数据,最终得到需要读取的数据。三、发??展历程LinDB自2年前正式服务于公司监控系统以来,从1.0发展到2.0,稳定运行了2年多。除了一个RocksDB问题外,几乎没有任何问题。到现在,3.0的性能有了很大的提升,我们基本上是站在业界一些成熟方案的基础上,逐步演进的。也有人问为什么LinDB这么快。其实我们参考了很多TSDB的做法,然后选择最好的设计,然后根据时序的特点做一些优化。时序一般是乱写,但也是乱写的一种。我们先把内存中的随机写改成顺序写,最后顺序写文件。所有数据都是有序的,所以query顺序读取也很关键;将写入的measurement/tags/fields转换成Int,生成倒排索引,最后生成TSID(类似Luence的docID),这样大大减少了最终的数据,毕竟指标等字符串占据了绝对多数。这与OpenTSDB非常相似。虽然InfluxDB已经有一段时间将数据存储在块中,但它仍然将这些数据放在块的头部。这些都是成本,尤其是在compact中,不像其他TSDB直接存储时间戳,一般时间戳在毫秒级占用8个节点。虽然基于时间顺序的优势,使用delta-encoded压缩很好,但是我们要实现***,我们用一位来表示时间。具体方法是按照上面的描述存储时间的高位和Interval,把时间的高位放到目录中,然后结合高位计算一个delta,将delta以1bit格式存储,到表示是否有数据,因为大部分监控数据都是连续的数据,所以这样做是合理的,所以时间数据的存储也大大减少了空间;我们发现,对于一个指标Data的多个Field,每个Field的数据相邻点基本都非常相似。LinDB2.0直接用RocksDB存储,将多个Field存储在一起,然后对相邻的点进行压缩,这样压缩率不会很高。高,而且每次fetch查询都要把所有的数据都读出来,这也是我们考虑在LinDB3.0实现列式存储的原因。我们将同一列存储在一起以提高压缩率,查询时只读取需要的数据。我们没有使用gzip、snappy或zlib进行整个压缩,因为它们不适合数字类型。我们直接参考了facebook的gorillapaper的xor方法,已经被很多TSDB采用;基于以上基本顺序读取不再是问题,基于TSID查询也不成问题,因为整个设计都是基于TSID→数据,所以需要解决一个基于一组数据的随机读取基于反转的TSID。如上,我们把TSID放在Bitmap中,然后通过Bitmap计算Offset,直接找到数据。通过存储的优化,可以准确搜索TSID查询,而不是二分查找。还有一点就是LinDB在新建数据库的时候指定了Interval之后,系统可以自己rollup,不像InfluxDB要写很多ContinueQuery,这些在LinDB都是自动化的;查询计算并行流处理。所以用一句话来概括——一个高效的索引加上一堆值,然后是如何玩转这些值。自我监控LinDB也自带一些监控功能:OverviewDashboard的未来展望,丰富的查询功能;优化内存使用;改进自我监控;如果可能,计划开源。对比测试下面是与InfluxDB和LinDB2.0的一些查询性能对比。由于InfluxDB的集群需要商业版,所以在没有Cache的单机默认配置下进行测试。服务器配置阿里云机器:8核16G内存大维度标签:host(40000),disk(4),partition(20),模拟服务器磁盘监控,Series总数320W,每个Series写一个数据点:小维度1天内聚合测试标签:host(400),disk(2),partition(10),模拟服务器磁盘监控,Series总数8K,每个Series写一天数据,每个维度每2s写1个点,一天每个维度共43200个点,所有维度共43200*8000个点,共345600000或3亿多条数据:小维度7天内聚合测试标签:host(400),disk(2),partition(10),模拟服务器磁盘监控,Series总数为8K,每个Series写入7天的数据,每个维度每5s写入1个点,每个维度一共17280一天点数,一共1728080007点全天全维度,即967680000,多t韩九亿点。我想说明一下这个测试,感谢LinDB的自动Rollup,如果InfluxDB开启了ContinueQuery,相信应该没问题。