多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # 数据样本 ~~~ i am jdxia i am xjd i am jdxia i am jelly ~~~ # jar包 ~~~ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <!--<scope>provided</scope>--> <version>0.9.5</version> </dependency> ~~~ 安装log4j # 数据获取 ~~~ package com.learnstorm; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import org.apache.commons.lang.StringUtils; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Map; //数据获取 public class MyLocalFileSpout extends BaseRichSpout { private SpoutOutputCollector collector; private BufferedReader bufferedReader; //初始化方法 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; try { //定义这个去读取数据 this.bufferedReader = new BufferedReader(new FileReader(new File("/Users/jdxia/Desktop/MyFile/i.txt"))); } catch (FileNotFoundException e) { e.printStackTrace(); } } //storm流式计算的特征就是数据一条一条的处理 // while(true) { // this.nextTuple(); // } //这个方法会被循环调用 @Override public void nextTuple() { //每被调用一次就会发送一条数据出去 try { //读取一行 String line = bufferedReader.readLine(); //如果不是空的话 if (StringUtils.isNotBlank(line)) { List<Object> arrayList = new ArrayList<Object>(); //把数据放到ArrayList中 arrayList.add(line); //把数据发出去 collector.emit(arrayList); } } catch (IOException e) { e.printStackTrace(); } } //定义下我的输出 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("juzi")); } } ~~~ # 数据截取 ~~~ package com.learnstorm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; //相当于map-->world,1 //业务逻辑 //对句子进行切割 public class MySplitBolt extends BaseBasicBolt { //处理函数 @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //1.数据如何获取,用tuple获取 //强转为string,juzi是上一步定义的 String juzi = (String) tuple.getValueByField("juzi"); //2.进行切割 String[] strings = juzi.split(" "); //3.发送数据 for (String word : strings) { //我们之前用ArrayList存储,这边怎么变为Values //可以看下Values的源码,他是继承了ArrayList,他存的时候用了一个循环 basicOutputCollector.emit(new Values(word, 1)); } } //定义下我的输出 //单词world和他的次数 @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "num")); } } ~~~ # 单词统计 ~~~ package com.learnstorm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; import java.util.HashMap; import java.util.Map; //打印 public class MyWordCountAndPrintBolt extends BaseBasicBolt { private Map<String, Integer> wordCountMap = new HashMap<String, Integer>(); //处理函数 @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //根据之前定义的word和num //强转为string String word = (String) tuple.getValueByField("word"); Integer num = (Integer) tuple.getValueByField("num"); //1.查看单词对应的value是否存在 Integer integer = wordCountMap.get(word); if (integer == null || integer.intValue() == 0) { //如果不存在就直接放入新的 wordCountMap.put(word, num); } else { //如果之前已经有了,就把对应统计加上 wordCountMap.put(word, integer.intValue() + num); } System.out.println(wordCountMap); } //不需要定义输出字段了 @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } ~~~ # 任务描述 这边写的是本地提交到集群 ~~~ package com.learnstorm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; public class StormTopologyDriver { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { //1. 描述任务 TopologyBuilder topologyBuilder = new TopologyBuilder(); //任务的名字自己定义 topologyBuilder.setSpout("mySpout", new MyLocalFileSpout()); //shuffleGrouping和前一个任务关联 topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("mySpout"); topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1"); //2. 任务提交 //提交给谁?提交什么内容? Config config = new Config(); StormTopology stormTopology = topologyBuilder.createTopology(); //本地模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordCount", config, stormTopology); //集群模式 // StormSubmitter.submitTopology("worldCount1", config, stormTopology); } } ~~~