/**
 * Coprocessor endpoint that implements the {@code SumService} RPC.
 *
 * <p>For each request it scans the hosting region for the requested
 * family/qualifier, decodes every returned cell value with
 * {@link Bytes#toLong(byte[])}, and replies with the total for that region.
 * The coprocessor refuses to start unless it is given a
 * {@link RegionCoprocessorEnvironment}.
 */
public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {

    /** Region environment captured in {@link #start}; used to open the region scanner. */
    private RegionCoprocessorEnvironment env;

    /** Exposes this instance as the protobuf service to register. */
    @Override
    public Service getService() {
        return this;
    }

    /**
     * Stores the region environment for later use by {@link #getSum}.
     *
     * @param env environment supplied by the coprocessor host
     * @throws IOException ({@link CoprocessorException}) if {@code env} is not a
     *         {@link RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    /** No resources are held between calls, so there is nothing to release. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // do nothing
    }

    /**
     * Sums the values of one column over every row of this region.
     *
     * @param controller receives the failure if the scan throws an {@link IOException}
     * @param request    carries the column family and qualifier to sum
     * @param done       always invoked exactly once; receives the built response on
     *                   success, or {@code null} if the scan failed
     */
    @Override
    public void getSum(RpcController controller, SumRequest request,
            RpcCallback<SumResponse> done) {
        byte[] family = Bytes.toBytes(request.getFamily());
        byte[] qualifier = Bytes.toBytes(request.getColumn());

        // addColumn alone is sufficient: it restricts the scan to this one
        // qualifier of the family, so a preceding addFamily would be redundant.
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);

        SumResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore;
            long sum = 0L;
            do {
                // next() fills 'results' with the cells of the next row and
                // reports whether further rows remain.
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum += Bytes.toLong(CellUtil.cloneValue(cell));
                }
                results.clear();
            } while (hasMore);
            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                    // Best effort: the sum (or the scan failure) has already been
                    // recorded, and a close failure must not replace that outcome.
                }
            }
        }
        // Runs on both paths so the caller is never left waiting; null signals failure.
        done.run(response);
    }
}
Configuration conf = HBaseConfiguration.create();
// Use below code for HBase version 1.x.x or above.
Connection cOnnection= ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);
//Use below code HBase version 0.98.xx or below.
//HConnection cOnnection= HConnectionManager.createConnection(conf);
//HTableInterface table = connection.getTable("users");
final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross")
.build();
try {
Map<byte[], Long> results = table.CoprocessorService (SumService.class, null, null,
new Batch.Call() {
@Override
public Long call(SumService aggregate) throws IOException {
BlockingRpcCallback rpcCallback = new BlockingRpcCallback();
aggregate.getSum(null, request, rpcCallback);
SumResponse response = rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
});
for (Long sum : results.values()) {
System.out.println("Sum = " + sum);
}
} catch (ServiceException e) {
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}