热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据处理框架之Strom:StormSpout可靠性

Spout作为数据源,当拓扑中bolt处理失败时该怎么办?Spout可靠性机制可以重发数据到bolt进行重新处理。ack确认机制 和fail 失败处理机制MessageSpout 

Spout作为数据源,当拓扑中bolt处理失败时该怎么办?Spout可靠性机制可以重发数据到bolt进行重新处理。

ack 确认机制  和 fail  失败处理机制

MessageSpout  ---->   split-bolt  ---->    write-bolt

看下面的例子:

MessageTopology

package bhz.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import bhz.bolt.SpliterBolt;
import bhz.bolt.WriterBolt;
import bhz.spout.MessageSpout;
public class MessageTopology {

public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MessageSpout());
builder.setBolt("split-bolt", new SpliterBolt()).shuffleGrouping("spout");
builder.setBolt("write-bolt", new WriterBolt()).shuffleGrouping("split-bolt");
//本地配置
Config cOnfig= new Config();
config.setDebug(false);
LocalCluster cluster = new LocalCluster();
System.out.println(cluster);
cluster.submitTopology("message", config, builder.createTopology());
Thread.sleep(10000);
cluster.killTopology("message");
cluster.shutdown();
}
}

MessageSpout

package bhz.spout;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MessageSpout implements IRichSpout {
private static final long serialVersiOnUID= 1L;
private int index = 0;

private String[] subjects = new String[]{
"groovy,oeacnbase",
"openfire,restful",
"flume,activiti",
"hadoop,hbase",
"spark,sqoop"
};

private SpoutOutputCollector collector;

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

@Override
public void nextTuple() {
if(index String sub = subjects[index];
//发送信息参数1 为数值, 参数2为msgId
collector.emit(new Values(sub), index);
index++;
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("subjects"));
}
//当bolt 处理成功 ack确认 spout执行ack方法
@Override
public void ack(Object msgId) {
System.out.println("【消息发送成功!!!】 (msgId = " + msgId +")");
}
//当bolt处理失败时,spout调用fail方法,进行重发处理
@Override
public void fail(Object msgId) {
System.out.println("【消息发送失败!!!】 (msgId = " + msgId +")");
System.out.println("【重发进行中...】");
collector.emit(new Values(subjects[(Integer) msgId]), msgId);
System.out.println("【重发成功!!!】");
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
}

 

SpliterBolt

package bhz.bolt;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SpliterBolt implements IRichBolt {
private static final long serialVersiOnUID= 1L;
private OutputCollector collector;

@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}


private boolean flag = false;

@Override
public void execute(Tuple tuple) {
try {
String subjects = tuple.getStringByField("subjects");

if(!flag && subjects.equals("flume,activiti")){
flag = true;
int a = 1/0;
}

String[] words = subjects.split(",");
//List list = new ArrayList();
//int index = 0;
for (String word : words) {
//注意这里循环发送消息,要携带tuple对象,用于处理异常时重发策略
collector.emit(tuple, new Values(word));
//list.add(word);
//index ++;
}
//collector.emit(tuple, new Values(list));
collector.ack(tuple);//通知spout处理成功
} catch (Exception e) {
e.printStackTrace();
collector.fail(tuple);//通知spout 处理失败
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

@Override
public void cleanup() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
}

WriterBolt

package bhz.bolt;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WriterBolt implements IRichBolt {
private static final long serialVersiOnUID= 1L;
private FileWriter writer;
private OutputCollector collector;
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
writer = new FileWriter("d://message.txt");
} catch (IOException e) {
e.printStackTrace();
}
}
private boolean flag = false;

@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
// List list = (List)tuple.getValueByField("word");
// System.out.println("======================" + list);
try {
if(!flag && word.equals("hadoop")){
flag = true;
int a = 1/0;
}
writer.write(word);
writer.write("\r\n");
writer.flush();
} catch (Exception e) {
e.printStackTrace();
collector.fail(tuple);//通知spout处理失败
}
collector.emit(tuple, new Values(word));
collector.ack(tuple);//通知spout处理成功
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map getComponentConfiguration() {
return null;
}
}

 

spout重发机制会带来一个问题:数据重复消费,看上面的例子当WriterBolt执行失败的时候,spout 将hadoop,hbase重发,那么hbase会被WriterBolt再执行一次,目前storm对此没有保障机制,按照业务设计的通用做法就是使用幂等性(比如使用唯一性ID),防止重复消费数据。

 


推荐阅读
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • PHP中的单例模式与静态变量的区别及使用方法
    本文介绍了PHP中的单例模式与静态变量的区别及使用方法。在PHP中,静态变量的存活周期仅仅是每次PHP的会话周期,与Java、C++不同。静态变量在PHP中的作用域仅限于当前文件内,在函数或类中可以传递变量。本文还通过示例代码解释了静态变量在函数和类中的使用方法,并说明了静态变量的生命周期与结构体的生命周期相关联。同时,本文还介绍了静态变量在类中的使用方法,并通过示例代码展示了如何在类中使用静态变量。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • importjava.util.ArrayList;publicclassPageIndex{privateintpageSize;每页要显示的行privateintpageNum ... [详细]
author-avatar
佐别
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有