热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Storm中关于Topology的设计

一:介绍Storm设计模型1.TopologyStorm对任务的抽象,其实就是将实时数据分析任务分解为不同的阶段点:计算组件Spo

一:介绍Storm设计模型

1.Topology

  Storm对任务的抽象,其实 就是将实时数据分析任务 分解为 不同的阶段    

  点: 计算组件   Spout   Bolt

  边: 数据流向    数据从上一个组件流向下一个组件  带方向

 

2.tuple

  Storm每条记录 封装成一个tuple

  其实就是一些keyvalue对按顺序排列

  方便组件获取数据

 

3.Spout

  数据采集器

  源源不断的日志记录  如何被topology接收进行处理?

  Spout负责从数据源上获取数据,简单处理 封装成tuple向后面的bolt发射

 

4.Bolt

  数据处理器

  

二:开发wordcount案例

1.书写整个大纲的点线图

  

 

2..程序结构

  

 

3.修改pom文件

  这个地方需要注意,在集群上的时候,这时候storm的包是有的,不要再打包,所以将provided打开。

  

1 xml version="1.0" encoding="UTF-8"?>
2 <project xmlns&#61;"http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0modelVersion>
6
7 <groupId>com.cj.itgroupId>
8 <artifactId>stormartifactId>
9 <version>1.0-SNAPSHOTversion>
10
11 <properties>
12 <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
13 <hbase.version>0.98.6-cdh5.3.6hbase.version>
14 <hdfs.version>2.5.0-cdh5.3.6hdfs.version>
15 <storm.version>0.9.6storm.version>
16 properties>
17
18 <repositories>
19 <repository>
20 <id>clouderaid>
21 <url>https://repository.cloudera.com/artifactory/cloudera-repos/url>
22 repository>
23 <repository>
24 <id>alimavenid>
25 <name>aliyun mavenname>
26 <url>http://maven.aliyun.com/nexus/content/groups/public/url>
27 repository>
28 repositories>
29
30 <dependencies>
31 <dependency>
32 <groupId>junitgroupId>
33 <artifactId>junitartifactId>
34 <version>4.12version>
35 <scope>testscope>
36 dependency>
37 <dependency>
38 <groupId>org.apache.stormgroupId>
39 <artifactId>storm-coreartifactId>
40 <version>${storm.version}version>
41
42
43
44 dependency>
45 <dependency>
46 <groupId>org.apache.stormgroupId>
47 <artifactId>storm-hbaseartifactId>
48 <version>${storm.version}version>
49 <exclusions>
50 <exclusion>
51 <groupId>org.apache.hadoopgroupId>
52 <artifactId>hadoop-hdfsartifactId>
53 exclusion>
54 <exclusion>
55 <groupId>org.apache.hbasegroupId>
56 <artifactId>hbase-clientartifactId>
57 exclusion>
58 exclusions>
59 dependency>
60
61 <dependency>
62 <groupId>org.apache.hadoopgroupId>
63 <artifactId>hadoop-hdfsartifactId>
64 <version>${hdfs.version}version>
65 dependency>
66 <dependency>
67 <groupId>org.apache.hbasegroupId>
68 <artifactId>hbase-clientartifactId>
69 <version>${hbase.version}version>
70 <exclusions>
71 <exclusion>
72 <artifactId>slf4j-log4j12artifactId>
73 <groupId>org.slf4jgroupId>
74 exclusion>
75 exclusions>
76 dependency>
77 <dependency>
78 <groupId>org.apache.zookeepergroupId>
79 <artifactId>zookeeperartifactId>
80 <version>3.4.6version>
81 <exclusions>
82 <exclusion>
83 <artifactId>slf4j-log4j12artifactId>
84 <groupId>org.slf4jgroupId>
85 exclusion>
86 exclusions>
87 dependency>
88 <dependency>
89 <groupId>org.apache.stormgroupId>
90 <artifactId>storm-kafkaartifactId>
91 <version>${storm.version}version>
92 <exclusions>
93 <exclusion>
94 <groupId>org.apache.zookeepergroupId>
95 <artifactId>zookeeperartifactId>
96 exclusion>
97 exclusions>
98 dependency>
99 <dependency>
100 <groupId>org.apache.kafkagroupId>
101 <artifactId>kafka_2.10artifactId>
102 <version>0.8.1.1version>
103 <exclusions>
104 <exclusion>
105 <groupId>org.apache.zookeepergroupId>
106 <artifactId>zookeeperartifactId>
107 exclusion>
108 <exclusion>
109 <groupId>log4jgroupId>
110 <artifactId>log4jartifactId>
111 exclusion>
112 exclusions>
113 dependency>
114 <dependency>
115 <groupId>org.mockitogroupId>
116 <artifactId>mockito-allartifactId>
117 <version>1.9.5version>
118 <scope>testscope>
119 dependency>
120 <dependency>
121 <groupId>cz.mallat.uasparsergroupId>
122 <artifactId>uasparserartifactId>
123 <version>0.6.1version>
124 dependency>
125 dependencies>
126
127 <build>
128 <plugins>
129 <plugin>
130 <artifactId>maven-compiler-pluginartifactId>
131 <version>3.3version>
132 <configuration>
133 <source>1.7source>
134 <target>1.7target>
135 configuration>
136 plugin>
137 <plugin>
138 <artifactId>maven-assembly-pluginartifactId>
139 <version>2.4version>
140 <configuration>
141 <descriptors>
142 <descriptor>src/main/assembly/src.xmldescriptor>
143 descriptors>
144 <descriptorRefs>
145 <descriptorRef>jar-with-dependenciesdescriptorRef>
146 descriptorRefs>
147 configuration>
148 <executions>
149 <execution>
150 <id>make-assemblyid>
151 <phase>packagephase>
152 <goals>
153 <goal>singlegoal>
154 goals>
155 execution>
156 executions>
157 plugin>
158 plugins>
159 build>
160
161 project>

 

