当前位置: 首页 > 后端技术 > Java

Flink在风控场景下实现了实时特性

时间:2023-04-01 14:35:44 Java

背景介绍风控简介21世纪,随着信息时代的到来,互联网行业的发展速度远超其他行业。一旦商业模式运作良好并盈利,资本立即蜂拥而入,促使更多企业不断进入市场进行快速复制迭代,力图成为下一个“行业龙头”。带着资金入市的玩家只会因为没有资金压力而更加注重业务发展,而忽略了业务中的风险点。强大如拼多多,却被“薅羊毛”大军光顾,损失几千万。风险控制,即风险管理,是一个管理过程,包括对风险的定义、衡量、评估和应对风险的策略。目的是尽量减少可避免的风险、成本和损失[1]。特色平台介绍互联网企业无时无刻不面临着各种黑灰攻击。业务安全团队需要提前评估业务流程中存在风险的地方,然后设置检查点收集相关业务信息,识别当前请求是否存在风险。专家经验(防控策略)是在长期对抗中产生的。策略的部署需要每个特性的支持,那么什么是特性呢?特征分为基础特征、衍生特征、统计特征等,举例如下:地址,卖家地址等:需二次计算,如买家到买家的距离,手机号码前3位等统计特性:需实时统计,如购买数量某手机号5分钟内下单、10分钟内购买金额大于2万元的订单数等。随着业务的快速发展,单纯的专家经验已经不能满足风险识别的需求。算法团队的加入,让拦截效果更加精准。通过统一算法工程框架,算法部工作人员解决了模型和特征迭代的系统性问题,大大提高了迭代效率。根据功能不同,算法平台可分为模型服务、模型训练和特征平台三个部分。其中,模型服务用于提供在线模型估计,模型训练用于提供模型训练输出,特征平台为特征和样本提供数据支持。本文将重点介绍特征平台在构建过程中实时计算遇到的挑战和优化思路。解决方案面临的挑战和挑战在业务发展初期,我们可以通过硬编码的方式满足策略师提出的特性需求,协同性比较好。但是,随着业务发展越来越快,业务线越来越多,营销方式越来越复杂,用户和请求量呈指数级增长。适用于早期的硬编码方式,存在策略分散、策略难以管理、逻辑与业务强耦合、策略更新迭代率发展受限、对接成本高等诸多问题。这时候,我们迫切需要一个可在线配置、可热更新、可快速试错的特性管理平台。老框架的不足实时框架1.0:基于FlinkDataStreamAPI构建如果你熟悉FlinkDataStreamAPI,你一定会发现Flink的设计自然满足风控的实时特征计算场景。我们只需要几个简单的步骤就可以统计指标,如下图所示:FlinkDataStream流图实时特征统计示例代码如下://数据流,如topicDataStreamdataStream=...SingleOutputStreamOperatorwindowOperator=dataStream//filter.filter(this::filterStrategy)//数据转换.flatMap(this::convertData)//配置水印.assignTimestampsAndWatermarks(timestampAndWatermarkAssigner(config))//Grouping.keyBy(this::keyByStrategy)//5分钟滚动window.window(TumblingEventTimeWindows.of(Time.seconds(300)))//自定义聚合函数,内部逻辑自定义.aggregate(AllDecisionAnalyzeCountAgg.create(),AllDecisionAnalyzeWindowFunction.create());1.0框架不足:特征对开发者编码依赖性强,可以抽象出简单的统计特征,复杂一点就需要自定义迭代,效率低。战略需求、产品排期、研发介入、测试保障、交付一套流程至少两周。特性强耦合,任务拆分困难,一个JOB包含太多逻辑,也许新的功能逻辑会总的来说,1.0适用于业务的起步阶段,但随着业务的发展,研发速度逐渐成为瓶颈,不符合可持续、可管理的实时特征清洗架构。实时框架2.0:基于FlinkSQL构建1.0架构的缺点在于从需求到研发使用不同的语言系统。如何高效转化需求,甚至直接让战略人员配置特征清洗逻辑直接上线?如果按照两周一周的迭代速度,可能会被线上黑灰生产“认不出来”。此时,我们的研发团队注意到FlinkSQL,SQL是最常用的数据分析语言,是打分、策略、运维的基本必备技能。可以说,SQL是转换需求成本最低的实现方式之一。看一个FlinkSQL的实现例子:--错误日志监控--kafkasourceCREATETABLErcp_server_log(threadvarchar,levelvarchar,loggerNamevarchar,messagevarchar,endOfBatchvarchar,loggerFqcnvarchar,instantvarchar,threadIdvarchar,threadPriorityvarchar,appNamevarchar,triggerTime作为LOCALTIMESTAMP,proctime作为PROCTIME(),WATERMARKFORtriggerTimeAStriggerTime-INTERVAL'5'SECOND)WITH('connector.type'='kafka','connector.version'='0.11','connector.topic'='${sinkTopic}','connector.startup-mode'='latest-offset','connector.properties.group.id'='streaming-metric','connector.properties.bootstrap.servers'='${sinkBootstrapServers}','connector.properties.zookeeper.connect'='${sinkZookeeperConnect}}','update-mode'='append','format.type'='json');--sink_feature_indicator的创建这里省略,参考源表--Decisiondistributionbydayandcitybybusinessline插入到sink_feature_indicatorSELECTlevel,ggerName,COUNT(*)FROMrcp_server_logWHERE(level<>'INFO'AND`appName`<>'AppTestService')或loggerName<>'com.test'GROUPBYTUMBLE(triggerTime,INTERVAL'5'SECOND),level,loggerName;在开发FlinkSQL支持平台的过程中,我们遇到了如下问题:如果对一个SQL索引进行清洗,数据源会极大地浪费SQL合并,即如果一个检测与同源SQL进行合并,会大大增加了作业的复杂度,而且无法定义边界。SQL上线时需要关机重启。如果任务中包含大量稳定指标,会不会是临界点?用线下数据分析线上是否存在风险。针对风险场景,设计防控策略。透传到研发端,其实就是实时特性一个一个的开发。因此,完全决定了实时特性的在线速度、质量交付和易用性。能否及时对接线上风险场景,是关键。在构建统一的实时特征计算平台之前,实时特征的输出主要存在以下问题:交付慢、迭代开发:从策略提出到产品,再到研发、测试,是否在线观察稳定,速度极慢,耦合性强,调动全身:怪物任务,包括很多业务特性,所有业务混合在一起,没有优先级保证重复开发:因为没有统一的真实-当时的特性管理平台,很多特性已经存在,只是名称不同,造成极度浪费的平台建设,最重要的是“整个过程的抽象”。平台的目标应该是好用、好用、好用。基于以上思路,我们尝试提炼出实时特征开发的痛点:模板化+配置,即平台提供实时特征创建模板,用户可以基于模板生成自己想要的实时特征需要通过简单的配置。Flink实时计算架构图计算层数据源清洗:不同数据源抽象FlinkConnector,标准输出供下游使用数据拆分:1拆分N,一条实时消息可能包含多条消息,此时需要进行数据裂变动态配置:允许在不间断JOB的情况下,动态更新或添加清洗逻辑,对特征相关的清洗逻辑发送脚本加载:Groovy支持,热更新RTC:Real-TimeCalculate,实时特征计算,高度抽象封装模块任务感知:基于特征业务领域,优先级,稳定性,任务隔离,业务解耦服务层统一查询SDK:实时特征统一查询SDK,屏蔽底层实现逻辑基于统一的Flink实时计算架构,我们重新设计了实时特征清洗架构Flink实时计算数据流图特征配置&存储/读取特征底层存储应该是“原子的”,即最小的不可分割单元。为什么要这样设计?实时统计功能与窗口大小相关联。不同的人员防控策略对特征窗口的大小有不同的要求。示例如下:可信设备判定场景:当前手机号登录时间窗口适中,不宜过短,防止干扰提现欺诈判定场景:当前手机号登录时间窗口尽可能短,而近距离快速取现可以结合其他维度快速定位风险基于以上,急需一套通用的实时特征读取模块,满足策略师对任意窗口的需求,同时时间满足研发人员的快速配置和清洁需求。我们重构后的特征配置模块如下:特征配置抽象模块实时特征模块:特征唯一标识特征名称支持窗口:滑动、滚动、固定大小窗口事件切片单位:分、时、日、周主属性:groupingColumns可以有多个属性:聚合函数,比如去重需要的基本特征的输入。业务留给风控的时间不多了。大多数场景都在100ms以内,实时特征获取更短。从以往的研发经验来看,RT需要控制在10ms以内,才能保证策略执行不会超时。所以我们的存储使用Redis来保证性能不是瓶颈。清洗脚本热部署如前所述,实时特征计算模块强烈依赖于上游消息中传递的“主属性”和“从属属性”。这个阶段也是研发需要介入的阶段。如果消息中的主要属性字段不存在,则需要研发。完成,这个时候你要加上代码发布,又会回到原阶段面临的问题:FlinkJob需要不断重启,这显然是不能接受的。这时我们想到了Groovy,Flink+Groovy可以直接热部署代码吗?答案是肯定的!因为我们抽象了整个FlinkJob的计算流图,算子本身不需要改变,即DAG是固定的,成为算子内部关联事件的清洗逻辑。因此,只要改变关联的清洗逻辑和清洗代码本身,就不需要重启FlinkJob就可以完成热部署。Groovy热部署的核心逻辑如图所示:清洗脚本配置加载图开发或策略人员在管理后台(OperatingSystem)添加清洗脚本并存入数据库。FlinkJob脚本缓存模块会感知此时脚本的添加或修改(如何感知请看下面的整体流程)。预热:脚本第一次运行比较耗时,在第一次启动或者更新缓存的时候会提前预热执行,保证真正的流量进入脚本快速执行缓存:cache已经在好的Groovy脚本中Push/Poll:缓存更新采用push和pull两种方式保证信息不会丢失router:脚本路由,保证消息能找到对应的脚本并执行脚本加载核心代码://缓存,否则无限加载会metaspaceoutOfMemoryprivatefinalstaticMapgroovyObjectCache=newConcurrentHashMap<>();/***加载脚本*@paramscript*@return*/publicstaticGroovyObjectbuildScript(Stringscript){if(StringUtils.isEmpty(script)){thrownewRuntimeException("scriptisempty");}StringcacheKey=DigestUtils.md5DigestAsHex(script.getBytes());如果(groovyObjectCache.containsKey(cacheKey)){日志。debug("groovyObjectCache命中");返回groovyObjectCache.get(cacheKey);}GroovyClassLoaderclassLoader=newGroovyClassLoader();尝试{ClassgroovyClass=classLoader.parseClass(scr知识产权);GroovyObjectgroovyObject=(GroovyObject)groovyClass.newInstance();类加载器.clearCache();groovyObjectCache.put(cacheKey,groovyObject);log.info("groovybuildScript成功:{}",groovyObject);catch(Exceptione){thrownewRuntimeException("buildScript错误",e);}最后{尝试{classLoader.关闭();}赶上(IOExceptione){日志。error("关闭GroovyClassLoader错误",e);}}}标准消息&清洗流程策略需要统计的消息维度非常复杂,涉及多个业务,研发本身也有监控的实时特性需求,所以实时特性对应的数据源是多种多样的。好在Flink支持多数据源的访问。对于一些特定的数据源,我们只需要继承并实现FlinkConnector就可以满足需求。我将以Kafka为例,展示整个流程是如何清洗实时统计特征的。首先介绍一下风控的整体数据流程。多个业务场景接入风控中台。风控内部核心环节为:决策引擎、规则引擎、特征服务。我们会异步记录一个业务请求决策,并发送一条Kafka消息进行实时特征计算&离线埋点。风控核心数据流图标准化消息模板Flink实时计算作业收到MQ消息后,首先要解析标准化消息模板。不同的主题对应不同的消息格式,JSON、CSV、异构(如错误日志消息、空格分隔、对象中包含一个JSON对象)等。为了便于下游运营商统一处理,标准化的消息结构为JSON结构如下:publicclassRcpPreProcessData{/***channel,可以直接写topic*/privateStringchannel;/***消息分类通道+eventCode应该是唯一的确定消息的类型*/privateStringeventCode;/***所有主从属性*/privateMapfeatureParamMap;/***原始消息*/privateObjectNode节点;}消息裂变一条“富消息”可能包含大量的业务信息,一些实时性的特征可能需要单独统计。例如业务请求风控上下文消息中包含消息是否被拒绝,即命中了多少条策略规则,命中规则是一个数组,其中可能包含多条命中规则。这时候如果要根据一条命中规则关联其他属性统计,就需要使用消息裂变,从1变为N。消息裂变的逻辑是运维后台通过Groovy脚本写的,位置清理脚本逻辑是channel(parent)+eventCode(child)。这里将逻辑分为“父子”,“父”的逻辑适用于当前通道下的所有逻辑。为了避免单独配置N个eventCode的繁琐,“sub”逻辑适用于特定的eventCode。MessageCleaning&Pruning消息的清洗就是我们需要知道特性需要哪些主从属性。有目的的清洁更清晰。定位清洗的脚本同上,依然是基于channel+eventCode实现。清洗后的主从属性存储在featureParamMap中,供下游实时计算。这里需要注意的是,我们一直将原始消息传递下去,但是如果清理的主从属性已经确认,那么原始消息就不再需要了。这时候,我们就需要“剪枝”,节省RPC调用过程I/O流量的消耗。至此,一条原始消息被处理成只包含channel(通道)、eventCode(事件类型)、featureParamMap(所有主从属性),下游算子只需要也只需要这些信息来计算。实时计算还是和上面两个算子一样。实时计算算子依靠channel+eventCode找到对应的实时特征元数据。一个事件可能有多个实时特征配置。运营平台填写实时特征配置后,会根据缓存更新机制,快速分发到任务中,根据Key构造函数生成对应的Key,下游直接传给Sink到Redis。任务问题排查&调优思路任务排查建立在综合监控的基础上。Flink为我们排查问题提供了很多有用的指标。以下是我列出的常见任务异常,希望对大家有所帮助。TaskManagerFullGC排查出现上述异常的可能原因有:大窗口:90%的TM内存突发,都是大窗口导致的内存泄漏:如果是自定义节点,涉及到缓存,容易导致对内存扩展解决方案:合理制定windowwires,合理分配TM内存(1.10默认为1G),聚合数据要由BackState管理,不建议自己写对象存储,附加堆快照排查异常,以及MAT等分析工具需要一定的Tuning经验也能快速定位问题。FlinkJob反压出现上述异常的可能原因是:数据倾斜:90%的反压肯定是数据倾斜造成的。并行度设置不当,错误估计数据流。或者单个算子的计算性能解决方案:数据清洗,参考下文。对于并行性,可以在消息传递过程中埋点,看每个节点的开销。数据倾斜核心思想:给key添加一个随机数,然后根据新的key执行keybyPartitioning会打散此时key的分布,不会造成数据倾斜问题二次keyby用于结果统计拆分逻辑核心代码:publicclassKeyByRouter{privatefinalstaticStringSPLIT_CHAR="#";/***不能太分散,否则二次聚合还是会出现数据倾斜**@paramsourceKey*@return*/publicstaticStringrandomKey(StringsourceKey){intendExclusive=(int)Math.pow(2,7);返回sourceKey+SPLIT_CHAR+(RandomUtils.nextInt(0,endExclusive)+1);}publicstaticStringrestoreKey(StringrandomKey){if(StringUtils.isEmpty(randomKey)){返回空;}返回randomKey.split(SPLIT_CHAR)[0];}}出现上述异常的可能原因是作业被挂起,状态被保存。出现上述异常的可能原因是:job本身有背压,checkpoint可能会失败。状态非常大,保存点超时。Job设置的Checkpoint超时时间比较短。结果,Savepoint还没有完成,job丢弃了Savepoint的状态。对于状态较大的作业,您可以设置一个较大的作业。如果作业不需要保持状态,只需暂停作业并重新启动即可。总结与展望本文从实时特征清洗框架、特征可配置、特征清洗逻辑热部署等出发,介绍了目前相对稳定的实时计算可行架构。经过近两年的迭代,目前的架构在稳定性、资源利用率、性能开销等方面表现最好。为业务战略人员和业务算法人员提供了强大的解决方案。支持。未来,我们期待特性的配置回归SQL。现在的配置虽然够简单,但毕竟属于我们自己的“领域设计语言”。新策略和产品人员有一定的学习成本。我们期望的是能够通过类似SQL,类似Hive离线查询的语言来配置整个领域,屏蔽底层复杂的计算逻辑,帮助业务更好的发展。欢迎关注公众号:咕咕鸡技术专栏个人技术博客:https://jifuwei.github.io/参考资料:[1]风控(https://zh.wikipedia.org/wiki/%E9%A3%8E%E9%99%A9%E7%AE%A1%E7%90%86)