作者:R星月 出处:http://www.cnblogs.com/rxingyue 欢迎转载,也请保留这段声明。谢谢!
做一个项目中由于数据量比较大,并且需要定时增量分析,做了hbase的分页。项目中用到的版本是hbase1.1 。需要启用协处理器 Aggregation
1.启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site.xml这个文件来实现,只需要添加如下代码:
hbase.coprocessor.user.region.classes
org.apache.hadoop.hbase.coprocessor.AggregateImplementation
2.启用表aggregation,只对特定的表生效。通过HBase Shell 来实现。
(1)disable指定表。hbase> disable ‘mytable’
(2)添加aggregation hbase> alter ‘mytable’, METHOD => ‘table_att’,’coprocessor’=>’|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||’
(3)重启指定表 hbase> enable ‘mytable’
Hbase客户端调用代码示例
1、 得到hbase的表结构总数
public int getTotalRecord(Table keyIndexTable , String nowTime){
int count=0;
AggregationClient aggregationClient = new AggregationClient(config);
Scan scan=new Scan();
scan.setStopRow(nowTime.getBytes());//小于当前时间
try {
Long rowCount = aggregationClient.rowCount(keyIndexTable, new LongColumnInterpreter(), scan);
aggregationClient.close();
count=rowCount.intValue();
} catch (Throwable e) {
e.printStackTrace();
}
return count;
}
2 ,实现分页
public MapgetIndexTableInfo(Table table,String tableName, String nowTime,String startRow, Integer currentPage, Integer pageSize){
Map communtiyKeysMap=new TreeMap();
ResultScanner scanner= null;//为分页创建的封装类对象,下面有给出具体属性
try{//获取最大返回结果数量
if (pageSize == null || pageSize == 0L)
pageSize= 100;if (currentPage == null || currentPage == 0)
currentPage= 1;//计算起始页和结束页
Integer nowPageSize=pageSize+1;//MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)
FilterList filterList = newFilterList(FilterList.Operator.MUST_PASS_ALL);
Filter filter1=newPageFilter(nowPageSize);
filterList.addFilter(filter1);//if(tableName.equals("COMMUNITY_KEYS_INDEX")){//Filter filter2 = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("communitykey")));//filterList.addFilter(filter2);//}
Scan scan = newScan();
scan.setFilter(filterList);
scan.setMaxResultSize(nowPageSize);
scan.setStartRow(Bytes.toBytes(startRow));if(!nowTime.equals("")){
scan.setStopRow(nowTime.getBytes());
}
scanner=table.getScanner(scan);int i = 1;//遍历扫描器对象, 并将需要查询出来的数据row key取出
for(Result result : scanner) {
String row=newString(result.getRow());for(Cell cell : result.rawCells()) {//System.out.println("列族:"+new String(CellUtil.cloneQualifier(cell))+">>>"+new String(CellUtil.cloneValue(cell)));
if(i==nowPageSize){
communtiyKeysMap.put("nextStart", row.substring(0,row.lastIndexOf(":")));break;
}
communtiyKeysMap.put(row,newString(CellUtil.cloneValue(cell)));
}
i++;
}
}catch(IOException e) {
e.printStackTrace();
}finally{if (scanner != null)
scanner.close();
}returncommuntiyKeysMap;
}
3,该分页中处理和跳转下一页
for(int page&#61;1;page<&#61;pageNum;page&#43;&#43;){ //分页
List pageList &#61; new ArrayList(); //子类调用具体分析//1.查出要分析的数据
Map communtiyKeysMap&#61;getIndexTableInfo(hTable,hbaseIndexTabel,nowTime,startRow,page,pageSize);for(String communitykey:communtiyKeysMap.keySet()){
String rowKeyIndex&#61;communitykey;
String cellValue&#61;communtiyKeysMap.get(rowKeyIndex);if(communitykey.equals("nextStart")){
startRow&#61;cellValue;continue; //下一页进行跳转
}
}//实现调用具体的分析//实现该分页处理
}
该过程总共为三步&#xff0c;1.设置表的协处理器 Aggregation&#xff0c;使表能够实现统计功能。2.分页&#xff0c;每次取出1001条数据&#xff0c;每页数据为1000条&#xff0c;第1001条的rowkey为下一页的startrowkey&#xff0c;做为标志“nextStart” 。3分页之后进行查找关联数据和进行逻辑分析处理。