As a data source, what should the Spout do when a bolt in the topology fails to process a tuple? Storm's spout reliability mechanism can re-emit the data to the bolts for reprocessing. It rests on two callbacks: the ack (acknowledgement) mechanism and the fail (failure-handling) mechanism.
MessageSpout ----> split-bolt ----> write-bolt
See the example below:
MessageTopology
package bhz.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import bhz.bolt.SpliterBolt;
import bhz.bolt.WriterBolt;
import bhz.spout.MessageSpout;
public class MessageTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MessageSpout());
        builder.setBolt("split-bolt", new SpliterBolt()).shuffleGrouping("spout");
        builder.setBolt("write-bolt", new WriterBolt()).shuffleGrouping("split-bolt");
        // local-mode configuration
        Config config = new Config();
        config.setDebug(false);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("message", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("message");
        cluster.shutdown();
    }
}
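Two reliability settings are worth knowing alongside this example: Storm also fails any tuple that is not fully acked within the topology's message timeout, so the spout's fail() fires on timeouts as well as on explicit collector.fail() calls, and the number of in-flight tuples can be capped so a slow bolt does not cause a flood of timeouts. A short sketch against the same backtype.storm.Config API (the values here are illustrative, not from the original example):
Config config = new Config();
config.setNumAckers(1);           // acker tasks that track each tuple's ack/fail tree
config.setMessageTimeoutSecs(30); // tuples not fully acked within 30s are failed (default 30)
config.setMaxSpoutPending(1000);  // cap on emitted-but-not-yet-acked tuples per spout task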
MessageSpout
package bhz.spout;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MessageSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;
    private int index = 0;
    private String[] subjects = new String[]{
            "groovy,oceanbase",
            "openfire,restful",
            "flume,activiti",
            "hadoop,hbase",
            "spark,sqoop"
    };
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (index < subjects.length) {
            String sub = subjects[index];
            // emit the message: argument 1 is the tuple values, argument 2 is the msgId;
            // Storm uses the msgId to report this tuple's ack/fail back to the spout
            collector.emit(new Values(sub), index);
            index++;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("subjects"));
    }

    // called when every bolt in the tuple tree has acked: the message was fully processed
    @Override
    public void ack(Object msgId) {
        System.out.println("[message processed successfully] (msgId = " + msgId + ")");
    }

    // called when a bolt fails the tuple: re-emit the message with the same msgId
    @Override
    public void fail(Object msgId) {
        System.out.println("[message processing failed] (msgId = " + msgId + ")");
        System.out.println("[replaying...]");
        collector.emit(new Values(subjects[(Integer) msgId]), msgId);
        System.out.println("[replay emitted]");
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}
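Note that this spout replays by indexing back into the static subjects array, which only works because the data source never changes. A more common pattern caches every in-flight message keyed by its msgId, removes it on ack, and re-emits it on fail. Below is a minimal sketch of that pattern against the same backtype.storm API; the class name, field names, and seed data are illustrative, not part of the original example.
package bhz.spout;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class CachingMessageSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    private int nextId = 0;
    // messages waiting to be emitted for the first time
    private final Queue<String> input = new ConcurrentLinkedQueue<String>();
    // in-flight messages keyed by msgId; an entry lives until Storm acks it
    private final Map<Integer, String> pending = new ConcurrentHashMap<Integer, String>();

    public CachingMessageSpout() {
        // seed with the same demo messages; a real spout would read from an external source
        input.addAll(Arrays.asList("groovy,oceanbase", "openfire,restful",
                "flume,activiti", "hadoop,hbase", "spark,sqoop"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String msg = input.poll();
        if (msg != null) {
            int msgId = nextId++;
            pending.put(msgId, msg);               // remember it until acked
            collector.emit(new Values(msg), msgId);
        }
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                     // fully processed, forget it
    }

    @Override
    public void fail(Object msgId) {
        String msg = pending.get(msgId);           // still cached, so we can replay it
        if (msg != null) {
            collector.emit(new Values(msg), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("subjects"));
    }
}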
SpliterBolt
package bhz.bolt;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SpliterBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private boolean flag = false;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String subjects = tuple.getStringByField("subjects");
            // simulate a one-time failure on this subject to exercise the fail path
            if (!flag && subjects.equals("flume,activiti")) {
                flag = true;
                int a = 1 / 0;
            }
            String[] words = subjects.split(",");
            for (String word : words) {
                // anchor each emitted word to the input tuple so that a downstream
                // failure propagates back to the spout and triggers a replay
                collector.emit(tuple, new Values(word));
            }
            collector.ack(tuple); // notify the spout that processing succeeded
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(tuple); // notify the spout that processing failed
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}
WriterBolt
package bhz.bolt;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WriterBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private FileWriter writer;
    private OutputCollector collector;
    private boolean flag = false;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            writer = new FileWriter("d://message.txt");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        try {
            // simulate a one-time failure on this word to exercise the fail path
            if (!flag && word.equals("hadoop")) {
                flag = true;
                int a = 1 / 0;
            }
            writer.write(word);
            writer.write("\r\n");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(tuple); // notify the spout that processing failed
            return;                // do not emit or ack a tuple we just failed
        }
        collector.emit(tuple, new Values(word));
        collector.ack(tuple); // notify the spout that processing succeeded
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}
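Why do both bolts pass the input tuple as the first argument to emit? That call anchors the new tuple to the input tuple, adding it to the same tuple tree; anchoring is what lets a failure (or timeout) anywhere downstream propagate back to the spout's fail() method. A minimal contrast, using the same OutputCollector API as above:
// anchored: the emitted word joins the input tuple's tree, so a failure in any
// downstream bolt (or a timeout) triggers the spout's fail(msgId) for replay
collector.emit(tuple, new Values(word));
// unanchored: the emitted tuple is not tracked by the acker; if a downstream
// bolt fails it, the spout never hears about it and the message is lost
collector.emit(new Values(word));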
The spout's replay mechanism introduces a problem: duplicate consumption. In the example above, when WriterBolt fails on "hadoop", the spout re-emits the entire "hadoop,hbase" message, so "hbase" passes through WriterBolt a second time. Storm offers no built-in safeguard against this (it guarantees at-least-once processing, not exactly-once); the usual design remedy is to make downstream processing idempotent, for example by tagging each message with a unique ID and skipping IDs that have already been handled.
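A minimal sketch of that idempotent-consumer idea, assuming the upstream components put a unique "msgId" field on every tuple (the original example does not do this, and the in-memory set shown here would be durable storage in production):
package bhz.bolt;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class IdempotentWriterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // IDs already handled; production code would keep these in durable storage
    private final Set<Object> processed = ConcurrentHashMap.newKeySet();

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Object msgId = tuple.getValueByField("msgId"); // unique per logical message
        if (!processed.add(msgId)) { // add() returns false if the ID was seen before
            collector.ack(tuple);    // duplicate from a replay: ack it and skip the side effect
            return;
        }
        // ... the side effect (e.g., the file write) runs at most once per msgId ...
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output fields
    }
}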