什么是实时计算?请看下图:我们以热销产品的统计为例,看看传统的计算方式:将用户行为、日志等信息清洗干净,存入数据库。将订单信息保存在数据库中。使用触发器或协程通过其他方式创建本地索引,或远程独立索引。Join订单信息、订单明细、用户信息、商品信息等表格,聚合统计20分钟内热销商品,返回top-10。Web或应用程序显示。这是一个假设的场景,但是假设你有处理过类似场景的经验,你应该会遇到以下问题和困难:,数据量很大很大。由于事务信息涉及事务,直接放弃关系型数据库的事务能力,迁移到具有更好横向扩展能力的NoSQL数据库,是比较困难的。然后,分片一般就完成了。好在历史数据可以按日期归档,通过批量处理离线计算可以缓存结果。但是,这里的要求是20分钟以内,很难。性能问题这个问题和scale-out是一致的。假设我们做分片,因为表是分散在各个节点的,所以需要多次存储,在业务层做聚合计算。问题来了,在20分钟的时间要求中,我们需要存储多少次呢?10分钟?5分钟?实时呢?而且业务层也面临着单点计算能力的限制,需要横向扩展,所以需要考虑一致性的问题。所以,这里的一切似乎都很复杂。业务扩展问题假设我们不仅要处理热销产品的统计,还要统计广告点击量,或者根据用户访问行为快速判断用户特征,调整他们看到的信息,更好地满足用户的潜在需求等.,那么业务层会比较复杂。或许你有更好的办法,但其实我们需要的是一种新的认知:这个世界上发生的事情是实时的。所以我们需要实时计算模型,而不是批处理模型。我们需要的模型必须能够处理大量数据,因此它必须具有良好的横向扩展能力。最好的是,我们不需要过多考虑一致性和复制性。那么,这种计算模型就是实时计算模型,也可以认为是流式计算模型。现在假设我们有这样一个模型,我们可以愉快地设计新的业务场景:转发最多的微博是什么?什么是最畅销的商品?大家搜索的热点是什么?我们的哪个广告以及在哪个位置获得的点击次数最多?换句话说,我们可以问:这个世界上发生了什么?最火的微博话题是什么?我们用一个简单的滑动窗口计数问题来揭开所谓实时计算的神秘面纱。假设,我们的业务需求是:统计20分钟内微博热度最高的10个话题。为了解决这个问题,我们需要考虑:数据来源这里假设我们的数据来自于微博长链接推送的话题。问题建模我们认为topic是#展开的topic,最热门的topic就是这个topic出现的次数比其他topic多。例如:@foreach_break:你好,#世界#,我爱你,#微博#。“世界”和“微博”是话题。计算引擎我们使用storm。定义时间如何定义时间?时间的定义是一件困难的事情,这取决于需要多少精度。根据现实,我们一般用tick来表示时间的概念。在Storm的基础架构中,在执行程序启动阶段,使用计时器“在一段时间后”触发事件。如下:receive-queueexecutor-data)context(:worker-contextexecutor-data)](whentick-time-secs(if(or(system-id?(:component-ideexecutor-data))(and(=false(storm-confTOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))(=:spout(:typeexecutor-data))))(log-message"Timeoutsdisabledforexecutor"(:component-ideexecutor-data)":"(:executor-ideexecutor-data))(schedule-循环(:用户计时器工作人员)滴答时间秒秒时间秒(fn[](中断器/publishreceive队列[[nil(TupleImpl.context[滴答时间秒]常量/SYSTEM_TASK_IDConstants/SYSTEM_TICK_STREAM_ID)]])))))))在之前的博文中,已经详细分析了这些基础设施之间的关系。不懂的童鞋可以看看之前的文章。每隔一段时间,就会触发这样的事件,当流下游的螺栓接收到这样的事件时,它可以选择是增加计数还是聚合结果并将它们发送到流。bolt如何判断接收到的元组代表“tick”呢?负责管理bolt的executor线程在消费订阅消息队列中的消息时会调用bolt的execute方法。那么在execute中可以判断如下:设置勾选!上面我们可以知道,SYSTEM_TICK_STREAM_ID是在计时事件的回调中作为构造函数的参数传递给tuple的。那么SYSTEM_COMPONENT_ID是怎么来的呢?可以看出,在下面的代码中,SYSTEM_TASK_ID也传递给了元组:;;请注意SYSTEM_TASK_ID和SYSTEM_TICK_STREAM_ID(TupleImpl.context[tick-time-secs]Constants/SYSTEM_TASK_IDConstants/SYSTEM_TICK_STREAM_ID)然后使用以下代码获取SYSTEM_COMPONENT_ID:;}else{return_taskToComponent.get(taskId);}}#p#Slidingwindow有了上面的基础设施,我们还需要一些手段来完成“工程化”,把想法变成现实。在这里,我们看看MichaelG.Noll的滑动窗口设计。注:图片来自http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/TopologyStringspoutId="wordGenerator";StringcounterId="counter";StringintermediateRankerId="intermediateRanker";StringtotalRankerId="finalRanker";//这里假设TestWordSpout为源builder.setSpout(spoutId,newTestWordSpout(),5);//RollingCountBolt的时间窗口为9秒,每发送一次统计每3秒向下游发送一次结果builder.setBolt(counterId,newRollingCountBolt(9,3),4).fieldsGrouping(spoutId,newFields("word"));//IntermediateRankingsBolt将完成部分聚合并统计top-ntopicbuilder.setBolt(intermediateRankerId,newIntermediateRankingsBolt(TOP_N),4).fieldsGrouping(counterId,newFields("obj"));//TotalRankingsBolt会完成完整的聚合并统计top-n的topicbuilder.setBolt(totalRankerId,newTotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);上面的拓扑设计如下:注:图片来自http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/Combining聚合与时间的计算在上一篇文章中,我们描述了tick事件,bolt的execute方法将在回调中被触发,所以你可以这样做:/tick是的,在时间窗口发送统计结果,让窗口滚动count++//注意这里的速度基本取决于流的速度,可能是每秒几百万,也可能是每秒几十。//内存不够?bolt可以scale-out.privatevoidcountObjAndAck(Tupletuple){Objectobj=tuple.getValue(0);counter.incrementCount(obj);collector.ack(tuple);}//将统计结果发送给下游privatevoidemitCurrentWindowCounts(){Map<对象,long>counts=counter.getCountSthenAdvanceWindow();intactualWindowLengthInSeconds=lastModifiedTracker.secondsssinceSOLDESTMODESTMODIFICAION();lastModifiedTracker.markasmasmasmasmasmasmasmodified();}emit(counts,actualWindowLengthInSeconds);}上面的代码可能有点抽象,看这张图就明白了,打勾当它到达时,窗口会滚动:注:图片来自http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/IntermediateRankingsBolt&TotalRankingsBolt:publicfinalvoidexecute(Tupletuple,BasicOutputCollectorcollector){if(TupleUtils.isTick(tuple)){getLogger().debug("Receivedticktuple,triggeringemitofcurrentrankings");//将聚合排序后的结果发送给下游emitRankings(collector);}else{//Aggregation和sortupdateRankingsWithTuple(tuple);}}其中,IntermediateRankingsBolt和TotalRankingsBolt的聚合排序方式略有不同:IntermediateRankingsBolt的聚合排序方式:){//这一步提取主题,提取主题出现的次数Rankablerankable=RankableObjectWithFields.from(tuple);//这一步提取主题出现的次数是一个聚合,然后对所有topic进行重新排序super.getRankings().updateWith(rankable);}TotalRankingsBolt的聚合排序方法://ToBeMerged=(Rankings)tuple.getValue(0);//聚合排序super.getRankings().updateWith(rankingsToBeMerged);//归0,节省内存){Collections.sort(rankedItems);Collections.reverse(rankedItems);}结论下图可能就是我们想要的结果。我们已经完成了t0-t1之间的热点话题统计,foreach_break只是为了防盗版:]。详细解释了滑动窗口计数的概念和关键代码。如果还是不明白,请参考http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/的设计和风暴的源代码。我希望你明白什么是实时计算:]
