热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MR读Hbase数据,写入到Mysql(HBaseMysql)

首先看一下Hbase的数据,本系统将Hbase放入mysql首先看一下hbase表结构需求:根据用户在hbase的通话记录,求出每个用户每

首先看一下Hbase的数据 ,本系统将Hbase放入mysql
首先看一下hbase表结构
'dianxin:phone', {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => ' true1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}1 row(s) in 9.9800 seconds

在这里插入图片描述


需求:根据用户在hbase的通话记录,求出每个用户每个月总共通话时间,放入mysql中


第一步、建立mapper端

package phoneXM;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;import java.io.IOException;public class PhoneMapper extends TableMapper {@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {//将fruit的name和color提取出来,相当于将每一行数据提取出来放入put中Put put = new Put(key.get());
// Get get = new Get();//遍历行String rowkey = new String(key.get());String name = "";String phone = "";String name2 = "";String phone2 = "";String time = "";String sum = "";for (Cell cell :value.rawCells()) {if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {/// 添 加 克 隆 列 :nameif ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//将该列 cell 加入到 put 对象中name = Bytes.toString(CellUtil.cloneValue(cell));} else if ("phone".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中phone = Bytes.toString(CellUtil.cloneValue(cell));}else if ("nameTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中name2 = Bytes.toString(CellUtil.cloneValue(cell));}else if ("phoneTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中phone2 = Bytes.toString(CellUtil.cloneValue(cell));}else if ("time".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中time = Bytes.toString(CellUtil.cloneValue(cell));}else if ("sum".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中sum = Bytes.toString(CellUtil.cloneValue(cell));}}}
//将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出String info = name+"-"+name2+"-"+phone+"-"+phone2+"-"+sum;System.out.println(rowkey);System.out.println(info);// 01_手机号_yyyyMMddhhmmss_1String[] split = rowkey.split("_");// 截取电话号码String phoneNum = split[1];// 拼接keyString dataCallKe = phoneNum+"_"+split[2].substring(0,6);// 拼接valueString keys = phoneNum+dataCallKe;//输出到文件context.write(new Text(keys), new Text(info));}
}

第二步、建立Reduce端代码

package phoneXM;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer {private UserInfo userInfo = new UserInfo();// private UserInfoDBWritable userInfoDBWritable = null;@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {//获取手机号// String phone = key.toString().split("_")[1];//拼装信息Integer longTime = 0;for(Text text:values){String time = text.toString().split("-")[4];longTime += Integer.parseInt(time);}Text tt = new Text(longTime+"");System.out.println(key.toString());String phone = key.toString().split("_")[0];String month = key.toString().split("_")[1];// id,userInfo.setPhone(phone);// accountuserInfo.setMonth(month);// nameuserInfo.setSumTime(longTime+"");// 写入到db,放在key// userInfoDBWritable = new UserInfoDBWritable(userInfo);context.write(userInfo , null);//context.write(key,tt);}
}

第三步、Driver端代码

package phoneXM;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import phoneXM.PhoneMapper;
import phoneXM.PhoneReducer;import java.io.FileOutputStream;
import java.io.IOException;//将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。
public class Driver extends Configured implements Tool {public static void main(String[] args) throws Exception{Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum","es1,es2,es3");configuration.set("hbase.zookeeper.property.clientport","2181");int re = ToolRunner.run(configuration,new Driver(),args);System.exit(re);}public int run(String[] args) throws Exception {// 得到ConfConfiguration configuration = this.getConf();//数据库配置DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.244.162:3306/phone","root", "123456");Job job = Job.getInstance(configuration, "db info1");// 创建job任务
// Job job = Job.getInstance(configuration,this.getClass().getSimpleName());job.setJarByClass(Driver.class);// 配置jobScan scan = new Scan();scan.setCacheBlocks(false);scan.setCaching(500);// 设置MapperTableMapReduceUtil.initTableMapperJob("dianxin:phone", // 数据源的表名scan, // scan扫描控制器PhoneMapper.class, // 设置Mapper类Text.class, // 设置Mapper输入key类型Text.class, // 设置Mapper输出value值类型job // 设置job);// 设置Reduce/*TableMapReduceUtil.initTableReducerJob("hbase_mr", // 表名Test_reduce.class, // 设置reducejob);*/// 设置reduce数量,最少一个job.setNumReduceTasks(1);job.setReducerClass(PhoneReducer.class);job.setOutputKeyClass(UserInfo.class);job.setOutputValueClass(NullWritable.class);//FileOutputFormat.setOutputPath(job, new Path("D:\\Demo\\hadoop\\ouput\\out1"));DBOutputFormat.setOutput(job, "info1", "phone", "month", "sumTime");job.setOutputFormatClass(DBOutputFormat.class);boolean isSuccess = job.waitForCompletion(true);if(!isSuccess){throw new IOException("Job running with error");}return isSuccess ? 0 : 1;}}

工具类、UserInfo

package phoneXM;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;public class UserInfo implements DBWritable {//主要是把手机号,月份,通话总时间放入到mysql,所以把这3个封装一个类private String phone;private String month;private String sumTime;public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}public String getMonth() {return month;}public void setMonth(String month) {this.month = month;}public String getSumTime() {return sumTime;}public void setSumTime(String sumTime) {this.sumTime = sumTime;}public void write(PreparedStatement statement) throws SQLException {statement.setString(1,this.getPhone());statement.setString(2,this.getMonth());statement.setString(3,this.getSumTime());}public void readFields(ResultSet resultSet) throws SQLException {}
}

测试

集群Hbase开启,运行代码,查看数据库表信息
在这里插入图片描述
到此已经完了,大家可以去测试一下


推荐阅读
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 本文介绍了如何在 Spring Boot 项目中使用 spring-boot-starter-quartz 组件实现定时任务,并将 cron 表达式存储在数据库中,以便动态调整任务执行频率。 ... [详细]
  • Cookie学习小结
    Cookie学习小结 ... [详细]
  • 本文将介绍如何在混合开发(Hybrid)应用中实现Native与HTML5的交互,包括基本概念、学习目标以及具体的实现步骤。 ... [详细]
  • 字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
    字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 本文详细介绍了Java反射机制的基本概念、获取Class对象的方法、反射的主要功能及其在实际开发中的应用。通过具体示例,帮助读者更好地理解和使用Java反射。 ... [详细]
  • Unity与MySQL连接过程中出现的新挑战及解决方案探析 ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
author-avatar
nora抹抹茶I
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有