热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Storm中关于Topology的设计

一:介绍Storm设计模型1.TopologyStorm对任务的抽象,其实就是将实时数据分析任务分解为不同的阶段点:计算组件Spo

一:介绍Storm设计模型

1.Topology

  Storm对任务的抽象,其实 就是将实时数据分析任务 分解为 不同的阶段    

  点: 计算组件   Spout   Bolt

  边: 数据流向    数据从上一个组件流向下一个组件  带方向

 

2.tuple

  Storm每条记录 封装成一个tuple

  其实就是一些keyvalue对按顺序排列

  方便组件获取数据

 

3.Spout

  数据采集器

  源源不断的日志记录  如何被topology接收进行处理?

  Spout负责从数据源上获取数据,简单处理 封装成tuple向后面的bolt发射

 

4.Bolt

  数据处理器

  

二:开发wordcount案例

1.书写整个大纲的点线图

  

 

2..程序结构

  

 

3.修改pom文件

  这个地方需要注意,在集群上的时候,这时候storm的包是有的,不要再打包,所以将provided打开。

  

1 xml version="1.0" encoding="UTF-8"?>
2 <project xmlns&#61;"http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0modelVersion>
6
7 <groupId>com.cj.itgroupId>
8 <artifactId>stormartifactId>
9 <version>1.0-SNAPSHOTversion>
10
11 <properties>
12 <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
13 <hbase.version>0.98.6-cdh5.3.6hbase.version>
14 <hdfs.version>2.5.0-cdh5.3.6hdfs.version>
15 <storm.version>0.9.6storm.version>
16 properties>
17
18 <repositories>
19 <repository>
20 <id>clouderaid>
21 <url>https://repository.cloudera.com/artifactory/cloudera-repos/url>
22 repository>
23 <repository>
24 <id>alimavenid>
25 <name>aliyun mavenname>
26 <url>http://maven.aliyun.com/nexus/content/groups/public/url>
27 repository>
28 repositories>
29
30 <dependencies>
31 <dependency>
32 <groupId>junitgroupId>
33 <artifactId>junitartifactId>
34 <version>4.12version>
35 <scope>testscope>
36 dependency>
37 <dependency>
38 <groupId>org.apache.stormgroupId>
39 <artifactId>storm-coreartifactId>
40 <version>${storm.version}version>
41
42
43
44 dependency>
45 <dependency>
46 <groupId>org.apache.stormgroupId>
47 <artifactId>storm-hbaseartifactId>
48 <version>${storm.version}version>
49 <exclusions>
50 <exclusion>
51 <groupId>org.apache.hadoopgroupId>
52 <artifactId>hadoop-hdfsartifactId>
53 exclusion>
54 <exclusion>
55 <groupId>org.apache.hbasegroupId>
56 <artifactId>hbase-clientartifactId>
57 exclusion>
58 exclusions>
59 dependency>
60
61 <dependency>
62 <groupId>org.apache.hadoopgroupId>
63 <artifactId>hadoop-hdfsartifactId>
64 <version>${hdfs.version}version>
65 dependency>
66 <dependency>
67 <groupId>org.apache.hbasegroupId>
68 <artifactId>hbase-clientartifactId>
69 <version>${hbase.version}version>
70 <exclusions>
71 <exclusion>
72 <artifactId>slf4j-log4j12artifactId>
73 <groupId>org.slf4jgroupId>
74 exclusion>
75 exclusions>
76 dependency>
77 <dependency>
78 <groupId>org.apache.zookeepergroupId>
79 <artifactId>zookeeperartifactId>
80 <version>3.4.6version>
81 <exclusions>
82 <exclusion>
83 <artifactId>slf4j-log4j12artifactId>
84 <groupId>org.slf4jgroupId>
85 exclusion>
86 exclusions>
87 dependency>
88 <dependency>
89 <groupId>org.apache.stormgroupId>
90 <artifactId>storm-kafkaartifactId>
91 <version>${storm.version}version>
92 <exclusions>
93 <exclusion>
94 <groupId>org.apache.zookeepergroupId>
95 <artifactId>zookeeperartifactId>
96 exclusion>
97 exclusions>
98 dependency>
99 <dependency>
100 <groupId>org.apache.kafkagroupId>
101 <artifactId>kafka_2.10artifactId>
102 <version>0.8.1.1version>
103 <exclusions>
104 <exclusion>
105 <groupId>org.apache.zookeepergroupId>
106 <artifactId>zookeeperartifactId>
107 exclusion>
108 <exclusion>
109 <groupId>log4jgroupId>
110 <artifactId>log4jartifactId>
111 exclusion>
112 exclusions>
113 dependency>
114 <dependency>
115 <groupId>org.mockitogroupId>
116 <artifactId>mockito-allartifactId>
117 <version>1.9.5version>
118 <scope>testscope>
119 dependency>
120 <dependency>
121 <groupId>cz.mallat.uasparsergroupId>
122 <artifactId>uasparserartifactId>
123 <version>0.6.1version>
124 dependency>
125 dependencies>
126
127 <build>
128 <plugins>
129 <plugin>
130 <artifactId>maven-compiler-pluginartifactId>
131 <version>3.3version>
132 <configuration>
133 <source>1.7source>
134 <target>1.7target>
135 configuration>
136 plugin>
137 <plugin>
138 <artifactId>maven-assembly-pluginartifactId>
139 <version>2.4version>
140 <configuration>
141 <descriptors>
142 <descriptor>src/main/assembly/src.xmldescriptor>
143 descriptors>
144 <descriptorRefs>
145 <descriptorRef>jar-with-dependenciesdescriptorRef>
146 descriptorRefs>
147 configuration>
148 <executions>
149 <execution>
150 <id>make-assemblyid>
151 <phase>packagephase>
152 <goals>
153 <goal>singlegoal>
154 goals>
155 execution>
156 executions>
157 plugin>
158 plugins>
159 build>
160
161 project>

 

