HRegionServer issues a compaction request
The main logic is as follows:
-
-
-
-
- CompactSplitThread#requestCompaction() {
- for (Store s : r.getStores().values()) {
- CompactionRequest cr = Store.requestCompaction(priority, request);
- ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
- ? largeCompactions : smallCompactions;
- pool.execute(cr);
- ret.add(cr);
- }
- }
-
-
-
-
- Store#throttleCompaction() {
- long throttlePoint = conf.getLong(
- "hbase.regionserver.thread.compaction.throttle",
- 2 * this.minFilesToCompact * this.region.memstoreFlushSize);
- return compactionSize > throttlePoint;
- }
-
-
- Store#compactSelection() {
-
- if (storefile.maxTimeStamp + store.ttl < now_timestamp) {
-
- }
-
-
-
- while (pos < compactSelection.getFilesToCompact().size() &&
- compactSelection.getFilesToCompact().get(pos).getReader().length() > maxCompactSize &&
- !compactSelection.getFilesToCompact().get(pos).isReference()) {
- ++pos;
- }
- if (pos != 0) {
- compactSelection.clearSubList(0, pos);
- }
- if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {
- return;
- }
-
-
-
-
-
- int countOfFiles = compactSelection.getFilesToCompact().size();
- long [] fileSizes = new long[countOfFiles];
- long [] sumSize = new long[countOfFiles];
- for (int i = countOfFiles - 1; i >= 0; --i) {
- StoreFile file = compactSelection.getFilesToCompact().get(i);
- fileSizes[i] = file.getReader().length();
-
- int tooFar = i + this.maxFilesToCompact - 1;
- sumSize[i] = fileSizes[i] + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
- }
-
-
-
- while (countOfFiles - start >= this.minFilesToCompact && fileSizes[start] >
- Math.max(minCompactSize, (long)(sumSize[start + 1] * r))) {
- ++start;
- }
- int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
- long totalSize = fileSizes[start] + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);
- compactSelection = compactSelection.getSubList(start, end);
-
-
- if (majorcompaction && compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
- compactSelection.getFilesToCompact().subList(0, pastMax).clear();
- }
- }
CompactionRequest thread (executes both major and minor compactions)
The compaction-related class diagram is as follows:
The difference between a major and a minor compaction is actually quite small. Whether a request runs on the large or the small compaction thread pool is decided by the throttle point shown above:
if the total size of the files selected for compaction exceeds 2 * minFilesToCompact * memstoreFlushSize, the request is executed on the large-compaction pool, otherwise on the small one.
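For intuition, here is a minimal, self-contained sketch of that routing decision. The flush size, file count and request size are made-up example values and DemoThrottle is not an HBase class; the threshold formula simply mirrors Store#throttleCompaction above.
- // Minimal sketch of the throttle-point routing; all numbers are assumed example values.
- public class DemoThrottle {
-   public static void main(String[] args) {
-     long memstoreFlushSize = 128L * 1024 * 1024; // assumed flush size: 128 MB
-     int minFilesToCompact = 3;                   // assumed hbase.hstore.compaction.min
-     long throttlePoint = 2L * minFilesToCompact * memstoreFlushSize; // 768 MB
-     long requestSize = 900L * 1024 * 1024;       // total size of the selected files
-     // requests above the throttle point go to the large-compaction pool
-     String pool = requestSize > throttlePoint ? "largeCompactions" : "smallCompactions";
-     System.out.println(pool); // prints largeCompactions for these example numbers
-   }
- }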
In addition, when the StoreScanner constructor runs, the ScanType tells it whether this is a major or a minor compaction; ScanQueryMatcher then uses that
ScanType (there are three kinds: user scan, minor compaction and major compaction) to decide what value to return for each KeyValue it examines.
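A toy illustration of that ScanType-dependent behaviour is sketched below. DemoMatcher and DemoScanType are made-up names, not HBase classes; the retain-deletes rule mirrors the ScanQueryMatcher constructor shown near the end of this section (delete markers are kept during minor compactions and raw scans, and may be dropped during a major compaction).
- // Toy sketch only; not HBase API.
- public class DemoMatcher {
-   enum DemoScanType { USER_SCAN, MINOR_COMPACT, MAJOR_COMPACT }
-   final boolean retainDeletesInOutput;
-   DemoMatcher(DemoScanType scanType, boolean rawScan) {
-     // minor compactions (and raw scans) must keep delete markers in the output
-     this.retainDeletesInOutput = scanType == DemoScanType.MINOR_COMPACT || rawScan;
-   }
-   String onDeleteMarker() {
-     return retainDeletesInOutput ? "keep the delete marker" : "marker may be dropped";
-   }
-   public static void main(String[] args) {
-     System.out.println(new DemoMatcher(DemoScanType.MINOR_COMPACT, false).onDeleteMarker());
-     System.out.println(new DemoMatcher(DemoScanType.MAJOR_COMPACT, false).onDeleteMarker());
-   }
- }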
The main logic is as follows:
-
- CompactionRequest#run() {
- boolean completed &#61; HRegion.compact(this);
- if (completed) {
- if (s.getCompactPriority() <= 0) {
- server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue", null);
- } else {
-
- server.getCompactSplitThread().requestSplit(r);
- }
- }
- }
-
-
- HRegion#compact() {
- Preconditions.checkArgument(cr.getHRegion().equals(this));
- lock.readLock().lock();
- CompactionRequest.getStore().compact(cr);
- lock.readLock().unlock();
- }
-
-
-
- Store#compact() {
- List<StoreFile> filesToCompact = request.getFiles();
- StoreFile.Writer writer = this.compactor.compact(cr, maxId);
- if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
- sf = completeCompaction(filesToCompact, writer);
- } else {
-
- sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- sf.createReader();
- }
- }
-
-
-
-
-
-
-
- Store#completeCompaction() {
- Path origPath = compactedFile.getPath();
- Path destPath = new Path(homedir, origPath.getName());
- HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath);
- StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(result);
- result.createReader();
- }
-
-
-
-
-
-
- Compactor#compact() {
- for (StoreFile file : filesToCompact) {
- StoreFile.Reader r = file.getReader();
- long keyCount = (r.getBloomFilterType() == store.getFamily()
- .getBloomFilterType()) ?
- r.getFilterEntries() : r.getEntries();
- maxKeyCount += keyCount;
- }
-
- int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
- Compression.Algorithm compression = store.getFamily().getCompression();
- List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false, true);
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
-
-
-
- InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
- smallestReadPoint, earliestPutTs);
-
- do {
- hasMore = scanner.next(kvs, compactionKVMax);
- if (writer == null && !kvs.isEmpty()) {
-
-
- writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
- maxMVCCReadpoint >= smallestReadPoint);
- }
- for (KeyValue kv : kvs) {
- writer.append(kv);
- }
- } while (hasMore);
-
- scanner.close();
- StoreFile$Writer.appendMetadata(maxId, majorCompaction);
- StoreFile$Writer.close();
- }
Class diagram illustrating the core logic of the compaction algorithm
Files are ordered from newest to oldest, and suitable ones are selected for compaction
The sliding window starts at index 0 and filters out files whose size is too large, which improves compaction efficiency
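To make the selection rule concrete, the stand-alone sketch below reproduces the fileSizes/sumSize bookkeeping from Store#compactSelection above. The file sizes, ratio and limits are invented example values, and RatioSelectDemo is not an HBase class.
- public class RatioSelectDemo {
-   public static void main(String[] args) {
-     long[] fileSizes = {900, 500, 60, 50, 40, 30}; // sizes in MB, in store order
-     int minFilesToCompact = 3, maxFilesToCompact = 5;
-     long minCompactSize = 20;
-     double r = 1.2; // compaction ratio, like hbase.hstore.compaction.ratio
-     int count = fileSizes.length;
-     long[] sumSize = new long[count];
-     for (int i = count - 1; i >= 0; --i) {
-       int tooFar = i + maxFilesToCompact - 1;
-       sumSize[i] = fileSizes[i]
-           + ((i + 1 < count) ? sumSize[i + 1] : 0)
-           - ((tooFar < count) ? fileSizes[tooFar] : 0);
-     }
-     // slide the start past files that are too large relative to what follows them
-     int start = 0;
-     while (count - start >= minFilesToCompact
-         && fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * r))) {
-       ++start;
-     }
-     int end = Math.min(count, start + maxFilesToCompact);
-     System.out.println("compact files [" + start + ", " + end + ")");
-   }
- }
With these numbers the two oversized files at the front are skipped and the four small files (indices 2 to 5) are selected.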
Some important classes that are used
The class diagram of the classes involved in the internal scan is as follows
The key classes involved:
- In its implementation of this algorithm, HBase mainly relies on the following five classes.
- 1.org.apache.hadoop.hbase.regionserver.Store
- 2.org.apache.hadoop.hbase.regionserver.StoreScanner
- 3.org.apache.hadoop.hbase.regionserver.StoreFileScanner
- 4.org.apache.hadoop.hbase.regionserver.KeyValueHeap
- 5.org.apache.hadoop.hbase.regionserver.ScanQueryMatcher
-
- The relationship between these five classes is as follows (a minimal merge sketch is given after this list):
- 1. Store calls StoreScanner's next method in a loop and writes the returned KeyValues to the compacted output file;
- 2. StoreScanner creates and holds one StoreFileScanner per input file,
- iterates over them internally, and uses a KeyValueHeap to order the head records of the input files;
- 3. StoreFileScanner iterates over a single input file, managing and exposing that file's current head record;
- 4. KeyValueHeap orders the head record of each input file by means of a heap.
- 5. ScanQueryMatcher decides, whenever the head record of an input file comes up, whether that record should be written to the output or skipped, according to a given policy.
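The interaction of points 2-4 is essentially a k-way merge: a priority queue always exposes the scanner whose current head record sorts first. The sketch below shows that pattern with plain integers; DemoScanner and HeapMergeDemo are made-up stand-ins for StoreFileScanner and KeyValueHeap.
- import java.util.*;
-
- public class HeapMergeDemo {
-   static class DemoScanner {
-     private final Iterator<Integer> it;
-     private Integer head;
-     DemoScanner(List<Integer> sorted) { it = sorted.iterator(); head = it.hasNext() ? it.next() : null; }
-     Integer peek() { return head; }
-     Integer next() { Integer cur = head; head = it.hasNext() ? it.next() : null; return cur; }
-   }
-   public static void main(String[] args) {
-     List<DemoScanner> scanners = Arrays.asList(
-         new DemoScanner(Arrays.asList(1, 4, 9)),
-         new DemoScanner(Arrays.asList(2, 3, 8)),
-         new DemoScanner(Arrays.asList(5, 6, 7)));
-     PriorityQueue<DemoScanner> heap =
-         new PriorityQueue<>(scanners.size(), Comparator.comparing(DemoScanner::peek));
-     for (DemoScanner s : scanners) {
-       if (s.peek() != null) heap.add(s); // only scanners that still have data join the heap
-     }
-     while (!heap.isEmpty()) {
-       DemoScanner cur = heap.poll();
-       System.out.print(cur.next() + " ");   // emit the globally smallest head record
-       if (cur.peek() != null) heap.add(cur); // re-insert the scanner if it has more
-     }
-   }
- }
Output: 1 2 3 4 5 6 7 8 9, i.e. the per-file sorted streams are merged into one globally sorted stream, which is what the compaction writer consumes.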
The main logic of StoreScanner and related classes is as follows:
-
-
-
-
- StoreScanner#构造函数() {
- ScanQueryMatcher matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
- smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
- List<? extends KeyValueScanner> scanners = selectScannersFrom(scanners);
- for (KeyValueScanner scanner : scanners) {
- scanner.seek(matcher.getStartKey());
- }
- KeyValueHeap heap = new KeyValueHeap(scanners, store.comparator);
- }
-
-
-
- StoreFile$Writer#append() {
- appendGeneralBloomfilter(kv);
- appendDeleteFamilyBloomFilter(kv);
- HFileWriterV2.append(kv);
- trackTimestamps(kv);
- }
-
-
-
- StoreScanner#next() {
- KeyValue peeked = this.heap.peek();
- if (peeked == null) {
- close();
- return false;
- }
- LOOP:
- while ((kv = this.heap.peek()) != null) {
- ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
- switch(qcode) {
- case INCLUDE:
- case INCLUDE_AND_SEEK_NEXT_ROW:
- case INCLUDE_AND_SEEK_NEXT_COL:
- Filter f = matcher.getFilter();
- outResult.add(f == null ? kv : f.transform(kv));
- count++;
- if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
- if (!matcher.moreRowsMayExistAfter(kv)) {
- return false;
- }
- reseek(matcher.getKeyForNextRow(kv));
- } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
- reseek(matcher.getKeyForNextColumn(kv));
- } else {
- this.heap.next();
- }
- cumulativeMetric += kv.getLength();
- if (limit > 0 && (count == limit)) {
- break LOOP;
- }
- continue;
- case DONE:
- return true;
- case DONE_SCAN:
- close();
- return false;
- case SEEK_NEXT_ROW:
- if (!matcher.moreRowsMayExistAfter(kv)) {
- return false;
- }
- reseek(matcher.getKeyForNextRow(kv));
- break;
- case SEEK_NEXT_COL:
- reseek(matcher.getKeyForNextColumn(kv));
- break;
- case SKIP:
- this.heap.next();
- break;
- case SEEK_NEXT_USING_HINT:
- KeyValue nextKV = matcher.getNextKeyHint(kv);
- if (nextKV != null) {
- reseek(nextKV);
- } else {
- heap.next();
- }
- break;
- default:
- throw new RuntimeException("UNEXPECTED");
- }
- }
- }
-
-
-
-
- KeyValueHeap#构造函数() {
- this.comparator = new KVScannerComparator(comparator);
- heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
- this.comparator);
- for (KeyValueScanner scanner : scanners) {
- if (scanner.peek() != null) {
- this.heap.add(scanner);
- } else {
- scanner.close();
- }
- }
- this.current = pollRealKV();
- }
-
-
-
-
- KeyValueHeap#next() {
- InternalScanner currentAsInternal = (InternalScanner)this.current;
- boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
- KeyValue pee = this.current.peek();
- if (pee == null || !mayContainMoreRows) {
- this.current.close();
- } else {
- this.heap.add(this.current);
- }
- this.current = pollRealKV();
- return (this.current != null);
- }
-
-
-
-
- ScanQueryMatcher#构造函数() {
-
-
- this.isUserScan = scanType == ScanType.USER_SCAN;
- this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
-
- }
HRegionServer handles a split request
The execution logic is as follows:
-
- HRegionServer#splitRegion() {
- HRegion region = getRegion(regionInfo.getRegionName());
- region.flushcache();
- region.forceSplit(splitPoint);
- compactSplitThread.requestSplit(region, region.checkSplit());
- }
-
-
- CompactSplitThread#requestSplit() {
- ThreadPoolExecutor#execute(new SplitRequest(r, midKey, HRegionServer.this));
- }
Split thread execution flow
The moment the META table is updated
The main logic is as follows:
-
- SplitRequest#run() {
- SplitTransaction st = new SplitTransaction(parent, midKey);
- if (!st.prepare()) {
- return;
- }
- st.execute(this.server, this.server);
- }
-
-
-
-
-
-
- SplitTransaction#execute() {
- PairOfSameType<HRegion> regions = createDaughters(server, services);
- openDaughters(server, services, regions.getFirst(), regions.getSecond());
- transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
- }
-
-
-
- SplitTransaction#prepare() {
- HRegionInfo hri = parent.getRegionInfo();
- hri_a = new HRegionInfo(hri.getTableName(), startKey, splitrow, false, rid);
- hri_b = new HRegionInfo(hri.getTableName(), splitrow, endKey, false, rid);
- }
-
- SplitTransaction#createDaughters() {
-
-
- createNodeSplitting();
-
- this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
-
-
- transitionNodeSplitting();
-
-
-
- createSplitDir();
-
-
- List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
- HRegionServer.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
- splitStoreFiles(this.splitdir, hstoreFilesToSplit);
- this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
- this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
-
-
- MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
- this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
-
-
- return new PairOfSameType<HRegion>(a, b);
- }
-
- SplitTransaction#splitStoreFiles() {
- for (StoreFile sf: hstoreFilesToSplit) {
-
- StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
- futures.add(threadPool.submit(sfs));
- }
-
- }
-
-
- SplitTransaction$StoreFileSplitter#call() {
- splitStoreFile(sf, splitdir);
- }
-
- SplitTransaction#splitStoreFile() {
- FileSystem fs = this.parent.getFilesystem();
- byte [] family = sf.getFamily();
- String encoded = this.hri_a.getEncodedName();
-
-
-
- Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
-
-
-
- StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
- encoded = this.hri_b.getEncodedName();
- storedir = Store.getStoreHomedir(splitdir, encoded, family);
- StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
- }
-
-
-
-
-
- StoreFile#split() {
- if (range == Reference.Range.bottom) {
- KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
- byte[] firstKey = f.createReader().getFirstKey();
- if (f.getReader().getComparator().compare(splitKey.getBuffer(),
- splitKey.getKeyOffset(), splitKey.getKeyLength(),
- firstKey, 0, firstKey.length) < 0) {
- return null;
- }
- } else {
- KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
- byte[] lastKey = f.createReader().getLastKey();
- if (f.getReader().getComparator().compare(splitKey.getBuffer(),
- splitKey.getKeyOffset(), splitKey.getKeyLength(),
- lastKey, 0, lastKey.length) > 0) {
- return null;
- }
- }
- Reference r = new Reference(splitRow, range);
- String parentRegionName = f.getPath().getParent().getParent().getName();
- Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
- return r.write(fs, p);
- }
-
-
- SplitTransaction#createDaughterRegion() {
- FileSystem fs = this.parent.getFilesystem();
- Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
- this.splitdir, hri);
- HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
- this.parent.getLog(), fs, this.parent.getBaseConf(),
- hri, this.parent.getTableDesc(), rsServices);
- long halfParentReadRequestCount = this.parent.getReadRequestsCount() / 2;
- r.readRequestsCount.set(halfParentReadRequestCount);
- r.setOpMetricsReadRequestCount(halfParentReadRequestCount);
- long halfParentWriteRequest = this.parent.getWriteRequestsCount() / 2;
- r.writeRequestsCount.set(halfParentWriteRequest);
- r.setOpMetricsWriteRequestCount(halfParentWriteRequest);
- HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
- return r;
- }
-
-
-
- MetaEditor#offlineParentInMeta() {
- HRegionInfo copyOfParent = new HRegionInfo(parent);
- copyOfParent.setOffline(true);
- copyOfParent.setSplit(true);
- Put put = new Put(copyOfParent.getRegionName());
- addRegionInfo(put, copyOfParent);
- put.add("info", "splitA", Writables.getBytes(a));
- put.add("info", "splitB", Writables.getBytes(b));
- putToMetaTable(catalogTracker, put);
- }
-
-
-
-
-
-
-
- SplitTransaction#openDaughters() {
- DaughterOpener aOpener = new DaughterOpener(server, a);
- DaughterOpener bOpener = new DaughterOpener(server, b);
- aOpener.start();
- bOpener.start();
- aOpener.join();
- bOpener.join();
-
- HRegionServer.postOpenDeployTasks(b, server.getCatalogTracker(), true);
-
- HRegionServer.addToOnlineRegions(b);
- HRegionServer.postOpenDeployTasks(a, server.getCatalogTracker(), true);
- HRegionServer.addToOnlineRegions(a);
- }
-
-
-
- HRegionServer#postOpenDeployTasks() {
- for (Store s : r.getStores().values()) {
- if (s.hasReferences() || s.needsCompaction()) {
- getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
- }
- }
-
- if (r.getRegionInfo().isRootRegion()) {
- RootLocationEditor.setRootLocation(getZooKeeper(),
- this.serverNameFromMasterPOV);
- } else if (r.getRegionInfo().isMetaRegion()) {
- MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
- this.serverNameFromMasterPOV);
- } else {
- if (daughter) {
-
- MetaEditor.addDaughter(ct, r.getRegionInfo(),
- this.serverNameFromMasterPOV);
- } else {
- MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
- this.serverNameFromMasterPOV);
- }
- }
- }
-
-
-
-
- SplitTransaction#transitionZKNode() {
- transitionNodeSplit();
- tickleNodeSplit();
- }
Some auxiliary logic:
-
-
- HRegion#close() {
- waitForFlushesAndCompactions();
- internalFlushcache();
- ThreadPoolExecutor storeCloserThreadPool =
- getStoreOpenAndCloseThreadPool("StoreCloserThread-"
- + this.regionInfo.getRegionNameAsString());
- CompletionService<ImmutableList<StoreFile>> completionService =
- new ExecutorCompletionService<ImmutableList<StoreFile>>(
- storeCloserThreadPool);
-
- for (final Store store : stores.values()) {
- completionService.submit(new Callable<ImmutableList<StoreFile>>() {
- public ImmutableList<StoreFile> call() throws IOException {
- return store.close();
- }
- });
- }
- }
-
-
- Store#close() {
- for (final StoreFile f : result) {
- completionService.submit(new Callable<Void>() {
- public Void call() throws IOException {
- f.closeReader(true);
- return null;
- }
- });
- }
- }
The CompactionChecker thread
This class periodically checks whether the regions on a region server need a compaction.
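For reference, the needsCompaction() condition used in the chore below amounts to the following check; the sketch is a paraphrase with made-up example numbers, and NeedsCompactionDemo is not an HBase class: a store needs a compaction when the number of store files not already being compacted exceeds the configured minimum.
- public class NeedsCompactionDemo {
-   public static void main(String[] args) {
-     int storefiles = 7;        // files currently in the store (example value)
-     int filesCompacting = 2;   // files already part of a running compaction
-     int minFilesToCompact = 3; // like hbase.hstore.compaction.min
-     boolean needsCompaction = (storefiles - filesCompacting) > minFilesToCompact;
-     System.out.println(needsCompaction); // true for these numbers: 5 > 3
-   }
- }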
The main logic is as follows:
-
-
- CompactionChecker#chore() {
- for (HRegion r : this.instance.onlineRegions.values()) {
- for (Store s : r.getStores().values()) {
- if (s.needsCompaction()) {
-
- this.instance.compactSplitThread.requestCompaction(r, s, getName());
- } else if (s.isMajorCompaction()) {
- if (majorCompactPriority == DEFAULT_PRIORITY
- || majorCompactPriority > r.getCompactPriority()) {
- // queue the major compaction with the default priority
- this.instance.compactSplitThread.requestCompaction(r, s, getName());
- } else {
- // queue the major compaction with the configured majorCompactPriority
- this.instance.compactSplitThread.requestCompaction(r, s, getName());
- }
- }
- }
- }
- }
References
深入分析HBase Compaction机制
Hbase的Region Compact算法实现分析
深入分析HBase RPC(Protobuf)实现机制
HBase region split源码分析