当前位置: 首页 > 科技观察

进阶测试:如何使用Flink对Strom任务的逻辑功能进行循环测试?

时间:2023-03-21 19:52:16 科技观察

Flink和Strom是目前比较流行的数据流平台。考虑如下应用场景:已经使用strom完成了某个逻辑功能的开发。如果想用Flink实现同样的逻辑,需要考虑如何用Flink对Strom任务的逻辑功能进行最简单的可复现测试。使用Flink测试Strom任务的逻辑有两个基本问题:首先,Storm通过自定义的Bolt类来实现自定义逻辑。在Flink中如何实现呢?其次,Storm按照自定义的标准实现了数据分发的逻辑。在Flink中如何实现呢?本文主要通过两个基本的Flink程序示例来回答以上两个关于使用Flink测试Strom任务逻辑的基本问题。对于第一个问题,我们可以通过Flink的ProcessFuction类来实现。通过继承该类,我们可以在该类的processElement方法中实现自定义逻辑。ProcessFuction类如下图所示。我们可以直接通过var1参数获取当前流中的数据,然后进行自定义逻辑处理,然后通过Collector类var3的collect方法将处理后的数据发送给下一个流。假设一个Strom任务的功能逻辑是:①在初始数据源(字符串)的末尾添加一个字符串。②然后再加入另一串。下面以上述字符串处理的Strom任务为例,说明Flink程序是如何通过ProcessFuction类实现对该任务的递归测试的。(1)Flink主程序,假设初始数据源为“abc”。(2)对于第一个业务处理类,在数据流的末尾添加“def”。(3)对于第二个业务处理类,在数据流的末尾添加“ghi”。(4)执行Flink程序,观察输出结果。“abc”被重新处理为“abcdefghi”。对于数据分布的第二个问题,我们假设某个Strom任务的功能逻辑是对数据源(股票对象)进行分类,将股价高于X的归为一类,将股价低于X的归为一类大于或等于X进入另一个类别。下面以上述Strom任务对存量数据对象进行分类处理为例,说明Flink程序如何利用旁路输出特性,将数据流按照自定义标准进行分类,输出到不同的子数据流中进行处理。Flink的旁路输出仍然涉及ProcessFunction类的processElement方法。该方法的Context类型的v??ar2参数的主要作用是利用其输出方式进行旁路输出(我们用它来进行数据分流)。Flink的bypassoutput特性可以用来拆分数据。通过创建流标签(OutputTag),然后以OutputTag标签对象为参数,调用初始/父数据流的getSideOutput(OutputTag)方法获取子数据流。.每个流标签都有一个id,不需要创建对象。只要流标签的id一样,里面的数据就一样。因此可以通过匿名内部类的形式获取子数据流。第一个参数是id,第二个参数是数据类型(不能省略)。(1)创建一个股票类Stock,其属性包括名称和价格。(2)创建一个消费消息的Flink程序。(3)创建一个生产消息的Flink程序。我们使用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”这两个ID作为两个旁路输出标签的ID。在processElement方法中,我们通过判断股票的价格是否大于50来区分低价股和高价股,并使用Context对象的output方法进行旁路输出,输出其Stock对象ID为“STOCK_LOW_PRICE”的低价股价格低于50。在价格股票标签bypass中,将价格大于等于50的Stock对象输出到ID为“STOCK_HIGH_PRICE”的高价股票标签bypass。(4)依次启动消费者程序和生产者程序,观察消费者程序控制台输出:此时桌面生成两个文件夹,记录股票数据,result1记录低价股票较少大于50,result2中记录股价大于等于50的高价股。