4.src.xml

1 xml version&#61;"1.0" encoding&#61;"UTF-8"?>
2 <assembly
3 xmlns&#61;"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
4 xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
5 xsi:schemaLocation&#61;"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
6 <id>jar-with-dependenciesid>
7 <formats>
8 <format>jarformat>
9 formats>
10 <includeBaseDirectory>falseincludeBaseDirectory>
11 <dependencySets>
12 <dependencySet>
13 <unpack>falseunpack>
14 <scope>runtimescope>
15 dependencySet>
16 dependencySets>
17 <fileSets>
18 <fileSet>
19 <directory>/libdirectory>
20 fileSet>
21 fileSets>
22 assembly>

 

5.log

1 log4j.rootLogger&#61;info,console
2
3 log4j.appender.console&#61;org.apache.log4j.ConsoleAppender
4 log4j.appender.console.layout&#61;org.apache.log4j.SimpleLayout
5
6 log4j.logger.com.ibeifeng&#61;INFO

 

6.SentenceSpout.java

1 package com.jun.it;
2
3 import backtype.storm.spout.SpoutOutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichSpout;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.topology.base.BaseRichSpout;
8 import backtype.storm.tuple.Fields;
9 import backtype.storm.tuple.Values;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.Map;
14 import java.util.Random;
15
16
17 public class SentenceSpout extends BaseRichSpout {
18 private static final Logger logger&#61; LoggerFactory.getLogger(SentenceSpout.class);
19 private SpoutOutputCollector collector;
20 //制造数据
21 private static final String[] SENTENCES&#61;{
22 "hadoop oozie storm hive",
23 "hadoop spark sqoop hbase",
24 "error flume yarn mapreduce"
25 };
26 //初始化collector
27 &#64;Override
28 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
29 this.collector&#61;spoutOutputCollector;
30 }
31 //Key的设置
32 &#64;Override
33 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
34 outputFieldsDeclarer.declare(new Fields("sentence"));
35 }
36 //Tuple的组装
37 &#64;Override
38 public void nextTuple() {
39 String sentence&#61;SENTENCES[new Random().nextInt(SENTENCES.length)];
40 if(sentence.contains("error")){
41 logger.error("记录有问题"&#43;sentence);
42 }else{
43 this.collector.emit(new Values(sentence));
44 }
45 try{
46 Thread.sleep(1000);
47 }catch (Exception e){
48 e.printStackTrace();
49 }
50 }
51
52 public SentenceSpout() {
53 super();
54 }
55
56 &#64;Override
57 public void close() {
58
59 }
60
61 &#64;Override
62 public void activate() {
63 super.activate();
64 }
65
66 &#64;Override
67 public void deactivate() {
68 super.deactivate();
69 }
70
71 &#64;Override
72 public void ack(Object msgId) {
73 super.ack(msgId);
74 }
75
76 &#64;Override
77 public void fail(Object msgId) {
78 super.fail(msgId);
79 }
80
81 }

 