4.src.xml

1 xml version&#61;"1.0" encoding&#61;"UTF-8"?>
2 <assembly
3 xmlns&#61;"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
4 xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
5 xsi:schemaLocation&#61;"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
6 <id>jar-with-dependenciesid>
7 <formats>
8 <format>jarformat>
9 formats>
10 <includeBaseDirectory>falseincludeBaseDirectory>
11 <dependencySets>
12 <dependencySet>
13 <unpack>falseunpack>
14 <scope>runtimescope>
15 dependencySet>
16 dependencySets>
17 <fileSets>
18 <fileSet>
19 <directory>/libdirectory>
20 fileSet>
21 fileSets>
22 assembly>

 

5.log

1 log4j.rootLogger&#61;info,console
2
3 log4j.appender.console&#61;org.apache.log4j.ConsoleAppender
4 log4j.appender.console.layout&#61;org.apache.log4j.SimpleLayout
5
6 log4j.logger.com.ibeifeng&#61;INFO

 

6.SentenceSpout.java

1 package com.jun.it;
2
3 import backtype.storm.spout.SpoutOutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichSpout;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.topology.base.BaseRichSpout;
8 import backtype.storm.tuple.Fields;
9 import backtype.storm.tuple.Values;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.Map;
14 import java.util.Random;
15
16
17 public class SentenceSpout extends BaseRichSpout {
18 private static final Logger logger&#61; LoggerFactory.getLogger(SentenceSpout.class);
19 private SpoutOutputCollector collector;
20 //制造数据
21 private static final String[] SENTENCES&#61;{
22 "hadoop oozie storm hive",
23 "hadoop spark sqoop hbase",
24 "error flume yarn mapreduce"
25 };
26 //初始化collector
27 &#64;Override
28 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
29 this.collector&#61;spoutOutputCollector;
30 }
31 //Key的设置
32 &#64;Override
33 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
34 outputFieldsDeclarer.declare(new Fields("sentence"));
35 }
36 //Tuple的组装
37 &#64;Override
38 public void nextTuple() {
39 String sentence&#61;SENTENCES[new Random().nextInt(SENTENCES.length)];
40 if(sentence.contains("error")){
41 logger.error("记录有问题"&#43;sentence);
42 }else{
43 this.collector.emit(new Values(sentence));
44 }
45 try{
46 Thread.sleep(1000);
47 }catch (Exception e){
48 e.printStackTrace();
49 }
50 }
51
52 public SentenceSpout() {
53 super();
54 }
55
56 &#64;Override
57 public void close() {
58
59 }
60
61 &#64;Override
62 public void activate() {
63 super.activate();
64 }
65
66 &#64;Override
67 public void deactivate() {
68 super.deactivate();
69 }
70
71 &#64;Override
72 public void ack(Object msgId) {
73 super.ack(msgId);
74 }
75
76 &#64;Override
77 public void fail(Object msgId) {
78 super.fail(msgId);
79 }
80
81 }

 

