本文转载请联系五分钟学习大数据公众号。我们在看直播的时候,不管是主播还是用户,一个很重要的项目就是弹幕文化。为了增加直播的趣味性和互动性,各大网络直播平台纷纷采用弹窗作为用户之间实时交流的方式。丰富多样的弹幕数据隐含着复杂的用户属性和用户行为。了解在线直播平台用户在弹幕内容审核监控、舆情热点预测、个性化摘要标注等多个方面具有应用价值。本文不分析弹幕数据的应用价值,仅通过弹幕内容审核和监控案例来理解FlinkCEP的概念和作用。当用户发弹幕时,直播平台主要实时监控识别两类弹幕内容:一是发不友好弹幕的用户;另一个是滑动屏幕的用户。我们先记住上面两类需要实时监控和识别的用户,然后介绍FlinkCEP的API,然后使用CEP来解决上面的问题。什么是FlinkCEPFlinkCEP?FlinkCEP是一个基于Flink的复杂事件处理库。它可以从多个数据流中发现复杂事件,识别有意义的事件(例如机会或威胁),并尽快做出响应,而不是需要等待数天或数月的相当长的时间才能发现问题。FlinkCEPAPI的核心是PatternAPI,可以让你快速定义复杂的事件模式。每个模式都包含多个阶段(stage)或者我们也可以称之为状态(state)。要从一种状态切换到另一种状态,用户可以指定条件,这些条件可以应用于相邻事件或独立事件。在介绍API之前,先了解几个概念:1.模式和模式序列简单的模式称为模式,复杂的模式序列最终在数据流中被搜索匹配的称为模式序列。每个复杂模式序列由多个简单模式组成。图案组成。匹配是一系列输入事件,通过一系列高效的模式转换,可以访问复杂模式图中的所有模式。每个模式都必须有一个唯一的名称,我们可以使用模式名称来标识该模式匹配的事件。2.单一模式模式可以是单例模式,也可以是循环模式。单例模式接受单个事件,循环模式可以接受多个事件。3、模式示例:有以下模式:ab+c?d其中字母a、b、c、d代表模式,+代表循环,b+为循环模式;?代表可选,c?是可选的;所以上面模式的意思是:a后面可以跟着一个或多个b,接着是可选的c,最后是d。其中a、c?、d为单例模式,b+为循环模式。一般来说,模式是单例模式,可以通过使用量词(Quantifiers)转换成循环模式。每个模式可以有一个或多个条件,这些条件是根据事件接收定义的。换句话说,每个模式匹配并接收具有一个或多个条件的事件。了解了以上概念之后,我们来介绍一下案例中需要用到的几个CEPAPI:案例中用到的CEPAPI:Begin:定义一个起始模式状态Usage:start=Pattern.<:Event>begin("start");Next:附加一个新的模式状态。匹配事件必须直接跟在前一个匹配事件之后用法:next=start.next("next");其中:定义当前模式状态的过滤条件。事件只有通过过滤器才能匹配状态用法:patternState.where(_.message=="TMD");Within:定义事件序列匹配模式的最大时间间隔。如果不完整的事件序列超过这个时间,则被丢弃用法:patternState.within(Time.seconds(10));次数:给定类型的事件已发生指定次数用法:patternState.times(5);API先介绍以上内容,接下来我们来解决文章开头提到的案例:监控用户弹幕行为案例一:监控恶意用户规则:如果用户在10秒内输入TMD超过5次,则认为该用户被恶意攻击,识别用户。使用FlinkCEP检测恶意用户:importorg.apache.flink.api.scala._importorg.apache.flink.cep.PatternSelectFunctionimportorg.apache.flink.cep.scala.{CEP,PatternStream}importorg.apache.flink.cep.scala.pattern.Patternimportorg.apache.flink.streaming.api.TimeCharacteristicimportorg.apache.flink.streaming.api.scala.{DataStream,OutputTag,StreamExecutionEnvironment}importorg.apache.flink.streaming.api.windowing.time.TimeobjectBarrageBehavior01{caseclassLoginEvent(:String,message:String,timestamp:Long){overridedeftoString:String=userId}defmain(args:Array[String]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironment//使用IngestionTime作为EventTimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//用于观察测试数据处理顺序env.setParallelism(1)//模拟数据源valloginEventStream:DataStream[LoginEvent]=env.fromCollection(List(LoginEvent("1","TMD",1618498576),LoginEvent("1""TMD",1618498577),LoginEvent("1","TMD",1618498579),LoginEvent("1","TMD",1618498582),LoginEvent("2","TMD",1618498583),LoginEvent("1","TMD",1618498585))).assignAscendingTimestamps(_.timestamp*1000)//定义模式valloginEventPattern:Pattern[LoginEvent,LoginEvent]=Pattern.begin[LoginEvent]("begin").where(_.message=="TMD").times(5).within(Time.seconds(10))//匹配模式valpatternStream:PatternStream[LoginEvent]=CEP.pattern(loginEventStream.keyBy(_.userId),loginEventPattern)importscala.collection.Mapvalresult=patternStream.select((pattern:Map[String,Iterable[LoginEvent]])=>{valfirst=pattern.getOrElse("begin",null).iterator.next()(first.userId,first.timestamp)})//恶意用户,实际处理可以被用户禁止话等处理,这里为了简化只打印出用户result.print("恶意用户>>>")env.execute("BarrageBehavior01")}}例2:监控刷屏用户规则:如果用户是10s内,同时连续输入同一句话5次以上,会被视为恶意刷屏使用FlinkCEP检测刷屏用户objectBarrageBehavior02{caseclassMessage(userId:String,ip:String,msg:String)defmain(args:Array[String]):Unit={//初始化运行环境valenv=StreamExecutionEnvironment.getExecutionEnvironment//设置并行度env.setParallelism(1)//模拟数据源valloginEventStream:DataStream[Message]=env.fromCollection(List(Message("1","192.168.0.1","北京"),Message("1","192.168.0.2","北京"),Message("1","192.168.0.3","北京"),Message("1","192.168.0.4","北京"),Message("2","192.168.10.10","上海"),Message("3","192.168.10.10","北京"),Message("3","192.168.10.11","北京"),Message("4","192.168.10.10","北京"),Message("5","192.168.10.11","上海"),Message("4","192.168.10.12","北京"),Message("5","192.168.10.13","上海"),Message("5","192.168.10.14","上海"),Message("5","192.168.10.15","北京"),Message("6","192.168.10.16","北京"),Message("6","192.168.10.17","北京"),Message("6","192.168.10.18","北京"),Message("5","192.168.10.18","上海"),Message("6","192.168.10.19","北京"),Message("6","192.168.10.19","beijing"),Message("5","192.168.10.18","shanghai")))//定义patternvalloginbeijingPattern=Pattern.begin[Message]("start").where(_.msg!=null)//一次登录失败.times(5).optional//配对打印满足5次的数据.within(Time.seconds(10))//进行分组匹配valloginbeijingDataPattern=CEP.pattern(loginEventStream.keyBy(_.userId),loginbeijingPattern)//查找符合规则的数据选项[Iterable[Message]]=nullloginEventList=pattern.get("start")match{caseSome(value)=>{if(value.toList.map(x=>(x.userId,x.msg))).distinct.size==1){Some(value)}else{None}}}loginEventList})//打印测试loginbeijingResult.filter(x=>x!=None).map(x=>{xmatch{caseSome(value)=>value}}).print()env.execute("BarrageBehavior02)}}FlinkCEPAPI除了案例中介绍的几个API,我们还会介绍其他常用的API:1.conditionalAPI用于允许传入事件要被modalAccepted,为mode指定传入事件必须满足的条件,这些条件由事件本身的属性或者之前匹配的事件的属性统计等来设置。比如某个值为事件大于5,or大于先前接受的事件的某个值的平均值。您可以使用pattern.where()、pattern.or()、pattern.until()方法来指定条件。条件可以是IterativeConditions或SimpleConditions。FlinkCEP支持事件之间的三个相邻条件:next():严格满足条件示例:模式为begin("first").where(_.name='a').next("second").where(.name='b')当且仅当数据为a,b时,模式才会命中。如果数据是a,c,b,由于a后面跟着c,所以a会直接丢弃,不会命中pattern。followedBy():松散满足示例:当且仅当Thedata是a,b或者a,c,b,命中pattern,中间的c会被忽略。followedByAny():松散满足条件的非确定性示例:模式是begin("first").where(_.name='a').followedByAny("second").where(.name='b')ifandOnly当数据为a,c,b,b??时,对于followedBy方式,命中为{a,b},对于followedByAny,会有两次命中{a,b},{a,b}.2.QuantifiersAPI还记得我们在上面解释模式概念的时候说过的话吗:一般来说,模式都是单例模式,可以使用量词将其转化为循环模式。这里的量词指的是量词API。以下量词API可以将模式指定为循环模式:pattern.oneOrMore():给定事件有一次或多次出现,例如上面提到的b+。pattern.times(#ofTimes):给定类型的事件发生指定次数,例如4次。pattern.times(#fromTimes,#toTimes):给定类型事件的出现次数在规定的次数内,例如2~4次。您可以使用pattern.greedy()方法使模式循环,但不能使一组模式循环。贪婪:尽可能重复。使用pattern.optional()方法使循环模式可选,即它可以是循环模式或单个模式。3、匹配后跳过策略所谓匹配跳过策略,就是对多个匹配成功的模式进行过滤。也就是说,如果多次匹配成功,我可能就不需要那么多了。根据匹配策略,过滤即可。Flink中有五种跳过策略:NO_SKIP:不过滤,所有可能匹配的都会被触发。SKIP_TO_NEXT:丢弃与第一个匹配的事件相同的事件,发出第一个匹配的事件,即直接跳到下一个模式匹配的事件,以此类推。SKIP_PAST_LAST_EVENT:丢弃比赛开始后结束前匹配的事件。SKIP_TO_FIRST[PatternName]:丢弃匹配开始之后但在PatternName模式匹配的第一个事件之前匹配的事件。SKIP_TO_LAST[PatternName]:丢弃在匹配开始之后但在PatternName模式匹配的最后一个事件之前匹配的事件。如何理解上面的策略,我们以NO_SKIP和SKIP_PAST_LAST_EVENT为例进行说明:模式中:begin("start").where(_.name='a').oneOrMore().followedBy("second")。where(_.name='b'),我们输入数据:a,a,a,a,b,如果是NO_SKIP策略,即没有过滤策略,则模式匹配为:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即匹配开始后匹配结束前的事件被丢弃,匹配的模式为:{a,a,a,a,b}。FlinkCEP使用场景除了以上案例场景,FlinkCEP还广泛应用于网络欺诈、故障检测、风险规避、智能营销等领域。1、实时防作弊和风控对于电商来说必不可少。拼多多曾在国内推出100元无门槛优惠券,当晚就上百亿。没有及时做好风控工作。此外,商家在上架商品时,往往通过修改商品名称和滥用标题来提高搜索关键词排名,批量注册一批机器账号快速刷单增加商品销量等。作弊行为。各种作弊技术也需要不断制定规则来匹配这种行为。2、实时营销分析用户在手机APP中的实时行为,统计用户的活动周期,通过用户画像进行用户推荐。例如,用户登录APP后1分钟内只浏览了商品,没有下单;用户浏览商品后,3分钟内查看其他同类商品进行价格比较;用户下单后1分钟内是否支付了商品款项。如果能够很好地利用这些数据,那么就可以将用户浏览过的同类产品推荐给用户,可以大大提高购买率。3、网络攻击实时检测当前互联网安全形势依然严峻,网络攻击事件普遍且形式多样。这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量作为判断攻击的依据。对网络潜在攻击进行实时检测和预警。云服务厂商的多个数据中心会定期向监控中心报告他们的瞬时流量。如果流量在预设的正常范围内,则认为正常,不会采取任何措施。手术;数据中心在10秒内连续5次报告流量超过正常范围阈值,则触发告警事件;如果数据中心在30秒内连续30次报告流量超过正常范围的阈值,则触发严重警报。FlinkCEP的原理简单介绍一下,ApacheFlink在实现CEP时借鉴了EfficientPatternMatchingoverEventStreams论文中的NFA模型。在本文中,还提到了一些优化。这里略过,只说NFA。概念。文中提到NFA,即Non-determinedFiniteAutomaton,称为不确定有限状态机,意思是状态是有限的,但每个状态都可能转化为多个状态(不确定)。非确定性有限自动状态机:首先介绍两个概念:状态:状态分为三类,初始状态、中间状态和最终状态。转换:take/ignore/proceed都是转换的名称。在NFA匹配规则中,本质上是一个状态转换过程。三种转换的含义如下:取:主要是条件的判断。当传递一条数据进行判断时,一旦满足条件,则获取当前元素,放入结果集中,然后将当前状态转移到下一个状态。Proceed:可以不依赖任何事件,将当前状态转移到下一个状态,比如透传的意思。Ignore:当一条数据到达时,可以忽略消息事件,保持当前状态不变,相当于转到自己的一个状态。NFA的特点:在NFA中,给定当前状态,可能有多个下一个状态。可以随机或并行(同时)选择下一个状态。输入符号可以为空。规则引擎RuleEngine:将业务决策与应用程序代码分离,使用预定义的语义模块编写业务决策。接受数据输入,解释业务规则,并根据业务规则做出业务决策。使用规则引擎可以通过降低实现复杂业务逻辑的组件的复杂性来降低应用程序的维护和可扩展性成本。1、DroolsDrools是一个用Java编写的开源规则引擎,通常用来解决业务代码和业务规则的分离。其内置的DroolsFusion模块也提供了CEP功能。优点:功能比较齐全,具备系统监控、运营平台等功能。规则支持动态更新。缺点:时间窗功能是通过内存实现的,不能支持更长跨度的时间窗。无法有效支持定时触摸(比如用户浏览一段时间后到达条件判断)。2、AviatorAviator是一个用Java语言实现的高性能、轻量级的表达式求值引擎,主要用于各种表达式的动态求值。优点:支持大多数算术运算符。支持函数调用和自定义函数。支持正则表达式匹配。支持传入变量,性能优异。缺点:没有ifelse、dowhile等语句,没有赋值语句,没有按位运算符。3.EasyRulesEasyRules是一个集成了MVEL和SpEL表达式的轻量级规则引擎。优点:轻量级框架,学习成本低。基于POJO。为定义业务引擎提供有用的抽象和简单的应用程序。支持从简单规则到复杂规则。4.EsperEsper被设计为CEP的轻量级解决方案,可以方便的嵌入到服务中,提供CEP功能。优点:轻量级可嵌入开发,常用CEP功能简单易用。EPL语法类似于SQL,学习成本低。缺点:单机全内存方案需要整合其他分布和存储。用内存实现时间窗函数不能支持更长跨度的时间窗。无法有效支持定时触摸(比如用户浏览一段时间后到达条件判断)。5.FlinkCEPFlink是一个高吞吐量和低延迟的流式系统。FlinkCEP是一个非常通用且易于使用的实时流式事件处理解决方案。优点:继承了Flink的高吞吐特性。事件存储在外部,可以支持更长的跨度时间窗口。可以支持定时触摸(通过followedBy+PartternTimeoutFunction实现)。
