I: Storm's Design Model
1. Topology
The Topology is Storm's abstraction of a job: a real-time data analysis task is decomposed into stages and modeled as a directed graph.
Vertices: the computing components, Spouts and Bolts.
Edges: the data flow; data moves from one component to the next, and each edge has a direction.
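As a preview of the wiring done in WordCountTopology later in this post, here is a minimal sketch (the component IDs are illustrative) of how the vertices and edges of that graph are declared with the TopologyBuilder API used throughout this example:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new SentenceSpout());         // vertex: the data source
builder.setBolt("split", new SplitBolt())
       .shuffleGrouping("spout");                       // edge: spout -> split
builder.setBolt("count", new CountBolt())
       .fieldsGrouping("split", new Fields("word"));    // edge: split -> count, keyed by "word"

Each setSpout/setBolt call adds a vertex; each grouping call adds a directed edge from the named upstream component.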
2. Tuple
Storm wraps every record in a tuple.
A tuple is essentially an ordered list of key-value pairs (named fields), which makes it easy for downstream components to pull values out of the data.
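A minimal sketch of that contract (the field names here are illustrative): the upstream component declares the ordered field names once, and a downstream component reads each value by name or by position:

// upstream: declare the ordered field names its tuples will carry
outputFieldsDeclarer.declare(new Fields("word", "count"));
// upstream: emit values that line up with those fields, in order
collector.emit(new Values("hadoop", 3));
// downstream: read by field name (or positionally: getString(0), getInteger(1))
String word = tuple.getStringByField("word");
int count = tuple.getIntegerByField("count");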
3. Spout
The data collector.
How does an endless stream of log records get into the topology for processing?
The Spout pulls data from the source, does light preprocessing, wraps each record into a tuple, and emits it to the downstream bolts.
4. Bolt
The data processor: a Bolt receives tuples from upstream components, processes them (splitting, counting, filtering, writing out), and may emit new tuples to the next stage.
II: Developing the WordCount Example
1. Draw the point-and-line diagram for the whole topology
2. Program structure
3. Edit the pom file
One thing to note here: on the cluster, the Storm jars are already present and must not be bundled into our jar again, so for cluster builds enable the provided scope on the storm-core dependency (shown commented out below for local runs).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cj.it</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hbase.version>0.98.6-cdh5.3.6</hbase.version>
        <hdfs.version>2.5.0-cdh5.3.6</hdfs.version>
        <storm.version>0.9.6</storm.version>
    </properties>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- for cluster builds, uncomment so storm-core is not re-bundled
                 (assumed placement, per the note above):
            <scope>provided</scope>
            -->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>cz.mallat.uasparser</groupId>
            <artifactId>uasparser</artifactId>
            <version>0.6.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/src.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
4. src.xml
<?xml version="1.0" encoding="UTF-8"?>
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>/lib</directory>
        </fileSet>
    </fileSets>
</assembly>
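Because the assembly plugin above is bound to the package phase, the fat jar can also be built from the command line rather than the IDE (a standard Maven invocation, run from the project root):

mvn clean package
# produces target/storm-1.0-SNAPSHOT-jar-with-dependencies.jar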
5. log4j.properties
log4j.rootLogger=info,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.SimpleLayout

log4j.logger.com.ibeifeng=INFO
6. SentenceSpout.java
package com.jun.it;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

public class SentenceSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SentenceSpout.class);
    private SpoutOutputCollector collector;
    // shared RNG for picking sentences
    private final Random random = new Random();
    // canned sentences that stand in for a real data source
    private static final String[] SENTENCES = {
            "hadoop oozie storm hive",
            "hadoop spark sqoop hbase",
            "error flume yarn mapreduce"
    };

    // initialize the collector
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    // declare the output field name
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    // assemble and emit one tuple per call
    @Override
    public void nextTuple() {
        String sentence = SENTENCES[random.nextInt(SENTENCES.length)];
        if (sentence.contains("error")) {
            logger.error("bad record: " + sentence);
        } else {
            this.collector.emit(new Values(sentence));
        }
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public SentenceSpout() {
        super();
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
        super.activate();
    }

    @Override
    public void deactivate() {
        super.deactivate();
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
    }
}
7. SplitBolt.java
package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    // split each sentence into words and emit one tuple per word
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        if (sentence != null && !"".equals(sentence)) {
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
8. CountBolt.java
package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt implements IRichBolt {
    private Map<String, Integer> counts;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        counts = new HashMap<>();
    }

    // increment the running count for the word and emit the new total
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = 1;
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
9. PrintBolt.java
package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt implements IRichBolt {
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    // terminal bolt: print each (word, count) pair to stdout
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        System.out.println("word:" + word + ", count:" + count);
    }

    @Override
    public void cleanup() {
    }

    // emits nothing downstream, so there are no fields to declare
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
10. WordCountTopology.java
package com.jun.it;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SENTENCE_SPOUT, new SentenceSpout());
        topologyBuilder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SENTENCE_SPOUT);
        topologyBuilder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        topologyBuilder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);
        Config config = new Config();
        if (args == null || args.length == 0) {
            // no arguments: run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        } else {
            // with arguments: submit to the cluster under the given topology name
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
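Note the choice of groupings: shuffleGrouping spreads sentences evenly across SplitBolt tasks; fieldsGrouping on "word" guarantees every occurrence of the same word reaches the same CountBolt task, which is what keeps each task's in-memory count map correct; and globalGrouping funnels all updated counts into a single PrintBolt task.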
III: Running Locally
1. Prerequisites
I originally assumed the Storm daemons had to be started first, but it turns out local mode needs no running Storm cluster.
Just run the main method, and the LocalCluster branch executes the topology in-process.
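One caveat: as written, the LocalCluster branch never shuts down, so the local topology runs until the process is killed. A common pattern (a sketch against the same backtype.storm 0.9.x API) is to let it run for a fixed interval and then tear it down:

LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
// let the topology process data for a while, then stop it cleanly
backtype.storm.utils.Utils.sleep(30 * 1000);
localCluster.killTopology("wordcount");
localCluster.shutdown();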
2. Result
IV: Running on the Cluster
1. Package the jar in IDEA
The artifact to use is the one with dependencies bundled (the jar-with-dependencies build).
2. Upload it to /opt/datas
3. Run
bin/storm jar /opt/datas/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.jun.it.WordCountTopology wordcount
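After submitting, the standard Storm CLI can confirm the topology is alive and stop it when finished:

bin/storm list              # the wordcount topology should show status ACTIVE
bin/storm kill wordcount    # deactivate, wait out the timeout, then remove it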
4. The Storm UI view