前言为了实时了解线上业务数据,京东算法智能应用部打造了基于ClickHouse的实时计算分析引擎,为业务团队提供实时数据支持,挖掘潜力通过预警功能解决问题。本文结合引擎开发过程中资源位数据聚合计算的业务场景,总结出实时数据聚合计算实现秒级查询的技术方案。ClickHouse是整个引擎的基础,下面先介绍一下ClickHouse的相关特性和适合的业务场景,以及最基础的表引擎MergeTree。接下来详细介绍技术方案,包括Kafka数据消费到数据写入,建表结合ClickHouse特性,完善的数据监控,秒级响应从亿级数据偶发查询超时到百亿级数据的优化过程。ClickHouseClickHouse是Yandex内部业务驱动产生的列式存储数据库。为了更好的帮助自己和用户分析网络流量,开发了ClickHouse进行在线流量分析,一步一步最终形成了现在的ClickHouse。当存储数据达到20万亿行时,90%的查询可以在1秒内返回结果。ClickHouse可以实现实时聚合。所有查询都是动态和实时的。从用户发起查询开始,整个过程需要在一秒内完成并返回结果。ClickHouse的实时聚合能力与我们面对的业务场景非常吻合。ClickHouse支持完整的DBMS。支持数据库、表、视图的动态创建、修改或删除,以及数据的动态查询、插入、修改或删除。ClickHouse采用列式存储,数据按列组织,属于同一列的数据会存储在一起,作为后续秒级查询的基础。列式存储可以减少数据扫描范围,数据按列组织,数据库可以直接获取查询字段的数据。另一方面,逐行扫描扫描获取每行数据的所有字段,然后从每行数据中返回需要的字段。虽然只需要部分字段,但扫描所有字段,列式存储避免了冗余数据扫描。另外,列式存储压缩率高,数据在网络中传输速度更快,对网络带宽和磁盘IO的压力更小。除了完备的DBMS和列式存储,还支持在线实时查询,拥有完备的SQL支持和功能,拥有多种表引擎,满足各种业务场景。正是因为ClickHouse的这些特点,使得它可以在适合的场景下实现动态实时的秒级查询。合适的场景读多于写。数据一次写入多次查询,从多个角度挖掘数据,发现数据的价值。对于大而宽的表,读取大量行并聚合少量列。选择少量的维度列和索引列,对大宽表中的数据进行聚合计算,得到少量的结果集。数据是批量写入的,不需要经常更新或删除。数据写入后,相关业务不需要频繁更新或删除数据,主要用于查询和分析数据的价值。ClickHouse适用于商业智能领域,广泛应用于广告流量、App流量、物联网等诸多领域。借助ClickHouse可以实时统计线上业务数据,比如资源位的点击情况,可以对每个资源位进行bi告警。MergeTreeMergeTree系列引擎是最基础的表引擎,提供了主键索引、数据分区等基础能力。理解这部分是后续开发优化的基础和方向。Partition指定表数据分区方式,支持多列,但单列分区查询效果最佳。写入数据时,属于同一个分区的数据最终会合并到同一个分区目录中,不同分区的数据永远不会合并在一起。结合业务场景设置合理的分区,可以减少查询时对数据文件的扫描范围。排序在数据段内,数据的排序方式。使用多个字段排序ORDERBY(T1,T2)时,先按T1排序,如果相同值相同再按T2排序。MergeTree存储结构一个数据表完整的物理结构是数据表、分区以及每个分区下具体的数据文件。分区下的具体数据文件包括一级索引、各列的压缩文件、各列字段的标签文件。了解它们的存储和查询原理,为以后建表和聚合计算的优化提供方向。一级索引文件存储稀疏索引。通过ORDERBY或PRIMARYKEY语句,可以使用少量的索引来记录大量数据的区间位置信息。内容生成规则与排序字段相关,索引数据常驻内存,检索速度快。借助稀疏索引,可以排除主键范围之外的数据文件,从而有效缩小数据扫描范围,加快查询速度;每列压缩数据文件,存储每一列的数据,每一列字段都有一个独立的数据文件;每列字段标记文件,每列都有对应的标签文件,保存列压缩文件中数据的偏移信息,与稀疏索引对齐,与压缩文件对应,建立稀疏索引之间的映射关系和数据文件。它不能常驻内存,采用LRU缓存策略来加速其访问。读取数据时,只有通过标记数据的位置信息才能找到需要的数据,分为读取压缩数据块和读取数据块两个步骤。掌握数据存储和查询的过程,对后续的建表和查询有理论支撑。1)数据写入每一批数据写入都会生成一个新的分区目录,然后异步合并同一个分区的目录。根据索引粒度分别生成一级索引文件、每个字段的标签和压缩数据文件。写入过程如下图所示:【数据写入】2)查询过程查询过程通过指定WHERE条件不断缩小数据范围。借助分区可以找到数据所在的数据块,一级索引可以找到具体的行号区间信息,从标签文件中可以得到数据压缩文件中的压缩文件信息。查询流程如下图所示:【数据查询】如果查询语句与任务索引不匹配,则扫描所有分区目录。这个操作会给整个集群带来很大的压力。参考官方文档中的例子来说明查询过程。以(CounterID,Date)为主键,排序后的索引图标如下:[IndexSchema]指定查询如下:CounterIDin('a','h'),服务器会读取标签[0,3)和[6,8)区间数据中的数字。CounterIDIN('a','h')ANDDate=3,服务器会读取标签号在[1,3)和[7,8)范围内的数据。Date=3,服务器会读取标签号在[1,10]区间的数据。ClickHouse支持集群部署。查询分布式表时,集群会合并各个节点的数据,获取所有节点的数据后返回结果。MergeTree系列表引擎支持副本。例如,ReplicatedMergeTree表引擎构建表来存储详细数据。接下来介绍的两个表引擎都是继承自MergeTree,但各有特色。ReplacingMergeTree实现了数据去重。建表时,设置ORDERBY排序字段作为判断重复数据的唯一键。合并分区时,会触发重复数据的删除,一定程度上可以解决数据重复的问题。AggregatingMergeTree在合并分区时根据定义的条件聚合数据,预先计算需要聚合的数据,在聚合查询时直接使用结果数据,以空间换时间来提高查询性能。引擎需要为所有列使用AggregateFunction类型。了解了ClickHouse的内容后,接下来介绍完整的技术方案。技术方案和查询优化资源的数据来源包括Kafka的实时数据和存储在hdfs中的离线数据。实时数据通过Flink实时任务写入ClickHouse,离线数据通过创建MapReduce定时任务写入ClickHouse。架构图实时数据存储实时数据从实时数据写入CK流程:各业务线产生的实时数据写入kafka通道,根据数据量分配不同分区号。创建的flink任务会消费各个业务的kafka数据,各个业务的处理过程会有所不同。一般包括过滤算子、数据处理算子和写算子。过滤运算符过滤掉不需要的数据。这一步非常重要。设置严格的数据评估标准,防止脏数据和不符合规则的数据被写入集群。另外,对于脏数据的过滤也要做好文档记录,在数据完整性测试的过程中会用到。数据处理算子主要负责从实时数据流中解析出业务所需的数据。这个过程还需要设置严格的校验逻辑,保证数据是干净的;如果涉及更新数据处理逻辑,则必须保证及时更新处理逻辑。写入算子采用批量写入方式。根据集群情况,设置合理的batch,达到均衡的实时查询和写入性能。写入ck的进程可以通过域名连接分布式表,也可以通过nginx进程掌握集群机器IP列表。每个nginx进程自己轮询,平衡对集群中每台机器的写入,但是要保证写入ClickHouse的QPS不能太小,防止写入不平衡。离线数据入库为离线数据创建定时任务,对hive表中的数据进行处理,通过创建MapReduce定时任务将处理后的数据写入ClickHouse。离线数据入库流程还包括过滤、数据处理、写入ClickHouse。批量写入在上一章merge中介绍过。每次写入数据都会生成一个临时分区目录,然后异步合并同一个分区的目录。写入过程会消耗集群的资源,所以必须使用批量写入的方式,每批写入的item数量视集群和数据情况而定(10000、50000、100000批可以作为参考).使用JDBC实现批量写入程序如下:JDBC驱动,可以使用官方驱动:
