热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Storm中关于Topology的设计

一:介绍Storm设计模型1.TopologyStorm对任务的抽象,其实就是将实时数据分析任务分解为不同的阶段点:计算组件Spo

一:介绍Storm设计模型

1.Topology

  Storm对任务的抽象,其实 就是将实时数据分析任务 分解为 不同的阶段    

  点: 计算组件   Spout   Bolt

  边: 数据流向    数据从上一个组件流向下一个组件  带方向

 

2.tuple

  Storm每条记录 封装成一个tuple

  其实就是一些keyvalue对按顺序排列

  方便组件获取数据

 

3.Spout

  数据采集器

  源源不断的日志记录  如何被topology接收进行处理?

  Spout负责从数据源上获取数据,简单处理 封装成tuple向后面的bolt发射

 

4.Bolt

  数据处理器

  

二:开发wordcount案例

1.书写整个大纲的点线图

  

 

2..程序结构

  

 

3.修改pom文件

  这个地方需要注意,在集群上的时候,这时候storm的包是有的,不要再打包,所以将provided打开。

  

1 xml version="1.0" encoding="UTF-8"?>
2 <project xmlns&#61;"http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0modelVersion>
6
7 <groupId>com.cj.itgroupId>
8 <artifactId>stormartifactId>
9 <version>1.0-SNAPSHOTversion>
10
11 <properties>
12 <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
13 <hbase.version>0.98.6-cdh5.3.6hbase.version>
14 <hdfs.version>2.5.0-cdh5.3.6hdfs.version>
15 <storm.version>0.9.6storm.version>
16 properties>
17
18 <repositories>
19 <repository>
20 <id>clouderaid>
21 <url>https://repository.cloudera.com/artifactory/cloudera-repos/url>
22 repository>
23 <repository>
24 <id>alimavenid>
25 <name>aliyun mavenname>
26 <url>http://maven.aliyun.com/nexus/content/groups/public/url>
27 repository>
28 repositories>
29
30 <dependencies>
31 <dependency>
32 <groupId>junitgroupId>
33 <artifactId>junitartifactId>
34 <version>4.12version>
35 <scope>testscope>
36 dependency>
37 <dependency>
38 <groupId>org.apache.stormgroupId>
39 <artifactId>storm-coreartifactId>
40 <version>${storm.version}version>
41
42
43
44 dependency>
45 <dependency>
46 <groupId>org.apache.stormgroupId>
47 <artifactId>storm-hbaseartifactId>
48 <version>${storm.version}version>
49 <exclusions>
50 <exclusion>
51 <groupId>org.apache.hadoopgroupId>
52 <artifactId>hadoop-hdfsartifactId>
53 exclusion>
54 <exclusion>
55 <groupId>org.apache.hbasegroupId>
56 <artifactId>hbase-clientartifactId>
57 exclusion>
58 exclusions>
59 dependency>
60
61 <dependency>
62 <groupId>org.apache.hadoopgroupId>
63 <artifactId>hadoop-hdfsartifactId>
64 <version>${hdfs.version}version>
65 dependency>
66 <dependency>
67 <groupId>org.apache.hbasegroupId>
68 <artifactId>hbase-clientartifactId>
69 <version>${hbase.version}version>
70 <exclusions>
71 <exclusion>
72 <artifactId>slf4j-log4j12artifactId>
73 <groupId>org.slf4jgroupId>
74 exclusion>
75 exclusions>
76 dependency>
77 <dependency>
78 <groupId>org.apache.zookeepergroupId>
79 <artifactId>zookeeperartifactId>
80 <version>3.4.6version>
81 <exclusions>
82 <exclusion>
83 <artifactId>slf4j-log4j12artifactId>
84 <groupId>org.slf4jgroupId>
85 exclusion>
86 exclusions>
87 dependency>
88 <dependency>
89 <groupId>org.apache.stormgroupId>
90 <artifactId>storm-kafkaartifactId>
91 <version>${storm.version}version>
92 <exclusions>
93 <exclusion>
94 <groupId>org.apache.zookeepergroupId>
95 <artifactId>zookeeperartifactId>
96 exclusion>
97 exclusions>
98 dependency>
99 <dependency>
100 <groupId>org.apache.kafkagroupId>
101 <artifactId>kafka_2.10artifactId>
102 <version>0.8.1.1version>
103 <exclusions>
104 <exclusion>
105 <groupId>org.apache.zookeepergroupId>
106 <artifactId>zookeeperartifactId>
107 exclusion>
108 <exclusion>
109 <groupId>log4jgroupId>
110 <artifactId>log4jartifactId>
111 exclusion>
112 exclusions>
113 dependency>
114 <dependency>
115 <groupId>org.mockitogroupId>
116 <artifactId>mockito-allartifactId>
117 <version>1.9.5version>
118 <scope>testscope>
119 dependency>
120 <dependency>
121 <groupId>cz.mallat.uasparsergroupId>
122 <artifactId>uasparserartifactId>
123 <version>0.6.1version>
124 dependency>
125 dependencies>
126
127 <build>
128 <plugins>
129 <plugin>
130 <artifactId>maven-compiler-pluginartifactId>
131 <version>3.3version>
132 <configuration>
133 <source>1.7source>
134 <target>1.7target>
135 configuration>
136 plugin>
137 <plugin>
138 <artifactId>maven-assembly-pluginartifactId>
139 <version>2.4version>
140 <configuration>
141 <descriptors>
142 <descriptor>src/main/assembly/src.xmldescriptor>
143 descriptors>
144 <descriptorRefs>
145 <descriptorRef>jar-with-dependenciesdescriptorRef>
146 descriptorRefs>
147 configuration>
148 <executions>
149 <execution>
150 <id>make-assemblyid>
151 <phase>packagephase>
152 <goals>
153 <goal>singlegoal>
154 goals>
155 execution>
156 executions>
157 plugin>
158 plugins>
159 build>
160
161 project>

 

4.src.xml

1 xml version&#61;"1.0" encoding&#61;"UTF-8"?>
2 <assembly
3 xmlns&#61;"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
4 xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
5 xsi:schemaLocation&#61;"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
6 <id>jar-with-dependenciesid>
7 <formats>
8 <format>jarformat>
9 formats>
10 <includeBaseDirectory>falseincludeBaseDirectory>
11 <dependencySets>
12 <dependencySet>
13 <unpack>falseunpack>
14 <scope>runtimescope>
15 dependencySet>
16 dependencySets>
17 <fileSets>
18 <fileSet>
19 <directory>/libdirectory>
20 fileSet>
21 fileSets>
22 assembly>

 

5.log

1 log4j.rootLogger&#61;info,console
2
3 log4j.appender.console&#61;org.apache.log4j.ConsoleAppender
4 log4j.appender.console.layout&#61;org.apache.log4j.SimpleLayout
5
6 log4j.logger.com.ibeifeng&#61;INFO

 

6.SentenceSpout.java

1 package com.jun.it;
2
3 import backtype.storm.spout.SpoutOutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichSpout;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.topology.base.BaseRichSpout;
8 import backtype.storm.tuple.Fields;
9 import backtype.storm.tuple.Values;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.Map;
14 import java.util.Random;
15
16
17 public class SentenceSpout extends BaseRichSpout {
18 private static final Logger logger&#61; LoggerFactory.getLogger(SentenceSpout.class);
19 private SpoutOutputCollector collector;
20 //制造数据
21 private static final String[] SENTENCES&#61;{
22 "hadoop oozie storm hive",
23 "hadoop spark sqoop hbase",
24 "error flume yarn mapreduce"
25 };
26 //初始化collector
27 &#64;Override
28 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
29 this.collector&#61;spoutOutputCollector;
30 }
31 //Key的设置
32 &#64;Override
33 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
34 outputFieldsDeclarer.declare(new Fields("sentence"));
35 }
36 //Tuple的组装
37 &#64;Override
38 public void nextTuple() {
39 String sentence&#61;SENTENCES[new Random().nextInt(SENTENCES.length)];
40 if(sentence.contains("error")){
41 logger.error("记录有问题"&#43;sentence);
42 }else{
43 this.collector.emit(new Values(sentence));
44 }
45 try{
46 Thread.sleep(1000);
47 }catch (Exception e){
48 e.printStackTrace();
49 }
50 }
51
52 public SentenceSpout() {
53 super();
54 }
55
56 &#64;Override
57 public void close() {
58
59 }
60
61 &#64;Override
62 public void activate() {
63 super.activate();
64 }
65
66 &#64;Override
67 public void deactivate() {
68 super.deactivate();
69 }
70
71 &#64;Override
72 public void ack(Object msgId) {
73 super.ack(msgId);
74 }
75
76 &#64;Override
77 public void fail(Object msgId) {
78 super.fail(msgId);
79 }
80
81 }

 

7.SplitBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Fields;
8 import backtype.storm.tuple.Tuple;
9 import backtype.storm.tuple.Values;
10
11 import java.util.Map;
12
13 public class SplitBolt implements IRichBolt {
14 private OutputCollector collector;
15 &#64;Override
16 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
17 this.collector&#61;outputCollector;
18 }
19
20 &#64;Override
21 public void execute(Tuple tuple) {
22 String sentence&#61;tuple.getStringByField("sentence");
23 if(sentence!&#61;null&&!"".equals(sentence)){
24 String[] words&#61;sentence.split(" ");
25 for (String word:words){
26 this.collector.emit(new Values(word));
27 }
28 }
29 }
30
31 &#64;Override
32 public void cleanup() {
33
34 }
35
36 &#64;Override
37 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
38 outputFieldsDeclarer.declare(new Fields("word"));
39 }
40
41 &#64;Override
42 public Map getComponentConfiguration() {
43 return null;
44 }
45 }

 

8.CountBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Fields;
8 import backtype.storm.tuple.Tuple;
9 import backtype.storm.tuple.Values;
10
11 import java.util.HashMap;
12 import java.util.Map;
13
14 public class CountBolt implements IRichBolt {
15 private Map counts;
16 private OutputCollector collector;
17 &#64;Override
18 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
19 this.collector&#61;outputCollector;
20 counts&#61;new HashMap<>();
21 }
22
23 &#64;Override
24 public void execute(Tuple tuple) {
25 String word&#61;tuple.getStringByField("word");
26 int count&#61;1;
27 if(counts.containsKey(word)){
28 count&#61;counts.get(word)&#43;1;
29 }
30 counts.put(word,count);
31 this.collector.emit(new Values(word,count));
32 }
33
34 &#64;Override
35 public void cleanup() {
36
37 }
38
39 &#64;Override
40 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
41 outputFieldsDeclarer.declare(new Fields("word","count"));
42 }
43
44 &#64;Override
45 public Map getComponentConfiguration() {
46 return null;
47 }
48 }

 

9.printBolt.java

1 package com.jun.it;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.IRichBolt;
6 import backtype.storm.topology.OutputFieldsDeclarer;
7 import backtype.storm.tuple.Tuple;
8
9 import java.util.Map;
10
11 public class PrintBolt implements IRichBolt {
12 &#64;Override
13 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
14
15 }
16
17 &#64;Override
18 public void execute(Tuple tuple) {
19 String word&#61;tuple.getStringByField("word");
20 int count&#61;tuple.getIntegerByField("count");
21 System.out.println("word:"&#43;word&#43;", count:"&#43;count);
22 }
23
24 &#64;Override
25 public void cleanup() {
26
27 }
28
29 &#64;Override
30 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
31
32 }
33
34 &#64;Override
35 public Map getComponentConfiguration() {
36 return null;
37 }
38 }

 

10.WordCountTopology.java

1 package com.jun.it;
2
3 import backtype.storm.Config;
4 import backtype.storm.LocalCluster;
5 import backtype.storm.StormSubmitter;
6 import backtype.storm.generated.AlreadyAliveException;
7 import backtype.storm.generated.InvalidTopologyException;
8 import backtype.storm.topology.TopologyBuilder;
9 import backtype.storm.tuple.Fields;
10
11 public class WordCountTopology {
12 private static final String SENTENCE_SPOUT&#61;"sentenceSpout";
13 private static final String SPLIT_BOLT&#61;"splitBolt";
14 private static final String COUNT_BOLT&#61;"countBolt";
15 private static final String PRINT_BOLT&#61;"printBolt";
16 public static void main(String[] args){
17 TopologyBuilder topologyBuilder&#61;new TopologyBuilder();
18 topologyBuilder.setSpout(SENTENCE_SPOUT,new SentenceSpout());
19 topologyBuilder.setBolt(SPLIT_BOLT,new SplitBolt()).shuffleGrouping(SENTENCE_SPOUT);
20 topologyBuilder.setBolt(COUNT_BOLT,new CountBolt()).fieldsGrouping(SPLIT_BOLT,new Fields("word"));
21 topologyBuilder.setBolt(PRINT_BOLT,new PrintBolt()).globalGrouping(COUNT_BOLT);
22 Config config&#61;new Config();
23 if(args&#61;&#61;null||args.length&#61;&#61;0){
24 LocalCluster localCluster&#61;new LocalCluster();
25 localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());
26 }else{
27 config.setNumWorkers(1);
28 try {
29 StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
30 } catch (AlreadyAliveException e) {
31 e.printStackTrace();
32 } catch (InvalidTopologyException e) {
33 e.printStackTrace();
34 }
35 }
36
37 }
38 }

 

