热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sqoop导入数据地图-reducejob分析

sqoop导入数据map-reducejob分析Sqoop导入数据1、本质上sqoop是一个hadoop的一个jobClient,负责定义hadoopjob,然后将job提交到hadoop集群
sqoop导入数据map-reduce job分析
Sqoop 导入数据
1、 本质上sqoop是一个hadoop的一个jobClient,负责定义hadoop job,然后将job提交到hadoop集群,只不过这个jobClient为了支持了能通过命令行来配置各种各样的job,做了很多处理。
2、 sqoop实现了各种关系型数据库(oracle,DB2,Mysql)等等<->到hadoop(hafs,hbase)的导入导出。Sqoop的导入导出,通过无reduce的map reduce job实现导入导出,每一个map导入导出其中的某一块数据。
为了实现关系型数据库到hadoop job的导入导出,关键是要写好,map-reduce job。
下面介绍下一般的mapreduce job的执行过程。
众所周知,在执行一个Job的时候,Hadoop会将输入的数据划分为,N个Split,然后启动相应的N个Map程序分别来处理它们。
1、数据如何划分?
2、Split如何调度?
3、划分后的数据又如何读取?
这是任何一个map-reduce job来解决的问题。
下图是一个

过一下,运行mapred job的流程
1、 jobclient启动在单独一个jvm中,通过Job类配置的配置,定义InputFomat,outputformat,map,reduce等等一系列配置。
2、 通过RPC调用,像jobTracke申请一个独一无二的JobID来标示这个Job。
3、 JobCilent将Job所需要的资源提交到HDFS中也以JobID命名的目录中。这些资源包括JAR包配置文件、InputSplit等。
4、 JobClient向JobTracker提交Job。
5、 JobTracker初始化
6、 JobTracker从hadfs获取这个Job的split等信息。
7、 JobTracker向TaskTracer分配任务
8、 TaskTracker从HDFS获取这个Job的相关资源。
9、 TaskTracker开启一个新的JVM
10、 TaskTracker用新的JVM来执行Map或Reduce。

首先是“数据如何划分”的问题。
在第3步中,JobClient向HDFS提交的资源就包含了InputSplit,这就是数据划分的结果。也就是说,数据划分是在JobClient上完成的。在这里,JobClient会使用指定的InputFormat将输入数据做一次划分,形成若干个Split
InputFormat是一个interface。用户在启动MapReduce的时候需要指定一个InputFormat的implement。InputFormat只包含了两个接口方法:
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;

getSplits就是现在要使用的划分函数。job参数是任务的配置集合,从中可以取到用户在启动MapReduce时指定的输入文件路径。而numSplits参数是一个Split数目的建议值,是否考虑这个值,由具体的InputFormat实现来决定。
返回的是InputSplit数组,它描述了所有的Split信息,一个InputSplit描述一个Split(分块)。

InputSplit也是一个interface,具体返回什么样的implement,这是由具体的InputFormat来决定的。InputSplit也只有两个接口函数:
long getLength() throws IOException;
String[] getLocations() throws IOException;

这 个interface仅仅描述了Split有多长,以及存放这个Split的Location信息(也就是这个Split在HDFS上存放的机器。它可能 有多个replication,存在于多台机器上)。除此之外,就再没有任何直接描述Split的信息了。比如:Split对应于哪个文件?在文件中的起 始和结束位置是什么?等等重要的特征都没有描述到。
为什么会这样呢?因为关于Split的那些描述信息,对于MapReduce框架来说是不需要关心的。框架只关心Split的长度(主要用于一些统计信息)和Split的Location(主要用于Split的调度,后面会细说)。
而 Split中真正重要的描述信息还是只有InputFormat会关心。在需要读取一个Split的时候,其对应的InputSplit会被传递到 InputFormat的第二个接口函数getRecordReader,然后被用于初始化一个RecordReader,以解析输入数据。也就是说,描 述Split的重要信息都被隐藏了,只有具体的InputFormat自己知道。它只需要保证getSplits返回的InputSplit和 getRecordReader所关心的InputSplit是同样的implement就行了。这就给InputFormat的实现提供了巨大的灵活性。
我们通过sqoop import job来展开说起,
SQOOP import features
1、Divide table into ranges using primary key max/min
2、Create mappers for each range 
3、Mappers write to multiple HDFS nodes
4、Creates text or sequence files 
5、Generates Java class for resulting HDFS file
6、Generates Hive definition and auto-loads into HIVE
对于我们sqoop InputFomat 我们从一个sqoop的一个具体的InputFormat说起,
继承结构
DataDrivenDBInputFormat extends DBInputFormat extends InputFormat
作用是从关系型数据库中导入数据到HDFS上,
基本思想:
关系型数据库是面向行的数据库,查询出来的无非是关系的集合,而我们导出数据,就需要,需要分布式地导出,需要把导出记录的集合分块,如何平均地把数据库分块信息,能否平均地分配到各个map当中是sqoop的关键。