7.SplitBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Fields;
8 import backtype.storm.tuple.Tuple;
9 import backtype.storm.tuple.Values;
10
11 import java.util.Map;
12
13 public class SplitBolt implements IRichBolt {
14 private OutputCollector collector;
15 &#64;Override
16 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
17 this.collector&#61;outputCollector;
18 }
19
20 &#64;Override
21 public void execute(Tuple tuple) {
22 String sentence&#61;tuple.getStringByField("sentence");
23 if(sentence!&#61;null&&!"".equals(sentence)){
24 String[] words&#61;sentence.split(" ");
25 for (String word:words){
26 this.collector.emit(new Values(word));
27 }
28 }
29 }
30
31 &#64;Override
32 public void cleanup() {
33
34 }
35
36 &#64;Override
37 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
38 outputFieldsDeclarer.declare(new Fields("word"));
39 }
40
41 &#64;Override
42 public Map getComponentConfiguration() {
43 return null;
44 }
45 }

 

8.CountBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Fields;
8 import backtype.storm.tuple.Tuple;
9 import backtype.storm.tuple.Values;
10
11 import java.util.HashMap;
12 import java.util.Map;
13
14 public class CountBolt implements IRichBolt {
15 private Map counts;
16 private OutputCollector collector;
17 &#64;Override
18 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
19 this.collector&#61;outputCollector;
20 counts&#61;new HashMap<>();
21 }
22
23 &#64;Override
24 public void execute(Tuple tuple) {
25 String word&#61;tuple.getStringByField("word");
26 int count&#61;1;
27 if(counts.containsKey(word)){
28 count&#61;counts.get(word)&#43;1;
29 }
30 counts.put(word,count);
31 this.collector.emit(new Values(word,count));
32 }
33
34 &#64;Override
35 public void cleanup() {
36
37 }
38
39 &#64;Override
40 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
41 outputFieldsDeclarer.declare(new Fields("word","count"));
42 }
43
44 &#64;Override
45 public Map getComponentConfiguration() {
46 return null;
47 }
48 }

 

9.printBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Tuple;
8
9 import java.util.Map;
10
11 public class PrintBolt implements IRichBolt {
12 &#64;Override
13 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
14
15 }
16
17 &#64;Override
18 public void execute(Tuple tuple) {
19 String word&#61;tuple.getStringByField("word");
20 int count&#61;tuple.getIntegerByField("count");
21 System.out.println("word:"&#43;word&#43;", count:"&#43;count);
22 }
23
24 &#64;Override
25 public void cleanup() {
26
27 }
28
29 &#64;Override
30 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
31
32 }
33
34 &#64;Override
35 public Map getComponentConfiguration() {
36 return null;
37 }
38 }

 

10.WordCountTopology.java

1 package com.jun.it;
2
3 import backtype.storm.Config;
4 import backtype.storm.LocalCluster;
5 import backtype.storm.StormSubmitter;
6 import backtype.storm.generated.AlreadyAliveException;
7 import backtype.storm.generated.InvalidTopologyException;
8 import backtype.storm.topology.TopologyBuilder;
9 import backtype.storm.tuple.Fields;
10
11 public class WordCountTopology {
12 private static final String SENTENCE_SPOUT&#61;"sentenceSpout";
13 private static final String SPLIT_BOLT&#61;"splitBolt";
14 private static final String COUNT_BOLT&#61;"countBolt";
15 private static final String PRINT_BOLT&#61;"printBolt";
16 public static void main(String[] args){
17 TopologyBuilder topologyBuilder&#61;new TopologyBuilder();
18 topologyBuilder.setSpout(SENTENCE_SPOUT,new SentenceSpout());
19 topologyBuilder.setBolt(SPLIT_BOLT,new SplitBolt()).shuffleGrouping(SENTENCE_SPOUT);
20 topologyBuilder.setBolt(COUNT_BOLT,new CountBolt()).fieldsGrouping(SPLIT_BOLT,new Fields("word"));
21 topologyBuilder.setBolt(PRINT_BOLT,new PrintBolt()).globalGrouping(COUNT_BOLT);
22 Config config&#61;new Config();
23 if(args&#61;&#61;null||args.length&#61;&#61;0){
24 LocalCluster localCluster&#61;new LocalCluster();
25 localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());
26 }else{
27 config.setNumWorkers(1);
28 try {
29 StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
30 } catch (AlreadyAliveException e) {
31 e.printStackTrace();
32 } catch (InvalidTopologyException e) {
33 e.printStackTrace();
34 }
35 }
36
37 }
38 }

 

