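In the previous article we gave a brief introduction to Flink CEP. In this article we use code to demonstrate strict contiguity in Flink CEP SQL.

(1) pom dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

(2) Define a message object:

    public static class Ticker {
        public long id;
        public String symbol;
        public long price;
        public long tax;
        public LocalDateTime rowtime;

        // Public no-argument constructor, needed for Flink to treat Ticker as a POJO
        public Ticker() {}

        public Ticker(long id, String symbol, long price, long tax, LocalDateTime rowtime) {
            this.id = id;
            this.symbol = symbol;
            this.price = price;
            this.tax = tax;
            this.rowtime = rowtime;
        }
    }

(3) Construct the data and define the event pattern:

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    // The enclosing method signature is not shown in the original.
    // env, tEnv and LOG are declared elsewhere in the enclosing class.
    try {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        tEnv = StreamTableEnvironment.create(env, settings);
        System.out.println("===============CEP_SQL_9=================");

        final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        DataStream<Ticker> dataStream = env.fromElements(
                new Ticker(1, "ACME", 22, 1, LocalDateTime.parse("2021-12-10 10:00:00", dateTimeFormatter)),
                new Ticker(3, "ACME", 19, 1, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
                new Ticker(4, "ACME", 23, 3, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
                new Ticker(5, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
                new Ticker(6, "Apple", 18, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
                new Ticker(7, "Apple", 16, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
                new Ticker(8, "Apple", 14, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
                new Ticker(9, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
                new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
                new Ticker(11, "Apple", 22, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
                new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
                new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
                new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
                new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
                new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
                new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
                new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));

        // Declare rowtime as the event-time attribute, with a 1-second watermark delay
        Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                .column("id", DataTypes.BIGINT())
                .column("symbol", DataTypes.STRING())
                .column("price", DataTypes.BIGINT())
                .column("tax", DataTypes.BIGINT())
                .column("rowtime", DataTypes.TIMESTAMP(3))
                .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
                .build());
        tEnv.createTemporaryView("CEP_SQL_9", table);

        String sql = "SELECT * " +
                "FROM CEP_SQL_9 " +
                "MATCH_RECOGNIZE ( " +
                "PARTITION BY symbol " +              // partition by symbol; rows with the same symbol go to the same compute node
                "ORDER BY rowtime " +                 // order the rows by event time
                "MEASURES " +                         // define how the output row is built from the matched input rows
                "e1.id AS id, " +
                "AVG(e1.price) AS avgPrice, " +
                "e1.rowtime AS start_tstamp, " +
                "e3.rowtime AS end_tstamp " +
                "ONE ROW PER MATCH " +                // emit one output row per successful match
                "AFTER MATCH SKIP TO NEXT ROW " +     // resume matching at the row after the first row of the match
                "PATTERN (e1 e2 e3) WITHIN INTERVAL '2' MINUTE " +
                "DEFINE " +                           // define the matching condition for each pattern variable
                "e1 AS e1.price = 25, " +
                "e2 AS e2.price > 10, " +
                "e3 AS e3.price = 15 " +
                ") MR";

        TableResult res = tEnv.executeSql(sql);
        res.print();
        tEnv.dropTemporaryView("CEP_SQL_9");
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
    }

(4) Key code explained: PATTERN (e1 e2 e3) requires three directly consecutive rows within the same symbol partition, with no other row in between: a row with price = 25, immediately followed by a row with price > 10, immediately followed by a row with price = 15. WITHIN INTERVAL '2' MINUTE restricts each match to a two-minute span, so the query outputs the data matched within two minutes.

(5) Execution result: two groups of rows that meet the requirements are matched from the data set (the Apple rows with ids 10 to 12 and ids 14 to 16).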
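
To make the strict-contiguity behaviour easier to check by hand, here is a minimal plain-Java sketch that replays the DEFINE conditions over the Apple rows without Flink. It is an illustration only, not how Flink evaluates MATCH_RECOGNIZE internally. The class name StrictContiguityDemo and the method findMatches are hypothetical, the sketch assumes that all rows with ids 5 to 18 belong to the Apple partition, and it leaves out the WITHIN clause because the whole data set spans far less than two minutes. The ACME partition is omitted because none of its rows has price = 25.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of PATTERN (e1 e2 e3) under strict contiguity; not Flink code. */
final class StrictContiguityDemo {

    /** Ids of the Apple rows, in rowtime order (assumption: ids 5 to 18 are all Apple). */
    static final long[] APPLE_IDS = {5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18};

    /** Prices of the same rows, in the same order. */
    static final long[] APPLE_PRICES = {25, 18, 16, 14, 15, 25, 22, 15, 19, 25, 19, 15, 19, 15};

    /** Returns one {firstId, lastId} pair per match of three directly adjacent rows. */
    static List<long[]> findMatches(long[] ids, long[] prices) {
        List<long[]> matches = new ArrayList<>();
        // AFTER MATCH SKIP TO NEXT ROW: every row is tried as a possible e1.
        for (int i = 0; i + 2 < prices.length; i++) {
            boolean e1 = prices[i] == 25;      // e1 AS e1.price = 25
            boolean e2 = prices[i + 1] > 10;   // e2 AS e2.price > 10
            boolean e3 = prices[i + 2] == 15;  // e3 AS e3.price = 15
            // Strict contiguity: e2 and e3 must be the rows directly after e1.
            if (e1 && e2 && e3) {
                matches.add(new long[] {ids[i], ids[i + 2]});
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        for (long[] match : findMatches(APPLE_IDS, APPLE_PRICES)) {
            System.out.println("match: ids " + match[0] + " to " + match[1]);
        }
    }
}
```

Under these assumptions the sketch reports ids 10 to 12 and ids 14 to 16. The row with id 5 also has price = 25, but it does not start a match: the third row after it (id 7) has price 16, and the later row with price 15 (id 9) is not adjacent, which is exactly what strict contiguity rules out.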