首页
技术博客
PHP教程
数据库技术
前端开发
HTML5
Nginx
php论坛
新用户注册
|
会员登录
PHP教程
技术博客
编程问答
PNG素材
编程语言
前端技术
Android
PHP教程
HTML5教程
数据库
Linux技术
Nginx技术
PHP安全
WebSerer
职场攻略
JavaScript
开放平台
业界资讯
大话程序猿
登录
极速注册
取消
热门标签 | HotTags
integer
golang
case
python
command
less
random
timezone
install
bitmap
uml
require
utf-8
python3
php
split
fetch
bytecode
perl
io
match
dockerfile
version
java
ip
config
timestamp
expression
heatmap
datetime
regex
express
frameworks
chat
int
c语言
include
spring
select
php5
php7
vbscript
bit
plugins
shell
hashset
function
jsp
testing
subset
email
tags
byte
typescript
export
uri
node.js
char
keyword
join
filter
web
controller
format
go
text
default
import
httpclient
cSharp
web3
hashcode
flutter
request
audio
metadata
vba
bash
dll
当前位置:
开发笔记
>
编程语言
> 正文
分拆TableSplit让多个mapper同时读取
作者:特贰的大妞 | 来源:互联网 | 2023-09-16 19:11
分拆TableSplit让多个mapper同时读取默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个tables
分拆TableSplit 让多个mapper同时读取
默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。
由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
Java代码
mapred.min.split.size
mapred.max.split.size
所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。
HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据:
Java代码
TableMapReduceUtil.initTableMapperJob(table[
0
].getBytes(), scan,
UserViewHisMapper2.
class
, Text.
class
, Text.
class
,
genRecommendations);
而这个方法,最终是调用以下方法进行初始化设置的:
Java代码
public
static
void
initTableMapperJob(
byte
[] table, Scan scan,
Class
extends
TableMapper> mapper,
Class
extends
WritableComparable> outputKeyClass,
Class
extends
Writable> outputValueClass, Job job,
boolean
addDependencyJars)
throws
IOException {
initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, TableInputFormat.
class
);
}
所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase:
Java代码
public
class
TableInputFormat
extends
TableInputFormatBase
implements
Configurable
最终要修改的则是TableInputFormatBase这个类,修改其以下方法:
Java代码
public
List
getSplits(JobContext context)
throws
IOException {}
这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit:
Java代码
public
List
getSplits(JobContext context)
throws
IOException {
f (table ==
null
) {
throw
new
IOException(
"No table was provided."
);
Pair<
byte
[][],
byte
[][]> keys = table.getStartEndKeys();
if
(keys ==
null
|| keys.getFirst() ==
null
||
keys.getFirst().length ==
0
) {
throw
new
IOException(
"Expecting at least one region."
);
}
int
count =
0
;
List
splits =
new
ArrayList
(keys.getFirst().length);
for
(
int
i =
0
; i < keys.getFirst().length; i++) {
if
( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue
;
}
String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
getHostname();
byte
[] startRow = scan.getStartRow();
byte
[] stopRow = scan.getStopRow();
// determine if the given start an stop key fall into the region
if
((startRow.length ==
0
|| keys.getSecond()[i].length ==
0
||
Bytes.compareTo(startRow, keys.getSecond()[i]) <
0
) &&
(stopRow.length ==
0
||
Bytes.compareTo(stopRow, keys.getFirst()[i]) >
0
)) {
byte
[] splitStart = startRow.length ==
0
||
Bytes.compareTo(keys.getFirst()[i], startRow) >=
0
?
keys.getFirst()[i] : startRow;
byte
[] splitStop = (stopRow.length ==
0
||
Bytes.compareTo(keys.getSecond()[i], stopRow) <=
0
) &&
keys.getSecond()[i].length >
0
?
keys.getSecond()[i] : stopRow;
InputSplit split =
new
TableSplit(table.getTableName(),
splitStart, splitStop, regionLocation);
splits.add(split);
if
(LOG.isDebugEnabled())
LOG.debug(
"getSplits: split -> "
+ (count++) +
" -> "
+ split);
}
}
return
splits;
}
这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。
以下是我的实现方法:
Java代码
public
List
getSplits(JobContext context)
throws
IOException {
if
(table ==
null
) {
throw
new
IOException(
"No table was provided."
);
}
Pair<
byte
[][],
byte
[][]> keys = table.getStartEndKeys();
if
(keys ==
null
|| keys.getFirst() ==
null
|| keys.getFirst().length ==
0
) {
throw
new
IOException(
"Expecting at least one region."
);
}
int
count =
0
;
List
splits =
new
ArrayList
(
keys.getFirst().length);
for
(
int
i =
0
; i < keys.getFirst().length; i++) {
if
(!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue
;
}
String regionLocation = table.getRegionLocation(keys.getFirst()[i],
true
)
.getHostname();
byte
[] startRow = scan.getStartRow();
byte
[] stopRow = scan.getStopRow();
// determine if the given start an stop key fall into the region
if
((startRow.length ==
0
|| keys.getSecond()[i].length ==
0
|| Bytes
.compareTo(startRow, keys.getSecond()[i]) <
0
)
&& (stopRow.length ==
0
|| Bytes.compareTo(stopRow,
keys.getFirst()[i]) >
0
)) {
byte
[] splitStart = startRow.length ==
0
|| Bytes.compareTo(keys.getFirst()[i], startRow) >=
0
? keys
.getFirst()[i] : startRow;
byte
[] splitStop = (stopRow.length ==
0
|| Bytes.compareTo(
keys.getSecond()[i], stopRow) <=
0
)
&& keys.getSecond()[i].length >
0
? keys.getSecond()[i]
: stopRow;
Scan scan1 =
new
Scan();
scan1.setStartRow(splitStart);
scan1.setStopRow(splitStop);
scan1.setFilter(
new
KeyOnlyFilter());
scan1.setBatch(
500
);
ResultScanner resultscanner = table.getScanner(scan1);
//用来保存该region的所有key
List
rows =
new
ArrayList
();
//Iterator
it = resultscanner.iterator();
for
(Result rs : resultscanner)
{
if
(rs.isEmpty())
continue
;
rows.add(
new
String(rs.getRow()));
}
int
splitSize = rows.size() / mappersPerSplit;
for
(
int
j =
0
; j < mappersPerSplit; j++) {
TableSplit tablesplit =
null
;
if
(j == mappersPerSplit -
1
)
tablesplit =
new
TableSplit(table.getTableName(),
rows.get(j * splitSize).getBytes(),
rows.get(rows.size() -
1
).getBytes(),
regionLocation);
else
tablesplit =
new
TableSplit(table.getTableName(),
rows.get(j * splitSize).getBytes(),
rows.get(j * splitSize + splitSize).getBytes(), regionLocation);
splits.add(tablesplit);
if
(LOG.isDebugEnabled())
LOG.debug((
new
StringBuilder())
.append(
"getSplits: split -> "
).append(i++)
.append(
" -> "
).append(tablesplit).toString());
}
resultscanner.close();
}
}
return
splits;
}
通过配置设置需要拆分的split数。
hadoop
mapreduce
hbase
split
io
java
static
byte
key
写下你的评论吧 !
吐个槽吧,看都看了
会员登录
|
用户注册
推荐阅读
java
oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ...
[详细]
蜡笔小新 2024-11-12 19:26:15
java
Java 编程错误:对象无法转换为 long 类型
本文介绍了在 Java 编程中遇到的一个常见错误:对象无法转换为 long 类型,并提供了详细的解决方案。 ...
[详细]
蜡笔小新 2024-11-13 10:57:24
java
如何在Java中使用DButils类
这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ...
[详细]
蜡笔小新 2024-11-12 13:46:11
java
Java 中如何将多参数方法传递给使用 List 的 Function
本文探讨了如何在 Java 中将多参数方法通过 Lambda 表达式传递给一个接受 List 的 Function。具体分析了 `OrderUtil` 类中的 `runInBatches` 方法及其使用场景。 ...
[详细]
蜡笔小新 2024-11-12 22:25:23
ip
微信公众号推送模板40036问题
返回码错误码描述说明40001invalidcredential不合法的调用凭证40002invalidgrant_type不合法的grant_type40003invalidop ...
[详细]
蜡笔小新 2024-11-12 16:31:32
java
SpringMVC 入门指南:快速上手 Java Web 开发
本文将带你快速了解 SpringMVC 框架的基本使用方法,通过实现一个简单的 Controller 并在浏览器中访问,展示 SpringMVC 的强大与简便。 ...
[详细]
蜡笔小新 2024-11-13 14:22:01
java
Java DAO模式详解与代码示例
DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ...
[详细]
蜡笔小新 2024-11-13 12:25:33
config
CentOS 7 中配置开机自动挂载 NFS 的解决方案
本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ...
[详细]
蜡笔小新 2024-11-13 12:05:24
config
com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例
com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ...
[详细]
蜡笔小新 2024-11-13 10:47:33
java
Java 并发编程:深入解析 AtomicInteger 和 CAS 无锁算法
在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ...
[详细]
蜡笔小新 2024-11-12 16:40:04
config
com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例
com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例 ...
[详细]
蜡笔小新 2024-11-12 14:33:17
java
Java高并发与多线程(二):线程的实现方式详解
本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ...
[详细]
蜡笔小新 2024-11-12 14:31:23
io
实验九:使用SharedPreferences存储简单数据
本实验旨在帮助学生理解和掌握使用SharedPreferences存储和读取简单数据的方法,包括程序参数和用户选项。 ...
[详细]
蜡笔小新 2024-11-12 14:21:47
java
字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ...
[详细]
蜡笔小新 2024-11-12 14:07:25
ip
PHP 对象生命周期与内存管理
本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ...
[详细]
蜡笔小新 2024-11-12 13:35:26
特贰的大妞
这个家伙很懒,什么也没留下!
Tags | 热门标签
integer
golang
case
python
command
less
random
timezone
install
bitmap
uml
require
utf-8
python3
php
split
fetch
bytecode
perl
io
match
dockerfile
version
java
ip
config
timestamp
expression
heatmap
datetime
RankList | 热门文章
1
Ubuntu16.0464位安装armlinuxgcc交叉编译器以及samba服务器
2
跳转页面_ionic4 跳转页面
3
Linux系统管理之rpm软件包管理
4
JavaHashMap原理解析
5
[二分图]JZOJ 4612 游戏
6
Vue开发实例目录总索引
7
ASP.NET Core WebAPI 开发新建WebAPI项目 转
8
gis平移至所选要素_详解:ArcGIS中如何实现矢量数据平移
9
PyQt 如何创建自定义QWidget
10
开发笔记:深度探索!Android之OkHttp网络架构源码解析
11
EasyGBS无法播放WebRTC格式的视频流,是什么原因?
12
好看的综艺节目具备什么特点?:节目情节设置
13
【中国大陆手机号码决收不到验证码的问题】
14
阿里云大数据计算服务MaxCompute (原名 ODPS)
15
体验分析(收藏)
PHP1.CN | 中国最专业的PHP中文社区 |
DevBox开发工具箱
|
json解析格式化
|
PHP资讯
|
PHP教程
|
数据库技术
|
服务器技术
|
前端开发技术
|
PHP框架
|
开发工具
|
在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved |
京公网安备 11010802041100号
|
京ICP备19059560号-4
| PHP1.CN 第一PHP社区 版权所有