三&#xff1a;本地运行

1.前提

  原本以为需要启动storm&#xff0c;后来发现&#xff0c;不需要启动Storm。

  只需要在main的时候Run即可

 

2.结果

  

 

 四&#xff1a;集群运行

1.在IDEA下打包

  下面的是有依赖的包。

 

 

2.上传到datas下

  

 

3.运行

   bin/storm jar /opt/datas/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.jun.it.WordCountTopology wordcount

  

 

4.UI效果

  

 


转载于:https://www.cnblogs.com/juncaoit/p/6351492.html


推荐阅读
  • Android 九宫格布局详解及实现:人人网应用示例
    本文深入探讨了人人网Android应用中独特的九宫格布局设计,解析其背后的GridView实现原理,并提供详细的代码示例。这种布局方式不仅美观大方,而且在现代Android应用中较为少见,值得开发者借鉴。 ... [详细]
  • XNA 3.0 游戏编程:从 XML 文件加载数据
    本文介绍如何在 XNA 3.0 游戏项目中从 XML 文件加载数据。我们将探讨如何将 XML 数据序列化为二进制文件,并通过内容管道加载到游戏中。此外,还会涉及自定义类型读取器和写入器的实现。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 将Web服务部署到Tomcat
    本文介绍了如何在JDeveloper 12c中创建一个Java项目,并将其打包为Web服务,然后部署到Tomcat服务器。内容涵盖从项目创建、编写Web服务代码、配置相关XML文件到最终的本地部署和验证。 ... [详细]
  • RecyclerView初步学习(一)
    RecyclerView初步学习(一)ReCyclerView提供了一种插件式的编程模式,除了提供ViewHolder缓存模式,还可以自定义动画,分割符,布局样式,相比于传统的ListVi ... [详细]
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 使用 Azure Service Principal 和 Microsoft Graph API 获取 AAD 用户列表
    本文介绍了一段通用代码示例,该代码不仅能够操作 Azure Active Directory (AAD),还可以通过 Azure Service Principal 的授权访问和管理 Azure 订阅资源。Azure 的架构可以分为两个层级:AAD 和 Subscription。 ... [详细]
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • 本文介绍了如何在C#中启动一个应用程序,并通过枚举窗口来获取其主窗口句柄。当使用Process类启动程序时,我们通常只能获得进程的句柄,而主窗口句柄可能为0。因此,我们需要使用API函数和回调机制来准确获取主窗口句柄。 ... [详细]
  • 本文介绍了如何通过 Maven 依赖引入 SQLiteJDBC 和 HikariCP 包,从而在 Java 应用中高效地连接和操作 SQLite 数据库。文章提供了详细的代码示例,并解释了每个步骤的实现细节。 ... [详细]
  • 使用Vultr云服务器和Namesilo域名搭建个人网站
    本文详细介绍了如何通过Vultr云服务器和Namesilo域名搭建一个功能齐全的个人网站,包括购买、配置服务器以及绑定域名的具体步骤。文章还提供了详细的命令行操作指南,帮助读者顺利完成建站过程。 ... [详细]
  • 使用Python在SAE上开发新浪微博应用的初步探索
    最近重新审视了新浪云平台(SAE)提供的服务,发现其已支持Python开发。本文将详细介绍如何利用Django框架构建一个简单的新浪微博应用,并分享开发过程中的关键步骤。 ... [详细]
  • 根据最新发布的《互联网人才趋势报告》,尽管大量IT从业者已转向Python开发,但随着人工智能和大数据领域的迅猛发展,仍存在巨大的人才缺口。本文将详细介绍如何使用Python编写一个简单的爬虫程序,并提供完整的代码示例。 ... [详细]
author-avatar
IT营
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有