首页
技术博客
PHP教程
数据库技术
前端开发
HTML5
Nginx
php论坛
新用户注册
|
会员登录
PHP教程
技术博客
编程问答
PNG素材
编程语言
前端技术
Android
PHP教程
HTML5教程
数据库
Linux技术
Nginx技术
PHP安全
WebSerer
职场攻略
JavaScript
开放平台
业界资讯
大话程序猿
登录
极速注册
取消
热门标签 | HotTags
testing
io
substring
timestamp
join
typescript
subset
text
settings
regex
golang
uri
frameworks
config
python3
ip
bytecode
java
node.js
header
solr
metadata
uml
input
install
sum
perl
command
keyword
client
format
heatmap
schema
include
jar
audio
const
byte
require
main
future
object
hashset
fetch
char
web
nodejs
blob
md5
function
search
loops
vba
window
process
python
hash
utf-8
chat
httprequest
go
js
heap
stream
callback
cpython
post
list
cmd
dll
cPlusPlus
plugins
request
triggers
ascii
rsa
cSharp
match
iostream
当前位置:
开发笔记
>
编程语言
> 正文
分拆TableSplit让多个mapper同时读取
作者:特贰的大妞 | 来源:互联网 | 2023-09-16 19:11
分拆TableSplit让多个mapper同时读取默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个tables
分拆TableSplit 让多个mapper同时读取
默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。
由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
Java代码
mapred.min.split.size
mapred.max.split.size
所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。
HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据:
Java代码
TableMapReduceUtil.initTableMapperJob(table[
0
].getBytes(), scan,
UserViewHisMapper2.
class
, Text.
class
, Text.
class
,
genRecommendations);
而这个方法,最终是调用以下方法进行初始化设置的:
Java代码
public
static
void
initTableMapperJob(
byte
[] table, Scan scan,
Class
extends
TableMapper> mapper,
Class
extends
WritableComparable> outputKeyClass,
Class
extends
Writable> outputValueClass, Job job,
boolean
addDependencyJars)
throws
IOException {
initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, TableInputFormat.
class
);
}
所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase:
Java代码
public
class
TableInputFormat
extends
TableInputFormatBase
implements
Configurable
最终要修改的则是TableInputFormatBase这个类,修改其以下方法:
Java代码
public
List
getSplits(JobContext context)
throws
IOException {}
这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit:
Java代码
public
List
getSplits(JobContext context)
throws
IOException {
f (table ==
null
) {
throw
new
IOException(
"No table was provided."
);
Pair<
byte
[][],
byte
[][]> keys = table.getStartEndKeys();
if
(keys ==
null
|| keys.getFirst() ==
null
||
keys.getFirst().length ==
0
) {
throw
new
IOException(
"Expecting at least one region."
);
}
int
count =
0
;
List
splits =
new
ArrayList
(keys.getFirst().length);
for
(
int
i =
0
; i < keys.getFirst().length; i++) {
if
( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue
;
}
String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
getHostname();
byte
[] startRow = scan.getStartRow();
byte
[] stopRow = scan.getStopRow();
// determine if the given start an stop key fall into the region
if
((startRow.length ==
0
|| keys.getSecond()[i].length ==
0
||
Bytes.compareTo(startRow, keys.getSecond()[i]) <
0
) &&
(stopRow.length ==
0
||
Bytes.compareTo(stopRow, keys.getFirst()[i]) >
0
)) {
byte
[] splitStart = startRow.length ==
0
||
Bytes.compareTo(keys.getFirst()[i], startRow) >=
0
?
keys.getFirst()[i] : startRow;
byte
[] splitStop = (stopRow.length ==
0
||
Bytes.compareTo(keys.getSecond()[i], stopRow) <=
0
) &&
keys.getSecond()[i].length >
0
?
keys.getSecond()[i] : stopRow;
InputSplit split =
new
TableSplit(table.getTableName(),
splitStart, splitStop, regionLocation);
splits.add(split);
if
(LOG.isDebugEnabled())
LOG.debug(
"getSplits: split -> "
+ (count++) +
" -> "
+ split);
}
}
return
splits;
}
这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。
以下是我的实现方法:
Java代码
public
List
getSplits(JobContext context)
throws
IOException {
if
(table ==
null
) {
throw
new
IOException(
"No table was provided."
);
}
Pair<
byte
[][],
byte
[][]> keys = table.getStartEndKeys();
if
(keys ==
null
|| keys.getFirst() ==
null
|| keys.getFirst().length ==
0
) {
throw
new
IOException(
"Expecting at least one region."
);
}
int
count =
0
;
List
splits =
new
ArrayList
(
keys.getFirst().length);
for
(
int
i =
0
; i < keys.getFirst().length; i++) {
if
(!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue
;
}
String regionLocation = table.getRegionLocation(keys.getFirst()[i],
true
)
.getHostname();
byte
[] startRow = scan.getStartRow();
byte
[] stopRow = scan.getStopRow();
// determine if the given start an stop key fall into the region
if
((startRow.length ==
0
|| keys.getSecond()[i].length ==
0
|| Bytes
.compareTo(startRow, keys.getSecond()[i]) <
0
)
&& (stopRow.length ==
0
|| Bytes.compareTo(stopRow,
keys.getFirst()[i]) >
0
)) {
byte
[] splitStart = startRow.length ==
0
|| Bytes.compareTo(keys.getFirst()[i], startRow) >=
0
? keys
.getFirst()[i] : startRow;
byte
[] splitStop = (stopRow.length ==
0
|| Bytes.compareTo(
keys.getSecond()[i], stopRow) <=
0
)
&& keys.getSecond()[i].length >
0
? keys.getSecond()[i]
: stopRow;
Scan scan1 =
new
Scan();
scan1.setStartRow(splitStart);
scan1.setStopRow(splitStop);
scan1.setFilter(
new
KeyOnlyFilter());
scan1.setBatch(
500
);
ResultScanner resultscanner = table.getScanner(scan1);
//用来保存该region的所有key
List
rows =
new
ArrayList
();
//Iterator
it = resultscanner.iterator();
for
(Result rs : resultscanner)
{
if
(rs.isEmpty())
continue
;
rows.add(
new
String(rs.getRow()));
}
int
splitSize = rows.size() / mappersPerSplit;
for
(
int
j =
0
; j < mappersPerSplit; j++) {
TableSplit tablesplit =
null
;
if
(j == mappersPerSplit -
1
)
tablesplit =
new
TableSplit(table.getTableName(),
rows.get(j * splitSize).getBytes(),
rows.get(rows.size() -
1
).getBytes(),
regionLocation);
else
tablesplit =
new
TableSplit(table.getTableName(),
rows.get(j * splitSize).getBytes(),
rows.get(j * splitSize + splitSize).getBytes(), regionLocation);
splits.add(tablesplit);
if
(LOG.isDebugEnabled())
LOG.debug((
new
StringBuilder())
.append(
"getSplits: split -> "
).append(i++)
.append(
" -> "
).append(tablesplit).toString());
}
resultscanner.close();
}
}
return
splits;
}
通过配置设置需要拆分的split数。
hadoop
mapreduce
hbase
split
io
java
static
byte
key
写下你的评论吧 !
吐个槽吧,看都看了
会员登录
|
用户注册
推荐阅读
input
MapReduce 中的输入输出格式控制
本文介绍了如何在 MapReduce 作业中使用 SequenceFileOutputFormat 生成 SequenceFile 文件,并详细解释了 SequenceFile 的结构和用途。 ...
[详细]
蜡笔小新 2024-11-17 14:43:42
input
oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ...
[详细]
蜡笔小新 2024-11-12 19:26:15
java
构建用户画像环境:Hive与SparkSQL的高效整合
本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ...
[详细]
蜡笔小新 2024-11-19 09:44:24
input
Java代码保护与混淆:ProGuard详解
在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ...
[详细]
蜡笔小新 2024-11-18 16:46:17
text
使用EventBus在Android Fragment间传递参数
本文介绍了如何在Android应用中使用EventBus库在Fragment之间传递参数。通过具体的代码示例,详细说明了EventBus的使用方法和注意事项。 ...
[详细]
蜡笔小新 2024-11-17 17:48:39
input
mybatis 详解(七)一对一、一对多、多对多
mybatis详解(七)------一 ...
[详细]
蜡笔小新 2024-11-17 10:03:06
java
HDFS API
Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ...
[详细]
蜡笔小新 2024-11-13 17:31:50
sum
com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例
com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ...
[详细]
蜡笔小新 2024-11-13 10:47:33
java
JavaWeb文件上传:前端实现与后端处理详解
在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ...
[详细]
蜡笔小新 2024-11-11 19:50:46
client
Presto:高效即席查询引擎的深度解析与应用
本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ...
[详细]
蜡笔小新 2024-11-07 19:17:47
ip
2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ...
[详细]
蜡笔小新 2024-11-06 15:25:14
client
HBase Java API 进阶:过滤器详解与应用实例
本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ...
[详细]
蜡笔小新 2024-11-05 15:08:18
sum
PHP中元素的计量单位是什么?
PHP中元素的计量单位是什么? ...
[详细]
蜡笔小新 2024-11-01 15:06:51
client
Storm集成Kakfa
一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ...
[详细]
蜡笔小新 2024-10-16 20:20:41
input
hadoop3.1.2 first programdefault wordcount (Mac)
hadoop3.1.2安装完成后的第一个实操示例程 ...
[详细]
蜡笔小新 2024-10-15 11:11:55
特贰的大妞
这个家伙很懒,什么也没留下!
Tags | 热门标签
testing
io
substring
timestamp
join
typescript
subset
text
settings
regex
golang
uri
frameworks
config
python3
ip
bytecode
java
node.js
header
solr
metadata
uml
input
install
sum
perl
command
keyword
client
RankList | 热门文章
1
WordPress安装时出现500错误的解决办法
2
冒泡排序法的原理与举例
3
2018四川计算机二本录取,2018年四川省高考二本院校投档线
4
vue项目中也用到了node.js,那么它在其中充当什么作用呢?
5
4.1shell中常用的基础命令
6
详细说明Buffer和Cache的区别
7
flume 收集日志到HDFS
8
从Swift中的IBOutletCollection获取项目 - Getting An Item from IBOutletCollection in Swift
9
特殊表情存数据库处理
10
db_writer_processes默认大小
11
Android Studio模拟器7.0 8.0无法root无法su的原因
12
Terraform import for aws_route_table should not import dynamic routes
13
PID参数
14
ETH行情分析 白盘震荡该如何进场
15
ios模拟器攻略
PHP1.CN | 中国最专业的PHP中文社区 |
DevBox开发工具箱
|
json解析格式化
|
PHP资讯
|
PHP教程
|
数据库技术
|
服务器技术
|
前端开发技术
|
PHP框架
|
开发工具
|
在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved |
京公网安备 11010802041100号
|
京ICP备19059560号-4
| PHP1.CN 第一PHP社区 版权所有