DataDrivenDBInputFormat
首先看它的函数:
getsplits()
public List getSplits(JobContext job) throws IOException {

int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
if (1 == targetNumTasks) {//如果是一个map则把整个查询作为一块
// There's no need to run a bounding vals query; just return a split
// that separates nothing. This can be considerably more optimal for a
// large table with no index.
List singletOnSplit= new ArrayList();
singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
return singletonSplit;
}

//得到数据库链接等信息
ResultSet results = null;
Statement statement = null;
Connection cOnnection= getConnection();
try {
statement = connection.createStatement();

String query = getBoundingValsQuery();//得到整体分块的上下界
LOG.info("BoundingValsQuery: " + query);

results = statement.executeQuery(query);
results.next();

// Based on the type of the results, use a different mechanism
// for interpolating split points (i.e., numeric splits, text splits,
// dates, etc.)
int sqlDataType = results.getMetaData().getColumnType(1);//得到分块的基准列的类型
DBSplitter splitter = getSplitter(sqlDataType);//根据基准列类型得到分片器
if (null == splitter) {
throw new IOException("Unknown SQL data type: " + sqlDataType);
}

return splitter.split(job.getConfiguration(), results,
getDBConf().getInputOrderBy());//
} catch (SQLException e) {
throw new IOException(e);
} finally {
// More-or-less ignore SQL exceptions here, but log in case we need it.
try {
if (null != results) {
results.close();
}
} catch (SQLException se) {
LOG.debug("SQLException closing resultset: " + se.toString());
}

try {
if (null != statement) {
statement.close();
}
} catch (SQLException se) {
LOG.debug("SQLException closing statement: " + se.toString());
}

try {
connection.commit();
closeConnection();
} catch (SQLException se) {
LOG.debug("SQLException committing split transaction: "
+ se.toString());
}
}
}

具体看spliter:
public List split(Configuration conf, ResultSet results,
String colName) throws SQLException {

long minVal = results.getLong(1);//得到最大值
long maxVal = results.getLong(2);

String lowClausePrefix = colName + " >= ";
String highClausePrefix = colName + " <";

int numSplits = ConfigurationHelper.getConfNumMaps(conf);//得到分片总数
if (numSplits <1) {
numSplits = 1;
}

if (results.getString(1) == null && results.getString(2) == null){
// Range is null to null. Return a null split accordingly.
List splits = new ArrayList();
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
return splits;
}

// 得到所有的分片点
List splitPoints = split(numSplits, minVal, maxVal);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",
minVal, maxVal, numSplits));
for (int i = 0; i LOG.debug(String.format("%,28d", splitPoints.get(i)));
}
}
List splits = new ArrayList();

// Turn the split points into a set of intervals.
long start = splitPoints.get(0);
//具体处理每一个split
for (int i = 1; i long end = splitPoints.get(i);

if (i == splitPoints.size() - 1) {
// This is the last one; use a closed interval.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + Long.toString(start),
colName + " <= " + Long.toString(end)));
} else {
// Normal open-interval case.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + Long.toString(start),
highClausePrefix + Long.toString(end)));
}

start = end;
}

if (results.getString(1) == null || results.getString(2) == null) {
// At least one extrema is null; add a null split.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
}

