当前位置: 首页 > 科技观察

基于Antlr在ApacheFlink中实现监控规则DSL的探索实践

时间:2023-03-13 20:31:23 科技观察

【.com原稿】1简介应对复杂多变的业务指标监控不够灵活。为此,苏宁数据云在流计算框架ApacheFlink之上,设计了一套包括ETL、指标计算、告警触发、告警通知模块在内的业务监控引擎。其基础规则为报警SQLDSL语言,定义了完整易用的场景规则语法和运算符,支持动态更新调整,方便业务方接入。下面介绍基本原理和实现,供同行参考和指正。2DSL规则设计前期调研各业务方需求,根据维度和时间、同期对比、同期对比,或者当方差超过阈值范围,会触发告警,业务方访问的是原始明细数据,因此设计规则需要包含数据清洗过滤、分组、聚合计算、时间窗设置等特性。SQL语言是开发者最熟悉的数据处理语言,选择它作为原型可以节省理解和交流的成本。语法规则如下:SELECT{metrics}FROM{metricFilters}WHERE{alertConditions}GROUPBY{groupByExpr}ORDERBY{orderByExpr}FORLAST{number}MINUTESELECT子句度量计算FROM子句数据过滤表达式WHERE子句告警状态判断表达式GROUPBY子句数据分组ORDERBY子句排序规则FORLAST子句计算窗口时间当前聚合函数支持平均(avg)、计数(count)、合计(sum)、去重计数(distinct_count)等指标计算,也支持多-index四算术混合运算、位运算、比较运算、逻辑运算。以下SQL规则表达的业务含义是根据错误类型和城市信息进行分组。如果5分钟内的错误数大于100且前5分钟的错误数大于20%,则触发告警:SELECTCOUNT(errorType,5)AScurrentTypeNum,COUNT(errorType,10,5)ASpreviewTypeNum,(currentTypeNum-previewTypeNum)/previewTypeNumAScircularPercent,errorType,cityFROMmobileAppIdIN("Suning_PCbrowser","Suning_WAP")WHEREcurrentTypeNum>100ANDcircularPercent>0.2GROUPBYerrorType,cityORDERBYEVENT_TIMEFORLAST10MINUTE3实现设计3.1SQL解析基于Antlr定义词法和语法,使用MavenAntlr插件生成ASTparser,Visitormode遍历语法树实现各种partprocessor,不熟悉的同学可以参考Antlr官方手册,这里不再赘述。解析SQL语句的过程如下:提取计算中用到的所有字段和过滤条件(FROM语句),作为ETL过程中的预处理器,将WHERE子句抽象为布尔表达式,抽象出系统状态判断条件GROUPBY子句变成了从数据对象中提取分组键的处理器。时间窗口字段是从ORDERBY子句中提取的,即时间窗口大小是从ApacheFlink中的EventTime或ProcessTimeFORLAST子句中提取的。SELECT语句是一组支持AS别名的运算符。执行后的结果是一个键值对,即业务指标。需要注意的是,考虑到ETL过程和计算过程是独立的模块,如果计算过程通过字段名引用字段值,中间对象必须是键值对形式的Map结构。序列化和反序列化必然会对性能产生一定的影响。为此,在遍历抽象语法树的过程中必须构建解析上下文,将字段名引用改为数组下标。ETL过程生成的中间数据对象是数组结构,计算时访问字段值的时间复杂度为O(1)。比如原始数据为:{"errorType":"E005","mobileAppId":"Suning_WAP","city":"025","network":"CMCC"}解析出子字段数组["errorType","city"]发送ETL模块,处理后的数据对象Row为["E005","025"],实际运行中算子COUNT("errorType")为COUNT(ROW[0])。3.2整体架构全程ETL和告警计算模块全部运行在ApacheFlink中,借鉴了Flink的实时计算和状态持久化能力。各个业务接入方的数据格式不同。将相关的数据分析、清洗、过滤、富集等功能分离成ETL模块,可根据接入方的业务需求进行定制化部署。SQL中的FROM阶段被提前到ETL模块,提取过滤掉不需要的数据,减少数据传输量。3.3FlinkETL模块告警规则中SQL引擎运行机制SourceStream转换为广播流BroadcastStream连接到数据解析Stream。由于并行度不一致,没有分组,需要使用BroadcastProcessFunction来处理规则变更通知,将SQL规则解析成字段提取和过滤处理器保存在BroadcastState中,以达到数据流和广播流共享状态的目的。告警计算模块从Kafka中消费ETL模块提取的字段和规则ID,根据告警规则的Group子句提取分组信息;结合规则流分析Select子句的表达式计算所有指标;并结合规则流程再次分析Where子句告警触发条件与状态机相关的参数,确定系统当前的健康状态,触发健康状态机的转移;将系统状态转移和当前指标值作为告警事件写入Kakfa,告警通知模块根据通知规则配置处理相关发送逻辑。3.4聚合计算ApacheFlink是一个流式实时处理框架,聚合计算类似于批处理,需要计算窗口内的所有数据;对应的告警SQL可能会产生大量的分组,每个分组其实就是一个独立的告警规则,如果缓存一个事件窗口的数据进行计算,会给Flink的状态维护带来巨大的压力,所以分钟级的桶计算和累加器聚合计算采用聚合结果设计思想。Bucketing根据时间窗口的长度创建一个循环队列,每分钟一个bucket,根据时间计算当前元素bucket的位置,计算当前bucket的值。累加器类似于Spark和Flink中累加器的实现。它保存分钟级计算的中间结果,合并累加器,得到最终值。因此,在计算过程中不需要保留原始数据,只需要在Flink中保存累加器的状态即可。/***创建新的累加器,开始新的聚合。*/ADDcreateAccumulator();/***将给定的输入值添加到给定的累加器,返回*新的累加器值。*/ADDadd(ELEvalue,ADDaccumulator);/***从累加器中获取聚合结果。Mergesttwoaccumulators,返回一个accumulatorwiththemergedstate.*/ADDmerge(ADDa,ADDb);}去重计数distinct_count聚合计算在告警场景存在精度损失,所以使用支持bucket累加的HyperLogLog算法来降低内存需求。3.5告警状态机是否产生告警事件不是根据计算出的指标,而是根据Where子句判断指标是否超过阈值,返回True或False来判断当前系统健康状态,比较健康状态转换当前状态产生的事件作为触发告警事件。目前设计系统存在三种状态,分别是正常(Normal)、警告(Warning)和临界(Critical),后两种状态分别对应两种SQLDSL中的Where子句表达式。状态迁移图如下:业务系统是否产生告警?也可以根据这些类型的事件配置通知。如果异常持续5分钟后产生告警,则只能关注CONTINUE相关事件。当然,告警通知系统还有通知合并、告警风暴抑制等相关功能。4问题与展望目前,该引擎已应用于苏宁易购登录、商品详情页、购物车、支付等多条业务线的用户体验监控,帮助产品和业务经营者快速发现和定位问题。为了提高处理能力,降低访问难度,需要优化以下两个方面:完善ApacheFlink状态管理Flink集群状态管理目前使用的FsStateBackend机制,状态存储在HDFS文件系统中,生成在高基数维度或多维组合的海量数据包业务场景下,可能存在OOM和性能风险。下一步是切换到支持增量检查点的RocksDB状态存储方案。智能异常检测现有的告警规则判断系统状态是否异常或基于静态阈值。这种方式需要业务方对系统指标有准确的认识,并且需要随着业务的变化不断调整,不符合行业的发展趋势。下一阶段计划针对没有历史数据的新业务场景使用曲线波动检测3sigma和中值绝对偏差(MAD)算法,针对相对稳定的业务线引入时间序列异常检测机器学习。苏宁数据云服务产品苏宁数据云拥有大数据开发包(提供全面的大数据开发服务)、人工智能服务(实现智能人机交互,做出更好的决策)、数据分析展示服务(提供海量数据处理分析)方法)、平台基础服务(提供大数据平台基础服务)、实时告警数据分析平台也在规划建设中。实时计算中监控规则DSL的实现是内部项目的技术探索。如果能通过实际业务的考验,也会考虑通过数据云提供服务。作者简介黄小虎,苏宁科技集团消费平台购物流程架构负责人,全面负责苏宁易购商品详情页、购物车、大派对等核心系统的优化,保障的大促销。对电子商务交易流程和业务有深入的思考和研究,专注于高并发大型电子商务网站的架构设计和高可用系统设计。曾主导和参与Commerce系统拆分、商品详情页访问层优化、云信客服系统重构等重大技术攻关项目。现致力于打造苏宁易购新一代核心购物流程体系,希望让购物体验更上一层楼。胡正林,苏宁科技集团消费平台高级架构师,拥有十余年软件开发经验,熟悉大型分布式高并发系统的架构和开发。【原创稿件,合作网站转载请注明原作者和出处为.com】