1背景得物供应链业务复杂。我们既有中间有大量仓库操作的JIT现货模式,也有代销到仓库这个品牌业务,有一个非常复杂的反向环节。如此复杂的业务背后,需要我们密切关注人、货、场、车的效率和成本,以及每一笔订单的及时履约。为此,我们需要各种粒度和维度的数据来支持我们的精细化管理。1.1业务初期在业务初期,我们后台管理系统中的一些报表查询速度慢。查询代码可见,如下图所示:这种现象一般表现为:大表JOIN,rdbms不擅长数据聚合,查询响应慢,调优困难;多表关联、索引优化、子查询优化增加了复杂度。索引读取数据库磁盘空间膨胀过快;数据量大,多维分析困难,跨域抓取困难,自助拉取实时数据困难。一方面是因为在系统设计之初,我们主要关注业务流程的功能设计,交易业务流程数据建模,以及未来核心指标如何实现,尤其是实现业务快速增长情况下的关键实时指标。支持。mysql在这方面越来越捉襟见肘了。另一方面,原因是mysql等oltp数据库不能满足实时数据分析的需要。我们需要探索一套实时数据架构,将我们在合同履行、仓储、配送等各个领域的数据拉到一起,并有效打通。因此,我们开始了我们对实时数据架构的探索,下图展示了我们的一些思考。附:数据视角的架构设计也是系统架构设计的重要组成部分。2架构演进2.1原始阶段2.1.1通过Adb(AnalyticDBforMySQL)实时join通过阿里云DTS同步,直接将业务数据库的单表实时同步到adb,通过adb强大的join能力和全兼容使用mysql语法,可以执行任意sql,对于单表大数据量场景或者单表和一些简单的维表join场景,性能还是不错的,但是在复杂的业务中,复杂的sqlrt很难满足要求,即使rt满足要求,单个sql消耗的内存,cpu也不尽如人意,能支持的并发量很有限。2.1.2通过Otter完成大宽表的构建,基于Canal开源产品,获取并下发数据库的增量日志数据,下游消费增量数据直接生成大宽表,但是宽table仍然写入mysql数据库实现单表查询,单表查询速度明显提升,没有olap数据库常见的做法,通过宽表减少join带来的性能消耗。但是存在以下问题:otter虽然封装的很好,可以通过数据路由做一些简单的数据拼接,但是在调试和线上的复杂度上还是有很大的难度;逻辑、cdc和实时ETL的工作是同时做的,耦合度高。2.2实时架构1.02.2.1flink+kafka+ClickHouse经过上述研究尝试,未能解决根本问题。我们开始提出建立一个标准的实时数据仓库的想法。20年olap的选择不多。我们把target放在clickhouse上。为了保证sequentialappend每次写入都会生成part文件,在满足一定条件的情况下会在后台定时merge。非常弱的updatedelete不能保证原子性和实时性。*Clickhouse只适用于数据量大、业务模型简单、更新场景少的场景。存储和计算不分离,复杂查询影响clickhouse编写。由于clickhouse的这些特点,尤其是不支持upsert的时候,我们通常需要提前在flink中聚合大表和宽表的数据,而供应链数据的生命周期长,操作过程也长.例如:商品的生命周期较长,短的为一周,长的为一个月以上;仓库异常环节很多,从卖家发货到收货、分拣、质检、拍照、鉴定、防伪、审核、包装、发货、买家签收等十几个甚至更多的环节,一张以商品ID为主键的宽表需要连接几十张业务表;供应链系统早期的设计并没有为每个表(入库单、作业单、履约单)这样的关键字段设置冗余的唯一编号,无法直接简单地进行数据的join。在这样的架构下,我们的flink在成本、稳定性维护、调优方面都是非常困难的。附:clickhouse不支持标准的upsert模式。可以使用AggregatingMergeTree引擎字段类型,使用SimpleAggregateFunction(anyLast,Nullable(UInt64))合并规则获取最后一条非空数据,实现类似upsert的功能,但影响读取时合并的性能。2.3实时架构2.02.3.1flink+kafka+hologres因此,我们迫切希望有一个支持upsert能力的olap数据库。同时可以应对供应链编写场景和复杂查询场景。我们希望olap数据至少可以做到以下几点:具备upsert能力,能够在flink中有效拆分大任务;存储和计算分离,复杂的业务计算,不影响业务写入,同时可以平滑扩容和缩容;具有一定的join能力,带来一定的灵活性具有完善的分区机制,热点数据查询性能不受整体数据增长的影响;具有完善的数据备份机制。这样一个行列混合的olap数据库,支持upsert,支持存储和计算分离,挺符合我们的预期的。目前,这样一套架构支撑着供应链上每天几千人的报表检索需求,以及每天10亿数据量的导出,访问量在所有得物toB系统中名列前茅。2.3.2我们遇到的一些问题:segment_key如何设置,选择哪个业务字段作为segment_key?供应链中的数十个环节都有运行时间。没有segment_key如何保证性能一直困扰着我们。设置合理的segment_key,比如有序的时间字段,可以实现完整的顺序写入。每个段文件都有一个最小值和最大值。所有的时间字段都需要比较,看是否在最小值和最大值之间(这个动作的成本很低),不在范围内直接跳过。在没有segment_key的查询条件下,也可以大大减少需要过滤的文件数量。批流集成背景:在业务快速发展的过程中,实时任务的不断迭代已经成为常态。供应链业务复杂,环节多,流程往往长达一个月,导致statettl设置周期长。job的operator发生变化(sql修改),checkpoint不能自动恢复,不能满足savepoint恢复机制,比如加groupby,join。历史数据的再次消费依赖于上游kafka的存储时效性。Kafka在公司平台一般默认存储7天,无法满足一个月数据刷新需求场景。解决方案:在源端通过批流融合实现离线+实时的数据读取和补全。(1)key离线去重,每个key只保留一个key,减少消息发送量。(2)合并离线和实时数据,使用last_value得到一条具有相同主键的最新事件时间戳的数据。(3)使用unionall+groupby方法可以作为一个选项代替join。(4)实时数据取自当日数据,离线数据取自历史数据,防止数据漂移。实时数据需要提前一小时。Join算子乱序问题分析由于join算子对joinkey进行哈希处理,在不同分片处理数据,在开启两个并发后,由于header_id字段的值发生变化,第二个数据流向详table到达第二个不同的taskmanage,不同的线程无法保证输出顺序,所以有一定概率数据会乱序输出,导致预期结果不正确,现象就是数据丢失。解决方法是通过headerinner获取明细表join后的detail_id,这样(joinkey)的值就不会再通过detail_idjoin由null变为非null,顺序也不会乱了。insertintosinkSelectdetail.id,detail.header_id,header.idfromdetailleftjoin(Selectdetail.idASdetail_id,detail.header_id,header.idfromheaderinnerjoindetailondetail.header_id=header.id)headerNewondetail.id=headerNew.detail_id2.3.3Hologresorstarrocks这里也说说大家比较关心的hologres和starrocks。从开源开始Starrocks也和我们保持着密切的联系,进行了很多深入的交流。我们也大致列出了两者之间的区别、各自的一些优点和我们认为的一些缺点。3其他一些要做的事情3.1开发提高效率的工具——flink代码生成器参考MyBatisgenerator的一些思想,使用模板引擎技术,自定义模板生成flinksql。可以解决代码规范,提高开发效率。基本上可以通过代码配置生成flinksql。3.2增效工具开发——可视化平台直接通过配置在线编写sql,直接生成页面和界面,一键发布。同时引入缓存和锁队列机制,解决峰值访问性能问题。动态配置界面,一键生成rpc服务:动态配置报告:4未来规划目前的架构还存在一定程度的不可能三角,需要探索更多的架构可能性:(1)使用writeinholo,calculateinmc为了避免内存数据库如holo,在极端查询中内存被炸毁的问题,可以利用mc的计算能力来解决一些事实表的join问题,提高一些灵活性。(2)借助apachehudi推动湖仓融合,hudi统一批流存储,flink统一批流计算,一套代码提供5-10级准实时架构分钟,缓解一些场景只需要在时间上降低实时计算成本。
