当前位置: 首页 > 科技观察

TwitterStorm进阶初步设置

时间:2023-03-12 21:53:00 科技观察

这篇博客是一个简单的Storm入门示例,目的是让读者了解Storm是如何工作的。以及后面会发布的Storm的几个高级特性,最终将Storm集成到Hadoop2.x的YARN中。目标读者为对大数据有进阶的Hadoop、Spark用户,或了解Storm并想深入了解Storm的读者。ProjectPom(Stormjar没有提交到Maven中央仓库,需要在项目中添加以下仓库地址):centralMavenRepositorySwitchboarddefaulthttp://maven.oschina.net/content/groups/public/false<存储库clojarshttps://clojars.org/repo/falsetrueorg.yamlsnakeyaml1.13org.apache.zookeeperzookeeper3.3.3;org.clojureclojure1.5.1stormstorm0.9.0.1stormlibthrift70.7.0下面是Storm的HelloWord的例子,代码已经删掉,熟悉Storm的读者自然可以把代码整理成一个完整的例子publicstaticvoidmain(String[]args){Configconf=newConfig();conf.put(Config.STORM_LOCAL_DIR,"/Volumes/Study/data/storm");conf.put(Config.STORM_CLUSTER_MODE,"local");//conf.put("storm.local.mode.zmq","false");conf.put("storm.zookeeper.root","/storm");conf.put("storm.zookeeper.session.timeout",50000);conf.put("storm.zookeeper.servers","nowledgedata-n15");conf.put("storm.zookeeper.port",2181);//conf.setDebug(true);//conf.setNumWorkers(2);TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout("words",newTestWordSpout(),2);builder.setBolt("exclaim2",newDefaultStringBolt(),5).shuffleGrouping("words");LocalClustercluster=newLocalCluster();cluster.submitTopology("test",conf,builder.createTopology());}Config.STORM_LOCAL_DIR是配置一个本地路径,Storm会在其中写入一些配置信息和临时数据。Config.STORM_CLUSTER_MODE是运行模式,本地和分布式两种选择,即本地模式和分布式模式。本地模式是运行时模拟多线程,用于开发和测试;分布式模式是分布式集群下的多进程,是真正的分布式。Storm的spout和blot高可用是通过ZooKeeper来协调的,storm.zookeeper.root是一个ZooKeeper地址,并且有对应的端口号。Debug是一种测试模式,有更详细的日志信息。TestWordSpout是Storm自带的例子,用于随机生成newString[]{“nathan”,“mike”,“jackson”,“golda”,“bertels”};列表中的字符串用于提供数据源。其中DefaultStringBolt的源码:OutputCollectorcollector;publicvoidprepare(Mapconf,TopologyContextcontext,OutputCollectorcollector){this.collector=collector;}publicvoidexecute(Tupletuple){log.info("revamessage:"+tuple.getString(0));collector.emit(tuple,newValues(tuple.getString(0)+"!!!"));collector.ack(tuple);}运行日志:10658[Thread-29-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson10658[Thread-31-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson10758[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:mike10758[Thread-33-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan10859[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan10859[Thread-29-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:bertels10961[Thread-31-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson10961[Thread-33-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson11061[Thread-35-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan11062[Thread-35-exclaim2]INFOcn。pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan11162[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:bertels11163[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson数据由一个叫做喷嘴(Spout,也挺水龙头,可以生成数据源)的Storm产生,然后传给后端一系列的Blots,最后转化消费。Spouts和Blots都是并行的,并行度可以自己设置(本地运行通过多线程模拟)。例如:builder.setSpout("words",newTestWordSpout(),2);builder.setBolt("exclaim2",newDefaultStringBolt(),5)喷嘴TestWordSpout的并行度为2,DefaultStringBolt的并行度为5。从logOut可以看出,数据通过到达一个预先安排好的Blot喷嘴,并打印日志。我的测试代码设置的并行度是5,日志中统计确实有5个线程:Thread-29-exclaim2Thread-31-exclaim2Thread-26-exclaim2Thread-33-exclaim2Thread-35-exclaim2Storm是什么?这是详细信息。借用OSC网友的话,Hadoop就是商场里的自动电梯。用户需要排队等候,选择楼层,然后到达;而Storm就像一个自动扶梯。目的地很明确。Storm在我的理解中,Storm和Hadoop是完全不同的,在设计上没有半点拟合的部分。Storm更像是我之前介绍的SpringIntegration,它是一个数据流系统。它可以按照预先设定的流程对数据进行转换、传输、分解、合并,最终数据到达后端存储。只不过Storm是可以分布式的,分布式的能力也可以自己设置。Storm的这个特性非常适合大数据ETL系统的开发。