热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MR读Hbase数据,写入到Mysql(HBaseMysql)

首先看一下Hbase的数据,本系统将Hbase放入mysql首先看一下hbase表结构需求:根据用户在hbase的通话记录,求出每个用户每

首先看一下Hbase的数据 ,本系统将Hbase放入mysql
首先看一下hbase表结构
'dianxin:phone', {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => ' true1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}1 row(s) in 9.9800 seconds

在这里插入图片描述


需求:根据用户在hbase的通话记录,求出每个用户每个月总共通话时间,放入mysql中


第一步、建立mapper端

package phoneXM;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;import java.io.IOException;public class PhoneMapper extends TableMapper {@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {//将fruit的name和color提取出来,相当于将每一行数据提取出来放入put中Put put = new Put(key.get());
// Get get = new Get();//遍历行String rowkey = new String(key.get());String name = "";String phone = "";String name2 = "";String phone2 = "";String time = "";String sum = "";for (Cell cell :value.rawCells()) {if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {/// 添 加 克 隆 列 :nameif ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//将该列 cell 加入到 put 对象中name = Bytes.toString(CellUtil.cloneValue(cell));} else if ("phone".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中phone = Bytes.toString(CellUtil.cloneValue(cell));}else if ("nameTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中name2 = Bytes.toString(CellUtil.cloneValue(cell));}else if ("phoneTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中phone2 = Bytes.toString(CellUtil.cloneValue(cell));}else if ("time".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中time = Bytes.toString(CellUtil.cloneValue(cell));}else if ("sum".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {//向该列 cell 加入到 put 对象中sum = Bytes.toString(CellUtil.cloneValue(cell));}}}
//将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出String info = name+"-"+name2+"-"+phone+"-"+phone2+"-"+sum;System.out.println(rowkey);System.out.println(info);// 01_手机号_yyyyMMddhhmmss_1String[] split = rowkey.split("_");// 截取电话号码String phoneNum = split[1];// 拼接keyString dataCallKe = phoneNum+"_"+split[2].substring(0,6);// 拼接valueString keys = phoneNum+dataCallKe;//输出到文件context.write(new Text(keys), new Text(info));}
}

第二步、建立Reduce端代码

package phoneXM;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer {private UserInfo userInfo = new UserInfo();// private UserInfoDBWritable userInfoDBWritable = null;@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {//获取手机号// String phone = key.toString().split("_")[1];//拼装信息Integer longTime = 0;for(Text text:values){String time = text.toString().split("-")[4];longTime += Integer.parseInt(time);}Text tt = new Text(longTime+"");System.out.println(key.toString());String phone = key.toString().split("_")[0];String month = key.toString().split("_")[1];// id,userInfo.setPhone(phone);// accountuserInfo.setMonth(month);// nameuserInfo.setSumTime(longTime+"");// 写入到db,放在key// userInfoDBWritable = new UserInfoDBWritable(userInfo);context.write(userInfo , null);//context.write(key,tt);}
}

第三步、Driver端代码

package phoneXM;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import phoneXM.PhoneMapper;
import phoneXM.PhoneReducer;import java.io.FileOutputStream;
import java.io.IOException;//将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。
public class Driver extends Configured implements Tool {public static void main(String[] args) throws Exception{Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum","es1,es2,es3");configuration.set("hbase.zookeeper.property.clientport","2181");int re = ToolRunner.run(configuration,new Driver(),args);System.exit(re);}public int run(String[] args) throws Exception {// 得到ConfConfiguration configuration = this.getConf();//数据库配置DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.244.162:3306/phone","root", "123456");Job job = Job.getInstance(configuration, "db info1");// 创建job任务
// Job job = Job.getInstance(configuration,this.getClass().getSimpleName());job.setJarByClass(Driver.class);// 配置jobScan scan = new Scan();scan.setCacheBlocks(false);scan.setCaching(500);// 设置MapperTableMapReduceUtil.initTableMapperJob("dianxin:phone", // 数据源的表名scan, // scan扫描控制器PhoneMapper.class, // 设置Mapper类Text.class, // 设置Mapper输入key类型Text.class, // 设置Mapper输出value值类型job // 设置job);// 设置Reduce/*TableMapReduceUtil.initTableReducerJob("hbase_mr", // 表名Test_reduce.class, // 设置reducejob);*/// 设置reduce数量,最少一个job.setNumReduceTasks(1);job.setReducerClass(PhoneReducer.class);job.setOutputKeyClass(UserInfo.class);job.setOutputValueClass(NullWritable.class);//FileOutputFormat.setOutputPath(job, new Path("D:\\Demo\\hadoop\\ouput\\out1"));DBOutputFormat.setOutput(job, "info1", "phone", "month", "sumTime");job.setOutputFormatClass(DBOutputFormat.class);boolean isSuccess = job.waitForCompletion(true);if(!isSuccess){throw new IOException("Job running with error");}return isSuccess ? 0 : 1;}}

工具类、UserInfo

package phoneXM;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;public class UserInfo implements DBWritable {//主要是把手机号,月份,通话总时间放入到mysql,所以把这3个封装一个类private String phone;private String month;private String sumTime;public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}public String getMonth() {return month;}public void setMonth(String month) {this.month = month;}public String getSumTime() {return sumTime;}public void setSumTime(String sumTime) {this.sumTime = sumTime;}public void write(PreparedStatement statement) throws SQLException {statement.setString(1,this.getPhone());statement.setString(2,this.getMonth());statement.setString(3,this.getSumTime());}public void readFields(ResultSet resultSet) throws SQLException {}
}

测试

集群Hbase开启,运行代码,查看数据库表信息
在这里插入图片描述
到此已经完了,大家可以去测试一下


推荐阅读
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 标题: ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 合并列值-合并为一列问题需求:createtabletab(Aint,Bint,Cint)inserttabselect1,2,3unionallsel ... [详细]
author-avatar
nora抹抹茶I
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有