return splits;
}

再看看分片的结构:
public static class DataDrivenDBInputSplit
extends DBInputFormat.DBInputSplit {

private String lowerBoundClause;
private String upperBoundClause;

在这里只列出来两个域lowerBoundClause,upperBoundClause,每一个分片由上界where字句和下界where字句限制。


下面再看另一个重要的方法:createDBRecordReader
protected RecordReader createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {

DBConfiguration dbCOnf= getDBConf();
@SuppressWarnings("unchecked")
Class inputClass = (Class) (dbConf.getInputClass());
String dbProductName = getDBProductName();

LOG.debug("Creating db record reader for db product: " + dbProductName);

try {
return new DataDrivenDBRecordReader(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
} catch (SQLException ex) {
throw new IOException(ex);
}
}

最后返回一个具体DataDrivenDBRecordReader实现RecordReader类,该DataDrivenDBRecordReader,把一个long类型的变量作为key,把一条行记录作为一个Value作为Value,传入map函数进行处理。
这里面有一个小的trick,怎样把关系型数据库的一条记录,映射为java的一个对象呢,由于具体查询哪条数据库哪个字段是不确定的,所以我们需要根据查询的字段动态生成一个一条记录的对应的java对象,即继承SqoopRecord的java对象,
SqoopRecord的类如下所示:
public abstract class SqoopRecord implements Cloneable, DBWritable,
FieldMappable, Writable {

public SqoopRecord() {
}

//parse方法的作用是从特定的格式中转化为java对象
public abstract void parse(CharSequence s) throws RecordParser.ParseError;
public abstract void parse(Text s) throws RecordParser.ParseError;
public abstract void parse(byte [] s) throws RecordParser.ParseError;
public abstract void parse(char [] s) throws RecordParser.ParseError;
public abstract void parse(ByteBuffer s) throws RecordParser.ParseError;
public abstract void parse(CharBuffer s) throws RecordParser.ParseError;
public abstract void loadLargeObjects(LargeObjectLoader objLoader)
throws SQLException, IOException, InterruptedException;

/**
* Inserts the data in this object into the PreparedStatement, starting
* at parameter 'offset'.
* @return the number of fields written to the statement.
*/
public abstract int write(PreparedStatement stmt, int offset)
throws SQLException;

/**
* Format output data according to the specified delimiters.
*/
public abstract String toString(DelimiterSet delimiters);

/**
* Use the default delimiters, but only append an end-of-record delimiter
* if useRecordDelim is true.
*/
public String toString(boolean useRecordDelim) {
// Method body should be overridden by generated classes in 1.3.0+
if (useRecordDelim) {
// This is the existing functionality.
return toString();
} else {
// Setting this to false requires behavior in the generated class.
throw new RuntimeException(
"toString(useRecordDelim=false) requires a newer SqoopRecord. "
+ "Please regenerate your record class to use this function.");
}
}

/**
* Format the record according to the specified delimiters. An end-of-record
* delimiter is optional, and only used if useRecordDelim is true. For
* use with TextOutputFormat, calling this with useRecordDelim=false may
* make more sense.
*/
public String toString(DelimiterSet delimiters, boolean useRecordDelim) {
if (useRecordDelim) {
return toString(delimiters);
} else {
// Setting this to false requires behavior in the generated class.
throw new RuntimeException(
"toString(delimiters, useRecordDelim=false) requires a newer "
+ "SqoopRecord. Please regenerate your record class to use this "
+ "function.");
}
}

@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}

/**
* Returns an integer specifying which API format version the
* generated class conforms to. Used by internal APIs for backwards
* compatibility.
* @return the API version this class was generated against.
*/
public abstract int getClassFormatVersion();

/**
* Use the delegate pattern to allow arbitrary processing of the
* fields of this record.
* @param processor A delegate that operates on this object.
* @throws IOException if the processor encounters an IO error when
* operating on this object.
* @throws ProcessingException if the FieldMapProcessor encounters
* a general processing error when operating on this object.
*/
public void delegate(FieldMapProcessor processor)
throws IOException, ProcessingException {
processor.accept(this);
}

@Override
/**
* {@inheriDoc}
* @throws RuntimeException if used with a record that was generated
* before this capability was added (1.1.0).
*/
public Map getFieldMap() {
// Default implementation does not support field iteration.
// ClassWriter should provide an overriding version.
throw new RuntimeException(
"Got null field map from record. Regenerate your record class.");
}

/**
* Allows an arbitrary field to be set programmatically to the
* specified value object. The value object must match the
* type expected for the particular field or a RuntimeException
* will result.
* @throws RuntimeException if the specified field name does not exist.
*/
public void setField(String fieldName, Object fieldVal) {
throw new RuntimeException("This SqoopRecord does not support setField(). "
+ "Regenerate your record class.");
}
}

sqoop为我们生成继承SqoopRecord的类的source code,然后再编译打包,最后放入classpath中,然后指定该类为inputclass
至此,我们的map-reducejob算是弄完,具体的map方法很简单,需要需要从sqoopRecord把值取出来,然后写入hdfs即可。
推荐阅读
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 在分析Android的Audio系统时,我们对mpAudioPolicy->get_input进行了详细探讨,发现其背后涉及的机制相当复杂。本文将详细介绍这一过程及其背后的实现细节。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 优化后的标题:校园互联新方案:10397连接教育未来 ... [详细]
  • 本文整理了Java中org.apache.hadoop.mapreduce.lib.input.MultipleInputs.addInputPath()方法的一些代码 ... [详细]
  • MySQL初级篇——字符串、日期时间、流程控制函数的相关应用
    文章目录:1.字符串函数2.日期时间函数2.1获取日期时间2.2日期与时间戳的转换2.3获取年月日、时分秒、星期数、天数等函数2.4时间和秒钟的转换2. ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • MicrosoftDeploymentToolkit2010部署培训实验手册V1.0目录实验环境说明3实验环境虚拟机使用信息3注意:4实验手册正文说 ... [详细]
  • 本文详细介绍了在MySQL中如何高效利用EXPLAIN命令进行查询优化。通过实例解析和步骤说明,文章旨在帮助读者深入理解EXPLAIN命令的工作原理及其在性能调优中的应用,内容通俗易懂且结构清晰,适合各水平的数据库管理员和技术人员参考学习。 ... [详细]
  • 针对图像分类任务的训练方案进行了优化设计。通过引入PyTorch等深度学习框架,利用其丰富的工具包和模块,如 `torch.nn` 和 `torch.nn.functional`,提升了模型的训练效率和分类准确性。优化方案包括数据预处理、模型架构选择和损失函数的设计等方面,旨在提高图像分类任务的整体性能。 ... [详细]
  • 本文总结了JavaScript的核心知识点和实用技巧,涵盖了变量声明、DOM操作、事件处理等重要方面。例如,通过`event.srcElement`获取触发事件的元素,并使用`alert`显示其HTML结构;利用`innerText`和`innerHTML`属性分别设置和获取文本内容及HTML内容。此外,还介绍了如何在表单中动态生成和操作``元素,以便更好地处理用户输入。这些技巧对于提升前端开发效率和代码质量具有重要意义。 ... [详细]
  • 如何使用mysql_nd:Python连接MySQL数据库的优雅指南
    无论是进行机器学习、Web开发还是爬虫项目,数据库操作都是必不可少的一环。本文将详细介绍如何使用Python通过 `mysql_nd` 库与 MySQL 数据库进行高效连接和数据交互。内容涵盖以下几个方面: ... [详细]
  • 在Oracle数据库中,若需更新特定列的数据,可以通过联接两张表来实现。例如,假设我们有两张表:`sales` 和 `goods`。为了更新 `sales` 表中的某些列,可以使用 `UPDATE` 语句结合 `JOIN` 操作,确保数据的准确性和一致性。具体操作步骤包括选择需要更新的目标列,定义联接条件,并指定更新后的值。这种方法不仅提高了数据处理的效率,还保证了数据的完整性。 ... [详细]
author-avatar
mobiledu2502855907
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有