作者:nora抹抹茶I | 来源:互联网 | 2023-08-29 14:10
首先看一下 HBase 中的数据。本系统用 MapReduce 读取 HBase 中的通话记录，汇总后写入 MySQL。
再来看一下 HBase 的表结构
需求：根据用户在 HBase 中的通话记录，统计每个用户每个月的通话总时长，并写入 MySQL。
第一步、建立mapper端
package phoneXM;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;import java.io.IOException;public class PhoneMapper extends TableMapper {@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {//将fruit的name和color提取出来,相当于将每一行数据提取出来放入put中Put put = new Put(key.get());
// Get get = new Get();//遍历行String rowkey = new String(key.get());String name = "";String phone = "";String name2 = "";String phone2 = "";String time = "";String sum = "";for (Cell cell :value.rawCells()) {if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {/// 添 加 克 隆 列 :nameif ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//将该列 cell 加入到 put 对象中name = Bytes.toString(CellUtil.cloneValue(cell));} else if ("phone".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中phone = Bytes.toString(CellUtil.cloneValue(cell));}else if ("nameTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中name2 = Bytes.toString(CellUtil.cloneValue(cell));}else if ("phoneTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中phone2 = Bytes.toString(CellUtil.cloneValue(cell));}else if ("time".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中time = Bytes.toString(CellUtil.cloneValue(cell));}else if ("sum".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中sum = Bytes.toString(CellUtil.cloneValue(cell));}}}
//将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出String info = name+"-"+name2+"-"+phone+"-"+phone2+"-"+sum;System.out.println(rowkey);System.out.println(info);// 01_手机号_yyyyMMddhhmmss_1String[] split = rowkey.split("_");// 截取电话号码String phoneNum = split[1];// 拼接keyString dataCallKe = phoneNum+"_"+split[2].substring(0,6);// 拼接valueString keys = phoneNum+dataCallKe;//输出到文件context.write(new Text(keys), new Text(info));}
}
第二步、建立Reduce端代码
package phoneXM;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer {private UserInfo userInfo = new UserInfo();// private UserInfoDBWritable userInfoDBWritable = null;@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {//获取手机号// String phone = key.toString().split("_")[1];//拼装信息Integer longTime = 0;for(Text text:values){String time = text.toString().split("-")[4];longTime += Integer.parseInt(time);}Text tt = new Text(longTime+"");System.out.println(key.toString());String phone = key.toString().split("_")[0];String month = key.toString().split("_")[1];// id,userInfo.setPhone(phone);// accountuserInfo.setMonth(month);// nameuserInfo.setSumTime(longTime+"");// 写入到db,放在key// userInfoDBWritable = new UserInfoDBWritable(userInfo);context.write(userInfo , null);//context.write(key,tt);}
}
第三步、Driver端代码
package phoneXM;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import phoneXM.PhoneMapper;
import phoneXM.PhoneReducer;import java.io.FileOutputStream;
import java.io.IOException;//将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。
public class Driver extends Configured implements Tool {public static void main(String[] args) throws Exception{Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum","es1,es2,es3");configuration.set("hbase.zookeeper.property.clientport","2181");int re = ToolRunner.run(configuration,new Driver(),args);System.exit(re);}public int run(String[] args) throws Exception {// 得到ConfConfiguration configuration = this.getConf();//数据库配置DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.244.162:3306/phone","root", "123456");Job job = Job.getInstance(configuration, "db info1");// 创建job任务
// Job job = Job.getInstance(configuration,this.getClass().getSimpleName());job.setJarByClass(Driver.class);// 配置jobScan scan = new Scan();scan.setCacheBlocks(false);scan.setCaching(500);// 设置MapperTableMapReduceUtil.initTableMapperJob("dianxin:phone", // 数据源的表名scan, // scan扫描控制器PhoneMapper.class, // 设置Mapper类Text.class, // 设置Mapper输入key类型Text.class, // 设置Mapper输出value值类型job // 设置job);// 设置Reduce/*TableMapReduceUtil.initTableReducerJob("hbase_mr", // 表名Test_reduce.class, // 设置reducejob);*/// 设置reduce数量,最少一个job.setNumReduceTasks(1);job.setReducerClass(PhoneReducer.class);job.setOutputKeyClass(UserInfo.class);job.setOutputValueClass(NullWritable.class);//FileOutputFormat.setOutputPath(job, new Path("D:\\Demo\\hadoop\\ouput\\out1"));DBOutputFormat.setOutput(job, "info1", "phone", "month", "sumTime");job.setOutputFormatClass(DBOutputFormat.class);boolean isSuccess = job.waitForCompletion(true);if(!isSuccess){throw new IOException("Job running with error");}return isSuccess ? 0 : 1;}}
工具类、UserInfo
package phoneXM;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;public class UserInfo implements DBWritable {//主要是把手机号,月份,通话总时间放入到mysql,所以把这3个封装一个类private String phone;private String month;private String sumTime;public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}public String getMonth() {return month;}public void setMonth(String month) {this.month = month;}public String getSumTime() {return sumTime;}public void setSumTime(String sumTime) {this.sumTime = sumTime;}public void write(PreparedStatement statement) throws SQLException {statement.setString(1,this.getPhone());statement.setString(2,this.getMonth());statement.setString(3,this.getSumTime());}public void readFields(ResultSet resultSet) throws SQLException {}
}
测试
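启动 HBase 集群，并确认 MySQL 的 phone 库中已经建好 info1 表（包含 phone、month、sumTime 三列），MySQL 的 JDBC 驱动包也要在作业的 classpath 中；然后运行 Driver，再查看数据库表中的数据。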
到此就全部完成了，大家可以自行测试一下。