Flink整合Kafka,可以从Kafka中获取数据进行分析,分析之后要把结果存入HBase中
编写一个操作HBase的工具类。HBase作为一个数据库,写一个工具类,实现数据的增删改查
1. API介绍
方法名 | 用途 | 参数说明 | 返回值 |
---|
getTable | 创建/获取表 | tableNameStr:表名 columnFamily:列族名 | HBase Table对象 |
putData | 插入/更新一列数据 | tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 column:String 列名 data:String 列值 | 空 |
putMapData | 插入/更新多个列数据 | tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 mapData:列名/列值 | 空 |
getData | 根据rowkey,列族+列名 获取单列数据 | tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 column:列名 | 列对应的数据 String类型 |
getMapData | 根据rowkey,列族+列名集合 获取多列数据 | tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 column:列名集合 | 列对应的数据 Map[列名, 列值] |
deleteData | 根据rowkey删除一条数据 | tableNameStr: 表名 rowkey:rowkey columnFamily: 列族名 | 空 |
HBase操作基本类
类名 | 用途 | 获取方式 |
---|
Configuration | HBase的配置类 | HBaseConfiguration.create |
Connection | 连接 | ConnectionFactory.createConnection(conf) |
Admin | HBase的操作API | Connection.getAdmin |
Table | 用来链接HBase的单表 | Connection.getTable() |
Get | 用来查询HBase的单行数据 | new Get(rowkey.getBytes()) |
Put | 保存单行数据 | new Put(rowkey.getBytes()) |
Delete | 删除单行数据 | new Delete(rowkey.getBytes()) |
2. 获取表
开发步骤:
- 将导入
hbase-site.xml
配置文件到resources
目录 - 在
util
包中添加HBaseUtil
- 使用
HBaseConfiguration.create
获取配置对象Configuration
,该配置对象会自动加载hbase-site.xml - 使用
ConnectionFactory.createConnection
获取hbase连接 - 使用
Connection.getAdmin
获取与master的连接
- 创建
getTable
方法 - 构建
TableName
- 构建
TableDescriptorBuilder
- 构建
ColumnFamilyDescriptor
- 添加列族
- 检查表是否存在,若不存在,则创建表
源码解析:
3. 存储数据
创建putData
方法
- 调用getTable获取表
- 构建
Put
对象 - 添加列、列值
- 对table执行put操作
- 启动编写main进行测试
4. 获取数据
- 使用
Connection
获取表 - 创建
getData
方法 - 调用getTable获取表
- 构建
Get
对象 - 对table执行get操作,获取result
- 使用Result.getValue获取列族列对应的值
- 捕获异常
- 关闭表
- 启动
hhbase
- 启动编写main进行测试
# 5. 批量存储数据
创建`putMapData`方法- 调用`getTable`获取表
- 构建`Put`对象
- 添加Map中的列、列值
- 对table执行put操作
- 捕获异常
- 关闭表
- 启动编写main进行测试# 6. 批量获取数据
创建`putMapData`方法- 调用`getTable`获取表
- 构建`Get`对象
- 根据Get对象查询表
- 构建可变Map
- 遍历查询各个列的列值
- 过滤掉不符合的结果
- 把结果转换为Map返回
- 捕获异常
- 关闭表
- 启动编写main进行测试# 7. 删除数据
创建`deleteData`方法- 调用`getTable`获取表
- 构建`Delete`对象
- 对table执行delete操作
- 捕获异常
- 关闭表
- 启动编写main进行测试完整代码:```java
package com.xu.realprocess.utilimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptor, _}
import org.apache.hadoop.hbase.util.Bytes
object HBaseUtil {val conf: Configuration &#61; HBaseConfiguration.create()val conn: Connection &#61; ConnectionFactory.createConnection(conf)val admin: Admin &#61; conn.getAdmindef getTable(tableNameStr: String, columnFamilyName: String): Table &#61; {val tableName: TableName &#61; TableName.valueOf(tableNameStr)if (!admin.tableExists(tableName)) {val descBuilder: TableDescriptorBuilder &#61; TableDescriptorBuilder.newBuilder(tableName)val familyDescriptor: ColumnFamilyDescriptor &#61; ColumnFamilyDescriptorBuilder.newBuilder(columnFamilyName.getBytes).build()descBuilder.setColumnFamily(familyDescriptor)admin.createTable(descBuilder.build())}conn.getTable(tableName)}def putData(tableNameStr: String, rowkey: String, columnFamilyName: String, columnName: String, columnValue: String) &#61; {val table: Table &#61; getTable(tableNameStr, columnFamilyName)try {val put: Put &#61; new Put(rowkey.getBytes)put.addColumn(columnFamilyName.getBytes, columnName.getBytes, columnValue.getBytes)table.put(put)} catch {case ex: Exception &#61;> {ex.printStackTrace()}} finally {table.close()}}def getData(tableNameStr: String, rowkey: String, columnFamilyName: String, columnName: String): String &#61; {val table &#61; getTable(tableNameStr, columnFamilyName)try {val get &#61; new Get(rowkey.getBytes)val result: Result &#61; table.get(get)if (result !&#61; null && result.containsColumn(columnFamilyName.getBytes, columnName.getBytes)) {val bytes: Array[Byte] &#61; result.getValue(columnFamilyName.getBytes(), columnName.getBytes)Bytes.toString(bytes)} else {""}} catch {case ex: Exception &#61;> {ex.printStackTrace()""}} finally {table.close()}}def putMapData(tableNameStr: String, rowkey: String, columnFamilyName: String, map: Map[String, Any]) &#61; {val table &#61; getTable(tableNameStr, columnFamilyName)try {val put &#61; new Put(rowkey.getBytes)for ((colName, colValue) <- map) {put.addColumn(columnFamilyName.getBytes, colName.getBytes, colValue.toString.getBytes)}table.put(put)} catch {case ex: Exception &#61;> {ex.printStackTrace()}} finally {table.close()}}def getMapData(tableNameStr: String, rowkey: String, columnFamilyName: String, columnNameList: List[String]): Map[String, String] &#61; {val table &#61; getTable(tableNameStr, columnFamilyName)try{val get &#61; new Get(rowkey.getBytes)val result: Result &#61; table.get(get)columnNameList.map {col &#61;>val bytes: Array[Byte] &#61; result.getValue(columnFamilyName.getBytes(), col.getBytes)if (bytes !&#61; null && bytes.size > 0) {col -> Bytes.toString(bytes)}else{""->""}}.filter(_._1!&#61;"").toMap}catch{case ex:Exception&#61;>{ex.printStackTrace()Map[String,String]()}}finally {table.close()}}def deleteData(tableNameStr:String,rowkey:String,columnFamilyName:String)&#61;{val table:Table &#61; getTable(tableNameStr,columnFamilyName)try{val delete:Delete &#61; new Delete(rowkey.getBytes)table.delete(delete)}catch {case ex:Exception&#61;>ex.printStackTrace()}finally {table.close()}}}
main方法测试代码
object HBaseUtil {def main(args: Array[String]): Unit &#61; {// 测试存入单列数据putData("test", "123", "info", "t1", "hello world")// 测试存入多列数据val map &#61; Map("t2" -> "scala","t3" -> "hive","t4" -> "sqoop")putMapData("test", "123", "info", map)println(getData("test", "123", "info", "t1"))println(getData("test", "123", "info", "t2"))println(getData("test", "123", "info", "t3"))println(getData("test", "123", "info", "t4"))println(getMapData("test", "123", "info", List("t1", "t2","t3","t4")))deleteData("test", "123", "info")}}
查看页面&#xff1a;
http://node01:16010/master-status#userTables