Flink实时计算topN热榜本文转载请联系Java大数据与数据仓库公众号。topN常见应用场景,最热门商品购买量,最热门作者阅读量等。1.Flink使用的知识点创建kafka数据源;基于EventTime处理,如何指定Watermark;Flink中的窗口,有翻滚(tumbling)窗口和滑动(sliding)窗口;使用State状态;ProcessFunction实现TopN函数;2.案例介绍通过用户访问日志,计算出平台近期最活跃用户的topN。创建kafka生产者,向kafka发送测试数据;消费kafka数据,使用滑动(sliding)窗口,每隔一段时间更新一次排名;3、这里的数据源使用kafkaapi向kafka发送测试数据,代码如下:@Data@NoArgsConstructor@AllArgsConstructor@ToStringpublicclassUser{privatelongid;privateStringusername;privateStringpassword;privatelongtimestamp;}Mapconfig=Configuration。initConfig("commons.xml");@TestpublicvoidsendData()throwsInterruptedException{intcnt=0;while(cnt<200){Useruser=newUser();user.setId(cnt);user.setUsername("用户名"+newRandom().nextInt((cnt%5)+2));user.setPassword("密码"+cnt);用户.setTimestamp(System.currentTimeMillis());Futurefuture=KafkaUtil.sendDataToKafka(config.get("kafka-topic"),String.valueOf(cnt),JSON.toJSONString(user));while(!future.isDone()){Thread.sleep(100);}try{RecordMetadatarecordMetadata=future.get();System.out.println(recordMetadata.offset());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionException){e.printStackTrace();}System.out.println("发送消息:"+cnt+"******"+user.toString());cnt=cnt+1;}}这里通过randomnumber对用户名进行打乱,使用户名的大小不同,结果更明显。KafkaUtil是我自己写的一个kafka工具类。代码很简单,主要是为了方便测试。4.主程序创建一个主程序并开始编写代码。创建flink环境,关联kafka数据源。Mapconfig=Configuration.initConfig("commons.xml");PropertieskafkaProps=newProperties();kafkaProps.setProperty("zookeeper.connect",config.get("kafka-zookeeper"));kafkaProps.setProperty("bootstrap.servers",config.get("kafka-ipport"));kafkaProps.setProperty("group.id",config.get("kafka-groupid"));StreamExecutionEnvironmentsenv=StreamExecutionEnvironment.getExecutionEnvironment();EventTime与Watermarksenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);设置属性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示按照数据时间字段来处理,默认是TimeCharacteristic.ProcessingTime/**Thetimecharacteristicthatisusedifnoneotherisset.*/privatestaticfinalTimeCharacteristicDEFAULT_TIME_CHARACTERISTIC=TimeCharacteristic.ProcessingTime;这个属性必须设置,否则可能不会触发窗口结束,无法输出结果。共有三个值:ProcessingTime:处理事件的时间。即由flink集群机器的系统时间决定。EventTime:事件发生的时间。一般是数据本身携带的时间。IngestionTime:Ingestion时间,数据进入flink流的时间,还是和ProcessingTime不同;指定使用数据进行处理的实际时间,然后需要指定flink程序如何获取数据的时间字段,这里使用调用DataStream方法的assignTimestampsAndWatermarks提取时间并设置水印。senv.addSource(newFlinkKafkaConsumer010<>(config.get("kafka-topic"),newSimpleStringSchema(),kafkaProps)).map(x->{returnJSON.parseObject(x,User.class);}).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1000)){@OverridepubliclongextractTimestamp(Userelement){returnelement.getTimestamp();}})从上面给出的代码可以看出,在发送到kafka时,User对象被转为json了字符串在这里。这里使用了fastjson,可以转成JsonObject进行处理。我还是把它转成User对象JSON.parseObject(x,User.class)方便处理。考虑到数据可能出现乱序,使用了可以处理乱序的抽象类BoundedOutOfOrdernessTimestampExtractor,实现了唯一未实现的方法extractTimestamp。数据乱序会造成数据延迟,在构造方法中传入一个Time。milliseconds(1000),表示数据可以延迟一秒。例如窗口长度为10s,则0~10s的数据会在11s处计算。此时watermark为10,会触发计算。也就是说引入watermark来处理乱序数据最多可以容忍0~t的窗口。数据最迟在时间t+1到达。关于watermark的具体解释可以参考这篇文章https://blog.csdn.net/qq_39657909/article/details/106081543窗口统计业务需求,一般可能是一个小时,或者最近15的数据minutes,每5分钟更新一次排名,这里为了演示,窗口长度为10s,每张slide(幻灯片)为5s,即每5秒更新一次过去10s的排名数据。.keyBy("username").timeWindow(Time.seconds(10),Time.seconds(5)).aggregate(newCountAgg(),newWindowResultFunction())我们使用.keyBy("username")对使用.timeWindow的用户进行分组(Timesize,Timeslide)为每个用户做一个滑动窗口(10s窗口,5s滑动一次)。然后我们使用.aggregate(AggregateFunctionaf,WindowFunctionwf)做增量聚合操作,可以使用AggregateFunction提前聚合数据,减少状态存储压力。与.apply(WindowFunctionwf)相比,window中的所有数据都会被存储起来,最后一起计算效率要高很多。这里使用了aggregate()方法的第一个参数。CountAgg实现AggregateFunction接口。作用是统计窗口中条目的个数,即遇到一条数据就加一。publicclassCountAggimplementsAggregateFunction{@OverridepublicLongcreateAccumulator(){return0L;}@OverridepublicLongadd(Uservalue,Longaccumulator){returnaccumulator+1;}@OverridepublicLonggetResult(Longaccumulator){returnaccumulator;}@OverridepublicLongmerge(Longa,Longb){returna+b;}}.aggregate(AggregateFunctionaf,WindowFunctionwf)第二个参数WindowFunction输出每个键和每个窗口的聚合结果以及其他信息。我们这里实现的WindowResultFunction将用户名、窗口、访问量封装到UserViewCount中输出。privatestaticclassWindowResultFunctionimplementsWindowFunction{@Overridepublicvoidapply(Tuplekey,TimeWindowwindow,Iterableinput,Collectorout)throwsException{Longcount=input.iterator().next();out.collect(newUserViewCount)(((Tuple1)key).f0,window.getEnd(),count));}}@Data@NoArgsConstructor@AllArgsConstructor@ToStringpublicstaticclassUserViewCount{privateStringuserName;privatelongwindowEnd;privatelongviewCount;}TopN计算最活跃的用户,以便统计每个窗口对于活跃用户,我们需要再次按窗口对其进行分组。这里根据UserViewCount中的windowEnd进行keyBy()操作。然后使用ProcessFunction实现一个自定义的TopN函数TopNHotItems,计算出点击次数最多的前3名用户,并将排名结果格式化成字符串,用于后续输出。.keyBy("windowEnd").process(newTopNHotUsers(3)).print();ProcessFunction是Fl??ink提供的底层API,用于实现更高级的功能。主要提供timer定时器的功能(支持EventTime或ProcessingTime)。在这种情况下,我们将使用定时器来判断某个窗口下所有用户的访问数据何时被收集。由于Watermark的进度是全局的,在processElement方法中,每当接收到一条数据(ItemViewCount),我们注册一个windowEnd+1定时器(同时Flink框架会自动忽略重复注册)。当windowEnd+1定时器被触发时,表示已经收到windowEnd+1的Watermark,即windowEnd下的所有用户窗口统计信息都已经收集完毕。在onTimer()中,我们对采集到的所有商品和点击进行排序,选择TopN,将排名信息格式化成字符串输出。这里我们还使用ListState来存储每条接收到的UserViewCount消息,以保证在出现故障时状态数据不丢失且保持一致。ListState是类似于Flink提供的JavaList接口的StateAPI。它集成了框架的checkpoint机制,自动实现了exactly-once的语义保证。privatestaticclassTopNHotUsersextendsKeyedProcessFunction{privateinttopSize;privateListStateuserViewCountListState;publicTopNHotUsers(inttopSize){this.topSize=topSize;}@OverridepublicvoidonTimer(longtimestamp,OnTimerContextctx,Collectorout)throwsException{super.onTimer(timer)ctx,out);ListuserViewCounts=newArrayList<>();for(UserViewCountuserViewCount:userViewCountListState.get()){userViewCounts.add(userViewCount);}userViewCountListState.clear();userViewCounts.sort(newComparator(){@Overridepublicintcompare(UserViewCounto1,UserViewCounto2){return(int)(o2.viewCount-o1.viewCount);}});//将排列名称信息格式化成String,便于打印StringBuilderresult=newStringBuilder();result.append("====================================\n");result.append("时间:").append(newTimestamp(timestamp-1)).append("\n");for(inti=0;iuserViewCountListStateDescriptor=newListStateDescriptor<>("user-state",UserViewCount.class);userViewCountListState=getRuntimeContext().getListState(userViewCountListStateDescriptor);}@OverridepublicvoidprocessElement(UserViewCountvalue,Contextctx,Collectorout)throwsException{userViewCountListState.add(value);ctx.timerService().registerEventTimeTimer(value.windowEnd+1000);}}可以看到结果输出,每5秒更新一次输出数据参考http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/