1. 构建拓扑代码
package demo;import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Entry point that wires the area-amount topology:
 * spout -> filter -> areabolt -> rsltbolt.
 */
public class AreaAmtTopo {

    /**
     * Declares the spout and the three bolts on a {@link TopologyBuilder}.
     *
     * <p>NOTE(review): this method only declares the wiring; no call to
     * {@code createTopology()} or to a submitter/cluster appears here, so
     * nothing is actually launched from this method as written.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        TopologyBuilder topology = new TopologyBuilder();

        // Source: order messages read from the configured order topic.
        topology.setSpout(
                "spout",
                new OrderBaseSpout(KafkaProperties.Order_topic),
                5);

        // Stage 1: filtering, fed from the spout via shuffle grouping.
        topology
                .setBolt("filter", new AreaFilterBolt(), 5)
                .shuffleGrouping("spout");

        // Stage 2: per-area aggregation; tuples are grouped on "area_id".
        topology
                .setBolt("areabolt", new AreaAmtBolt(), 2)
                .fieldsGrouping("filter", new Fields("area_id"));

        // Stage 3: result bolt, fed from the aggregation stage.
        topology
                .setBolt("rsltbolt", new AreaRsltBolt(), 1)
                .shuffleGrouping("areabolt");
    }
}
2. 一级过滤bolt
package demo;import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
//一级的过滤bolt
public class AreaFilterBolt implements IBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每个value的对应name}@Overridepublic Map
3. 局部汇总bolt(按日期和区域汇总)
package demo;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;//局部汇总
public class AreaAmtBolt implements IBasicBolt {Map
4. 最终结果写入HBase
package demo;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {Map
5. DateFmt代码
package demo;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * Small date helper: normalises date strings to a given pattern and parses
 * short ({@code yyyy-MM-dd}) date strings.
 *
 * <p>All methods create their own {@link SimpleDateFormat} instance, because
 * {@code SimpleDateFormat} is not thread-safe and must not be shared between
 * threads without external synchronisation.
 */
public class DateFmt {

    /** Pattern containing both the calendar date and the time of day. */
    public static final String date_long = "yyyy-MM-dd HH:mm:ss";

    /** Pattern containing the calendar date only. */
    public static final String date_short = "yyyy-MM-dd";

    /**
     * Shared formatter for {@link #date_short}, kept only so that existing
     * references to this field keep compiling. It is no longer used inside
     * this class; callers that still use it must synchronise on it themselves.
     */
    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

    /**
     * Parses {@code date} with {@code patton} and formats the result back with
     * the same pattern.
     *
     * <p>When {@code date} is {@code null} or cannot be parsed, the current
     * time is formatted instead (best-effort fallback; a parse failure is
     * reported on {@code System.err}).
     *
     * @param date   date text to normalise; may be {@code null}
     * @param patton {@link SimpleDateFormat} pattern used for both parsing and formatting
     * @return the date formatted with {@code patton}
     */
    public static String getCountDate(String date, String patton) {
        SimpleDateFormat formatter = new SimpleDateFormat(patton);
        // Starts at "now", which is what gets formatted if parsing is skipped or fails.
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(formatter.parse(date));
            } catch (ParseException e) {
                System.err.println("DateFmt.getCountDate: cannot parse '" + date
                        + "' with pattern '" + patton + "', using current time: "
                        + e.getMessage());
            }
        }
        return formatter.format(cal.getTime());
    }

    /**
     * Parses a date string in {@link #date_short} format.
     *
     * @param dateStr text starting with a {@code yyyy-MM-dd} date
     * @return the parsed date
     * @throws Exception if {@code dateStr} cannot be parsed
     */
    public static Date parseDate(String dateStr) throws Exception {
        // A fresh formatter per call: safe to invoke from several threads at once.
        return new SimpleDateFormat(date_short).parse(dateStr);
    }

    /** Prints one sample conversion. */
    public static void main(String[] args) {
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}