import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.elasticsearch.client.Client; //import org.elasticsearch.client.transport.TransportClient; //import org.elasticsearch.common.settings.ImmutableSettings; //import org.elasticsearch.common.settings.Settings; //import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; //import java.util.NavigableMap;
public class DataSyncObserver extends BaseRegionObserver { private static Client client = null; private static final Log LOG = LogFactory.getLog(DataSyncObserver.class); /** * 读取HBase Shell的指令参数 * * @param env */
private void readConfiguration(CoprocessorEnvironment env) { Configuration conf = env.getConfiguration(); Config.clusterName = conf.get("es_cluster"); Config.nodeHost = conf.get("es_host"); Config.nodePort = conf.getInt("es_port", -1); Config.indexName = conf.get("es_index"); Config.typeName = conf.get("es_type"); LOG.info("observer -- started with config: " + Config.getInfo()); } @Override public void start(CoprocessorEnvironment env) throws IOException { readConfiguration(env); // Settings settings = ImmutableSettings.settingsBuilder() // .put("cluster.name", Config.clusterName).build(); // client = new TransportClient(settings) // .addTransportAddress(new InetSocketTransportAddress( // Config.nodeHost, Config.nodePort));
client = MyTransportClient.client; } @Override public void postPut(ObserverContext e, Put put, WALEdit edit, Durability durability) throws IOException { try { String indexId = new String(put.getRow()); Map<byte[], List> familyMap = put.getFamilyCellMap(); // NavigableMap> familyMap = put.getFamilyCellMap();
Map json = new HashMap(); for (Map.Entry<byte[], List> entry : familyMap.entrySet()) { for (Cell cell : entry.getValue()) { String key = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); json.put(key, value); } } System.out.println(); ElasticSearchOperator.addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, indexId).setDoc(json).setUpsert(json)); LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName); } catch (Exception ex) { LOG.error(ex); } } @Override public void postDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException { try { String indexId = new String(delete.getRow()); ElasticSearchOperator.addDeleteBuilderToBulk(client.prepareDelete(Config.indexName, Config.typeName, indexId)); LOG.info("observer -- delete a doc: " + indexId); } catch (Exception ex) { LOG.error(ex); } } } | |
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.Client; //import org.elasticsearch.client.transport.TransportClient; //import org.elasticsearch.common.settings.ImmutableSettings; //import org.elasticsearch.common.settings.Settings; //import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ElasticSearchOperator { // 缓冲池容量
private static final int MAX_BULK_COUNT = 10; // 最大提交间隔(秒)
private static final int MAX_COMMIT_INTERVAL = 60 * 5; private static Client client = null; private static BulkRequestBuilder bulkRequestBuilder = null; private static Lock commitLock = new ReentrantLock(); static { // elasticsearch1.5.0 // Settings settings = ImmutableSettings.settingsBuilder() // .put("cluster.name", Config.clusterName).build(); // client = new TransportClient(settings) // .addTransportAddress(new InetSocketTransportAddress( // Config.nodeHost, Config.nodePort)); // 2.3.5
client = MyTransportClient.client; bulkRequestBuilder = client.prepareBulk(); bulkRequestBuilder.setRefresh(true); Timer timer = new Timer(); timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000); } /** * 判断缓存池是否已满,批量提交 * * @param threshold */
private static void bulkRequest(int threshold) { if (bulkRequestBuilder.numberOfActions() > threshold) { BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (!bulkResponse.hasFailures()) { bulkRequestBuilder = client.prepareBulk(); } } } /** * 加入索引请求到缓冲池 * * @param builder */
public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { ex.printStackTrace(); } finally { commitLock.unlock(); } } /** * 加入删除请求到缓冲池 * * @param builder */
public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { ex.printStackTrace(); } finally { commitLock.unlock(); } } /** * 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步 */
static class CommitTimer extends TimerTask { @Override public void run() { commitLock.lock(); try { bulkRequest(0); } catch (Exception ex) { ex.printStackTrace(); } finally { commitLock.unlock(); } } } }
# Build the jar-with-dependencies, rename it after the HBase/ES versions it targets, and
# upload it to the HDFS path referenced by the tables' coprocessor spec.
# The assembly is written to Maven's build directory (target/ by default).
mvn clean compile assembly:single
mv target/observer-1.0-SNAPSHOT-jar-with-dependencies.jar observer-hb0.98-es2.3.5.jar
hdfs dfs -put observer-hb0.98-es2.3.5.jar /hbase/lib/
# Start the HBase shell, then create each table and attach the observer to it.
# Coprocessor spec: '<jar path>|<observer class>|<priority>|<key=value,... arguments>';
# the es_* arguments are read by DataSyncObserver.readConfiguration.
hbase shell
create 'region','data'
disable 'region'
alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost'
enable 'region'

create 'sp','data'
disable 'sp'
alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost'
enable 'sp'
hbase(main):007:0* describe 'ora_test'
Table ora_test is ENABLED
ora_test, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///appdt/hbase/lib/observer-hb1.2.2-es2.3.5.jar|com.gavin.observer.DataSyncObserver |1001|es_cluster=elas2.3.4,es_type=ora_test,es_index=hbase,es_port=9300,es_host=localhost'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0260 seconds
# Detach the observer by removing the coprocessor$1 table attribute shown by describe.
disable 'ora_test'
alter 'ora_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'ora_test'
hbase(main):011:0> describe 'ora_test'
Table ora_test is ENABLED
ora_test
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0200 seconds
# Import relational tables into the HBase tables created above (column family 'data').
# -P prompts for the password instead of exposing it on the command line
# (visible in the process list and shell history).

# MySQL TB_REGION -> HBase 'region', row key REGION_ID
bin/sqoop import --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username mysql -P \
    --table TB_REGION --hbase-table region --hbase-row-key REGION_ID --column-family data

# Oracle SITTS.ESB_SERVICE_PARAM -> HBase 'sp', row key PARAM_ID, split on PARAM_ID
bin/sqoop import --connect jdbc:oracle:thin:@192.168.16.223:1521/orcl --username sitts -P \
    --table SITTS.ESB_SERVICE_PARAM --split-by PARAM_ID \
    --hbase-table sp --hbase-row-key PARAM_ID --column-family data
# HBase shell: list all rows of the 'region' table.
scan('region')
HBase Observer同步数据到ElasticSearch