7.SplitBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Fields;
8 import backtype.storm.tuple.Tuple;
9 import backtype.storm.tuple.Values;
10
11 import java.util.Map;
12
13 public class SplitBolt implements IRichBolt {
14 private OutputCollector collector;
15 &#64;Override
16 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
17 this.collector&#61;outputCollector;
18 }
19
20 &#64;Override
21 public void execute(Tuple tuple) {
22 String sentence&#61;tuple.getStringByField("sentence");
23 if(sentence!&#61;null&&!"".equals(sentence)){
24 String[] words&#61;sentence.split(" ");
25 for (String word:words){
26 this.collector.emit(new Values(word));
27 }
28 }
29 }
30
31 &#64;Override
32 public void cleanup() {
33
34 }
35
36 &#64;Override
37 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
38 outputFieldsDeclarer.declare(new Fields("word"));
39 }
40
41 &#64;Override
42 public Map getComponentConfiguration() {
43 return null;
44 }
45 }

 

8.CountBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Fields;
8 import backtype.storm.tuple.Tuple;
9 import backtype.storm.tuple.Values;
10
11 import java.util.HashMap;
12 import java.util.Map;
13
14 public class CountBolt implements IRichBolt {
15 private Map counts;
16 private OutputCollector collector;
17 &#64;Override
18 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
19 this.collector&#61;outputCollector;
20 counts&#61;new HashMap<>();
21 }
22
23 &#64;Override
24 public void execute(Tuple tuple) {
25 String word&#61;tuple.getStringByField("word");
26 int count&#61;1;
27 if(counts.containsKey(word)){
28 count&#61;counts.get(word)&#43;1;
29 }
30 counts.put(word,count);
31 this.collector.emit(new Values(word,count));
32 }
33
34 &#64;Override
35 public void cleanup() {
36
37 }
38
39 &#64;Override
40 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
41 outputFieldsDeclarer.declare(new Fields("word","count"));
42 }
43
44 &#64;Override
45 public Map getComponentConfiguration() {
46 return null;
47 }
48 }

 

9.printBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Tuple;
8
9 import java.util.Map;
10
11 public class PrintBolt implements IRichBolt {
12 &#64;Override
13 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
14
15 }
16
17 &#64;Override
18 public void execute(Tuple tuple) {
19 String word&#61;tuple.getStringByField("word");
20 int count&#61;tuple.getIntegerByField("count");
21 System.out.println("word:"&#43;word&#43;", count:"&#43;count);
22 }
23
24 &#64;Override
25 public void cleanup() {
26
27 }
28
29 &#64;Override
30 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
31
32 }
33
34 &#64;Override
35 public Map getComponentConfiguration() {
36 return null;
37 }
38 }

 

10.WordCountTopology.java

