In Flink we sometimes need to analyze data for a fixed time range, say from 1:00 to 2:00, but because the row keys are salted a scan has to pass through every Region and is slow. In that case we can customize TableInputFormat to meet the requirement and read the data with Flink's DataSet API; a Spark example is also given further below.
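For reference, the row keys written in this post have the layout

    md5(userId)[0..2] | yyyyMMddHHmmssSSS | crc(appKey) | md5(userId)[2..8] | 2 random hex chars

for example 00|20180518203401772|2352356512|4519f3|f1. The leading two-character salt spreads writes across Regions, which is exactly why a plain time-range scan is expensive and why the custom InputFormat below issues one targeted scan per salt prefix.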
Tutorial
Md5Util.java

import org.apache.commons.codec.binary.Hex;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Util {

    public static String md5(byte[] key) {
        return md5(key, 0, key.length);
    }

    public static String md5(byte[] key, int offset, int length) {
        try {
            MessageDigest e = MessageDigest.getInstance("MD5");
            e.update(key, offset, length);
            byte[] digest = e.digest();
            return new String(Hex.encodeHex(digest));
        } catch (NoSuchAlgorithmException var5) {
            throw new RuntimeException("Error computing MD5 hash", var5);
        }
    }

    public static String md5(String str) {
        return md5(str.getBytes());
    }

    public static String md5(String str, int offset, int length) {
        return md5(str.getBytes(), offset, length);
    }
}
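A quick sanity check of the helper (a minimal sketch; the userId is one of the sample values from the data shown later) shows where the two-character salt prefix used by the row key comes from:

public class Md5UtilDemo {
    public static void main(String[] args) {
        String userId = "ogJmG5KRcIxtyg7UmcRHFCn6YiAQ";
        String md5 = Md5Util.md5(userId);        // 32 hex characters
        System.out.println(md5.substring(0, 2)); // two-character salt prefix, somewhere in "00".."ff"
    }
}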
Table split scheme

private Connection connection;
private Admin admin;

@Before
public void init() throws Exception {
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    System.setProperty("sun.security.krb5.debug", "false");
    final String user = "hbase/abc.demo.com@DEMO.COM";
    final String keyPath = "/home/dounine/kerberos/lake.keytab";
    Configuration conf = new Configuration();
    conf.addResource("hbase-site.xml");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(user, keyPath);
    connection = ConnectionFactory.createConnection(conf);
    admin = connection.getAdmin();
}

@Test
public void createTable() throws IOException {
    TableName table = TableName.valueOf("logTable1");
    TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);
    tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY, KeyPrefixRegionSplitPolicy.class.getName());
    tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY, "2");
    ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();
    ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();
    ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();
    tableDesc.setColumnFamilies(Arrays.asList(extCF, locationCF, deviceCF));
    try {
        byte[][] splitKeys = new byte[4][];
        splitKeys[0] = Bytes.toBytes("00");
        splitKeys[1] = Bytes.toBytes("40");
        splitKeys[2] = Bytes.toBytes("80");
        splitKeys[3] = Bytes.toBytes("c0");
        admin.createTable(tableDesc.build(), splitKeys);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
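A minimal sketch (assuming the same admin handle created in init()) to confirm that the four split keys produced five Regions:

@Test
public void listRegions() throws IOException {
    TableName table = TableName.valueOf("logTable1");
    for (RegionInfo region : admin.getRegions(table)) {
        System.out.println(Bytes.toStringBinary(region.getStartKey())
                + " ~ " + Bytes.toStringBinary(region.getEndKey()));
    }
    // expected boundaries: ''~00, 00~40, 40~80, 80~c0, c0~''
}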
Writing data into logTable1

public class HbaseKerberos {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
    private static final String TABLE_NAME = "logTable1";

    public void insertDataToHbase1(String appKey, List<Log> hasDatas) throws IOException {
        Table table = HbaseUtils.getTable(TABLE_NAME);
        Long sumCount = 0L;

        /**
         * Regular fields
         */
        byte[] extCF = Bytes.toBytes("ext"); // column family
        Random random = new Random();
        List<Put> rows = new ArrayList<>();
        for (Log logEntity : hasDatas) {
            JSONObject dataJsonObject = logEntity.getData();
            JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");
            String userId = extJsonObject.getString("userId");
            String timeStr = logEntity.getTime().format(dtf);
            String md5Str = Md5Util.md5(userId);
            String rowKey = new StringBuilder()
                    .append(md5Str.substring(0, 2)) // the first two md5 chars range from 00 to ff (256 values), so the table can later grow to that many Regions -- more than enough
                    .append("|")
                    .append(timeStr)                // time
                    .append("|")
                    .append(CrcUtil.getCrcValue(appKey))
                    .append("|")
                    .append(md5Str.substring(2, 8))
                    .append("|")
                    .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0, 2))
                    .toString();
            Put row = new Put(Bytes.toBytes(rowKey));
            for (String keyName : extJsonObject.keySet()) {
                String value = extJsonObject.getString(keyName);
                if (StringUtils.isNotBlank(value)) {
                    row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));
                }
            }
            row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));
            /**
             * Device info
             */
            putFieldToRow(logEntity.getData(), "device", row);
            /**
             * Location info
             */
            putFieldToRow(logEntity.getData(), "location", row);
            rows.add(row);
        }
        for (Integer[] duration : LimitUtil.getLimits(rows.size(), 1000)) {
            Object[] results = new Object[(duration[1] - duration[0])];
            try {
                table.batch(rows.subList(duration[0], duration[1]), results);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sumCount += (duration[1] - duration[0]);
        }
        LOGGER.info("write data count:" + sumCount);
    }
}
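LimitUtil.getLimits is one of the author's helpers and is not shown in the post; judging from how it is used above, a hypothetical minimal equivalent that yields [start, end) index pairs in steps of 1000 could look like this:

// Hypothetical reconstruction of LimitUtil.getLimits(size, step):
// returns [start, end) index pairs so the Put list can be flushed in batches.
public static List<Integer[]> getLimits(int size, int step) {
    List<Integer[]> limits = new ArrayList<>();
    for (int start = 0; start < size; start += step) {
        limits.add(new Integer[]{start, Math.min(start + step, size)});
    }
    return limits;
}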
Data in logTable1

00|20180518203401772|2352356512|4519f3|f1 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203401772|2352356512|4519f3|f1 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-
00|20180518203401772|2352356512|4519f3|f1 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093
00|20180518203401772|2352356512|4519f3|f1 column=ext:retain, timestamp=1533646292389, value=17670
00|20180518203401772|2352356512|4519f3|f1 column=ext:scene, timestamp=1533646292389, value=1007
00|20180518203401772|2352356512|4519f3|f1 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c
00|20180518203401772|2352356512|4519f3|f1 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01
00|20180518203401772|2352356512|4519f3|f1 column=ext:type, timestamp=1533646292389, value=login_in
00|20180518203401772|2352356512|4519f3|f1 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203406167|2352356512|4519f3|54 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203406167|2352356512|4519f3|54 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-
00|20180518203406167|2352356512|4519f3|54 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075
00|20180518203406167|2352356512|4519f3|54 column=ext:retain, timestamp=1533646347725, value=17670
00|20180518203406167|2352356512|4519f3|54 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203406167|2352356512|4519f3|54 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06
00|20180518203406167|2352356512|4519f3|54 column=ext:type, timestamp=1533646347725, value=sharesuccess
00|20180518203406167|2352356512|4519f3|54 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203407144|2352356512|5ca1c4|bc column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203407144|2352356512|5ca1c4|bc column=ext:createDateTime, timestamp=1533646294045, value=1526646849745
00|20180518203407144|2352356512|5ca1c4|bc column=ext:retain, timestamp=1533646294045, value=17670
00|20180518203407144|2352356512|5ca1c4|bc column=ext:scene, timestamp=1533646294045, value=1037
00|20180518203407144|2352356512|5ca1c4|bc column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07
00|20180518203407144|2352356512|5ca1c4|bc column=ext:type, timestamp=1533646294045, value=login_in
CustomTableInputFormat.java

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.net.DNS;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class CustomTableInputFormat extends TableInputFormat {

    private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
    private List<String> keys = new ArrayList<>();

    public CustomTableInputFormat() {
        super();
        // one salt prefix per possible two-character md5 prefix: "00" .. "ff"
        for (int i = 0; i < 256; i++) {
            keys.add(StringUtils.substring("00" + Integer.toHexString(i), -2));
        }
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        super.initialize(context);
        TableName tableName = super.getTable().getName();
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());
        List<InputSplit> splits = new ArrayList<>();
        for (String key : keys) {
            HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);
            InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
            InetAddress regionAddress = isa.getAddress();
            String regionLocation;
            regionLocation = reverseDNS(regionAddress);
            byte[] regionName = location.getRegion().getRegionName();
            String encodedRegionName = location.getRegion().getEncodedName();
            long regionSize = sizeCalculator.getRegionSize(regionName);
            // prepend the salt prefix to the scan's start/stop rows: one targeted split per prefix
            byte[] splitStart = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStartRow());
            byte[] splitStop = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStopRow());
            TableSplit split = new TableSplit(tableName, this.getScan(),
                    splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
            splits.add(split);
        }
        return splits;
    }

    String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
        String hostName = this.reverseDNSCacheMap.get(ipAddress);
        if (hostName == null) {
            String ipAddressString = null;
            try {
                ipAddressString = DNS.reverseDns(ipAddress, null);
            } catch (Exception e) {
                ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
            }
            if (ipAddressString == null) {
                throw new UnknownHostException("No host found for " + ipAddress);
            }
            hostName = Strings.domainNamePointerToHostName(ipAddressString);
            this.reverseDNSCacheMap.put(ipAddress, hostName);
        }
        return hostName;
    }
}
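To make the split construction concrete: each of the 256 splits simply prepends one salt prefix to the scan's start and stop rows, using the same Bytes.add call as getSplits. For the scan range used in the examples below, the prefix "00" therefore produces:

byte[] splitStart = Bytes.add(Bytes.toBytes("00" + "|"), Bytes.toBytes("201805182039"));
byte[] splitStop  = Bytes.add(Bytes.toBytes("00" + "|"), Bytes.toBytes("201805182040"));
System.out.println(Bytes.toString(splitStart)); // 00|201805182039
System.out.println(Bytes.toString(splitStop));  // 00|201805182040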
Flink example

static Configuration conf;

static {
    HadoopKrbLogin.login();
    conf = new Configuration();
    String tableName = "logTable1";
    conf.addResource("hbase-site.xml");
    Scan scan = new Scan();
    scan.setCaching(1000);
    scan.withStartRow("201805182039".getBytes());
    scan.withStopRow("201805182040".getBytes());
    scan.setCacheBlocks(false);
    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);
    ClientProtos.Scan proto = null;
    try {
        proto = ProtobufUtil.toScan(scan);
    } catch (IOException e) {
        e.printStackTrace();
    }
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString);
}

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(
            HadoopInputs.createHadoopInput(
                    new CustomTableInputFormat(),
                    ImmutableBytesWritable.class,
                    Result.class,
                    Job.getInstance(conf)
            )
    );
    DataSet<LogEntity> toTuple = hbase.map(
            new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {
                public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {
                    Result result = record.f1;
                    return result2Entity(result);
                }
            });
}

private static LogEntity result2Entity(Result result) {
    JSONObject root = new JSONObject();
    JSONObject ext = new JSONObject();
    JSONObject device = new JSONObject();
    JSONObject location = new JSONObject();
    for (Cell cell : result.rawCells()) {
        byte[] family = CellUtil.cloneFamily(cell);
        byte[] column = CellUtil.cloneQualifier(cell);
        byte[] value = CellUtil.cloneValue(cell);
        String columnName = Bytes.toString(column);
        if ("ext".equals(Bytes.toString(family))) {
            if ("durationTime".equals(columnName)) {
                ext.put(columnName, Bytes.toLong(value));
            } else if ("time".equals(columnName)) {
                root.put(columnName, Bytes.toString(value));
                root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
            } else {
                ext.put(columnName, Bytes.toString(value));
            }
        } else if ("device".equals(Bytes.toString(family))) {
            device.put(columnName, Bytes.toString(value));
        } else if ("location".equals(Bytes.toString(family))) {
            location.put(columnName, Bytes.toString(value));
        }
    }
    JSONObject data = new JSONObject();
    if (device.keySet().size() > 0) {
        data.put("device", device);
    }
    if (location.keySet().size() > 0) {
        data.put("location", location);
    }
    data.put("ext", ext);
    root.put("data", data);
    return JSON.parseObject(root.toString(), LogEntity.class);
}
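As written, main only defines the mapping; Flink does not read anything until a sink or an action is attached. A minimal sketch of how one might trigger execution (the output path below is just a placeholder):

// e.g. at the end of main():
toTuple.print();                                        // print() executes the DataSet program
// or: toTuple.writeAsText("hdfs:///tmp/logTable1-out"); env.execute("read logTable1");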
Spark example

public class SimpleApp implements Serializable {

    static Configuration cfg = null;

    static {
        HadoopKrbLogin.login();
        cfg = new Configuration();
        String tableName = "logTable1";
        cfg.addResource("hbase-site.xml");
        Scan scan = new Scan();
        scan.setCaching(1000);
        scan.withStartRow("201805182039".getBytes());
        scan.withStopRow("201805182040".getBytes());
        scan.setCacheBlocks(false);
        cfg.set(TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = null;
        try {
            proto = ProtobufUtil.toScan(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        cfg.set(TableInputFormat.SCAN, scanToString);
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("HbaseDemo");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
                jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        // do some transformation
        JavaRDD<LogEntity> rdd1 = hBaseRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, LogEntity>)
                tuple2Iterator -> {
                    List<LogEntity> logEntities = new ArrayList<>();
                    while (tuple2Iterator.hasNext()) {
                        Tuple2<ImmutableBytesWritable, Result> tuple = tuple2Iterator.next();
                        Result result = tuple._2;
                        String rowKey = Bytes.toString(result.getRow());
                        logEntities.add(result2Entity(result));
                    }
                    return logEntities.iterator();
                });
    }

    private static LogEntity result2Entity(Result result) {
        JSONObject root = new JSONObject();
        JSONObject ext = new JSONObject();
        JSONObject device = new JSONObject();
        JSONObject location = new JSONObject();
        for (Cell cell : result.rawCells()) {
            byte[] family = CellUtil.cloneFamily(cell);
            byte[] column = CellUtil.cloneQualifier(cell);
            byte[] value = CellUtil.cloneValue(cell);
            String columnName = Bytes.toString(column);
            if ("ext".equals(Bytes.toString(family))) {
                if ("durationTime".equals(columnName)) {
                    ext.put(columnName, Bytes.toLong(value));
                } else if ("time".equals(columnName)) {
                    root.put(columnName, Bytes.toString(value));
                    root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
                } else {
                    ext.put(columnName, Bytes.toString(value));
                }
            } else if ("device".equals(Bytes.toString(family))) {
                device.put(columnName, Bytes.toString(value));
            } else if ("location".equals(Bytes.toString(family))) {
                location.put(columnName, Bytes.toString(value));
            }
        }
        JSONObject data = new JSONObject();
        if (device.keySet().size() > 0) {
            data.put("device", device);
        }
        if (location.keySet().size() > 0) {
            data.put("location", location);
        }
        data.put("ext", ext);
        root.put("data", data);
        return JSON.parseObject(root.toString(), LogEntity.class);
    }
}
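Likewise, the Spark example only defines rdd1; an action is still needed to actually run the scan, for example (a minimal sketch):

// e.g. at the end of main():
System.out.println("log count: " + rdd1.count());
jsc.stop();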
Author: dounine
Link: https://www.jianshu.com/p/ed86302eec2a