三&#xff1a;本地运行

1.前提

  原本以为需要启动storm&#xff0c;后来发现&#xff0c;不需要启动Storm。

  只需要在main的时候Run即可

 

2.结果

  

 

 四&#xff1a;集群运行

1.在IDEA下打包

  下面的是有依赖的包。

 

 

2.上传到datas下

  

 

3.运行

   bin/storm jar /opt/datas/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.jun.it.WordCountTopology wordcount

  

 

4.UI效果

  

 


转载于:https://www.cnblogs.com/juncaoit/p/6351492.html


推荐阅读
  • 本文介绍如何在 Android 中自定义加载对话框 CustomProgressDialog,包括自定义 View 类和 XML 布局文件的详细步骤。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 在处理遗留数据库的映射时,反向工程是一个重要的初始步骤。由于实体模式已经在数据库系统中存在,Hibernate 提供了自动化工具来简化这一过程,帮助开发人员快速生成持久化类和映射文件。通过反向工程,可以显著提高开发效率并减少手动配置的错误。此外,该工具还支持对现有数据库结构进行分析,自动生成符合 Hibernate 规范的配置文件,从而加速项目的启动和开发周期。 ... [详细]
  • Spring框架中的面向切面编程(AOP)技术详解
    面向切面编程(AOP)是Spring框架中的关键技术之一,它通过将横切关注点从业务逻辑中分离出来,实现了代码的模块化和重用。AOP的核心思想是将程序运行过程中需要多次处理的功能(如日志记录、事务管理等)封装成独立的模块,即切面,并在特定的连接点(如方法调用)动态地应用这些切面。这种方式不仅提高了代码的可维护性和可读性,还简化了业务逻辑的实现。Spring AOP利用代理机制,在不修改原有代码的基础上,实现了对目标对象的增强。 ... [详细]
  • 掌握Android UI设计:利用ZoomControls实现图片缩放功能
    本文介绍了如何在Android应用中通过使用ZoomControls组件来实现图片的缩放功能。ZoomControls提供了一种简单且直观的方式,让用户可以通过点击放大和缩小按钮来调整图片的显示大小。文章详细讲解了ZoomControls的基本用法、布局设置以及与ImageView的结合使用方法,适合初学者快速掌握Android UI设计中的这一重要功能。 ... [详细]
  • 在过去,我曾使用过自建MySQL服务器中的MyISAM和InnoDB存储引擎(也曾尝试过Memory引擎)。今年初,我开始转向阿里云的关系型数据库服务,并深入研究了其高效的压缩存储引擎TokuDB。TokuDB在数据压缩和处理大规模数据集方面表现出色,显著提升了存储效率和查询性能。通过实际应用,我发现TokuDB不仅能够有效减少存储成本,还能显著提高数据处理速度,特别适用于高并发和大数据量的场景。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 本文介绍了如何利用HTTP隧道技术在受限网络环境中绕过IDS和防火墙等安全设备,实现RDP端口的暴力破解攻击。文章详细描述了部署过程、攻击实施及流量分析,旨在提升网络安全意识。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • Android 图像色彩处理技术详解
    本文详细探讨了 Android 平台上的图像色彩处理技术,重点介绍了如何通过模仿美图秀秀的交互方式,利用 SeekBar 实现对图片颜色的精细调整。文章展示了具体的布局设计和代码实现,帮助开发者更好地理解和应用图像处理技术。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
author-avatar
IT营
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有