1 package com.jun.it;
2
3 import backtype.storm.Config;
4 import backtype.storm.LocalCluster;
5 import backtype.storm.StormSubmitter;
6 import backtype.storm.generated.AlreadyAliveException;
7 import backtype.storm.generated.InvalidTopologyException;
8 import backtype.storm.topology.TopologyBuilder;
9 import backtype.storm.tuple.Fields;
10
11 public class WordCountTopology {
12 private static final String SENTENCE_SPOUT&#61;"sentenceSpout";
13 private static final String SPLIT_BOLT&#61;"splitBolt";
14 private static final String COUNT_BOLT&#61;"countBolt";
15 private static final String PRINT_BOLT&#61;"printBolt";
16 public static void main(String[] args){
17 TopologyBuilder topologyBuilder&#61;new TopologyBuilder();
18 topologyBuilder.setSpout(SENTENCE_SPOUT,new SentenceSpout());
19 topologyBuilder.setBolt(SPLIT_BOLT,new SplitBolt()).shuffleGrouping(SENTENCE_SPOUT);
20 topologyBuilder.setBolt(COUNT_BOLT,new CountBolt()).fieldsGrouping(SPLIT_BOLT,new Fields("word"));
21 topologyBuilder.setBolt(PRINT_BOLT,new PrintBolt()).globalGrouping(COUNT_BOLT);
22 Config config&#61;new Config();
23 if(args&#61;&#61;null||args.length&#61;&#61;0){
24 LocalCluster localCluster&#61;new LocalCluster();
25 localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());
26 }else{
27 config.setNumWorkers(1);
28 try {
29 StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
30 } catch (AlreadyAliveException e) {
31 e.printStackTrace();
32 } catch (InvalidTopologyException e) {
33 e.printStackTrace();
34 }
35 }
36
37 }
38 }

 

三&#xff1a;本地运行

1.前提

  原本以为需要启动storm&#xff0c;后来发现&#xff0c;不需要启动Storm。

  只需要在main的时候Run即可

 

2.结果

  

 

 四&#xff1a;集群运行

1.在IDEA下打包

  下面的是有依赖的包。

 

 

2.上传到datas下

  

 

3.运行

   bin/storm jar /opt/datas/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.jun.it.WordCountTopology wordcount

  

 

4.UI效果

  

 


转载于:https://www.cnblogs.com/juncaoit/p/6351492.html


推荐阅读
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • iOS超签签名服务器搭建及其优劣势
    本文介绍了搭建iOS超签签名服务器的原因和优势,包括不掉签、用户可以直接安装不需要信任、体验好等。同时也提到了超签的劣势,即一个证书只能安装100个,成本较高。文章还详细介绍了超签的实现原理,包括用户请求服务器安装mobileconfig文件、服务器调用苹果接口添加udid等步骤。最后,还提到了生成mobileconfig文件和导出AppleWorldwideDeveloperRelationsCertificationAuthority证书的方法。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • 本文介绍了使用kotlin实现动画效果的方法,包括上下移动、放大缩小、旋转等功能。通过代码示例演示了如何使用ObjectAnimator和AnimatorSet来实现动画效果,并提供了实现抖动效果的代码。同时还介绍了如何使用translationY和translationX来实现上下和左右移动的效果。最后还提供了一个anim_small.xml文件的代码示例,可以用来实现放大缩小的效果。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
  • 本文整理了315道Python基础题目及答案,帮助读者检验学习成果。文章介绍了学习Python的途径、Python与其他编程语言的对比、解释型和编译型编程语言的简述、Python解释器的种类和特点、位和字节的关系、以及至少5个PEP8规范。对于想要检验自己学习成果的读者,这些题目将是一个不错的选择。请注意,答案在视频中,本文不提供答案。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • Hbase1.2.0cdh5.16.2使用PREFIX_TREE编码导致集群压缩队列异常
    Hbase1.X版本中PREFIX_TREE作为BlockEncoding存在bug,会造成RegionServer节点compactionqueue持续升高,甚至影响fl ... [详细]
  • android:customfonts中有一个自定义字体的图书库以下是如何使用它的示例.在gradle中你需要把这行:compileuk.co.chri ... [详细]
author-avatar
IT营
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有