热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HBase压缩和分割原理

HRegionServer调用合并请求主要逻辑如下:遍历每个Store然后计算需要合并的文件,生成CompactionRequest对象并提交到线程池中执行根据thr

HRegionServer调用合并请求

主要逻辑如下:

  1. //遍历每个Store然后计算需要合并的文件,生成  
  2. //CompactionRequest对象并提交到线程池中执行  
  3. //根据throttleCompaction()函数规则来判断是提交到  
  4. //largeCompactions线程池还是smallCompactions线程池  
  5. CompactSplitThread#requestCompaction() {  
  6.     for (Store s : r.getStores().values()) {  
  7.         CompactionRequest cr = Store.requestCompaction(priority, request);  
  8.         ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())  
  9.           ? largeCompactions : smallCompactions;  
  10.         pool.execute(cr);         
  11.         ret.add(cr);  
  12.     }     
  13. }  
  14.   
  15. //如果CompactionRequest的总大小 >  
  16. //minFilesToCompact * 2 * memstoreFlushSize  
  17. //则这次任务为major合并,否则在为minor合并  
  18. Store#throttleCompaction() {  
  19.     long throttlePoint = conf.getLong(  
  20.         "hbase.regionserver.thread.compaction.throttle",  
  21.         2 * this.minFilesToCompact * this.region.memstoreFlushSize);  
  22.     return compactionSize > throttlePoint;         
  23. }  
  24.   
  25.   
  26. Store#compactSelection() {  
  27.     //选择出已经过期的StoreFile  
  28.     if(storefile.maxTimeStamp &#43; store.ttl < now_timestamp) {  
  29.         //返回已经过期的store file文件集合   
  30.     }  
  31.       
  32.     //从0开始遍历到最后&#xff0c;如果发现有文件 > maxCompactSize则pos&#43;&#43;  
  33.     //然后过滤掉这些大于maxCompactSize的文件  
  34.     while (pos < compactSelection.getFilesToCompact().size() &&  
  35.              compactSelection.getFilesToCompact().get(pos).getReader().length()  
  36.                > while (pos < compactSelection.getFilesToCompact().size() &&  
  37.              compactSelection.getFilesToCompact().get(pos).getReader().length()  
  38.                > maxCompactSize &&  
  39.              !compactSelection.getFilesToCompact().get(pos).isReference()) &#43;&#43;pos;  
  40.       if (pos !&#61; 0) compactSelection.clearSubList(0, pos); &&  
  41.              !compactSelection.getFilesToCompact().get(pos).isReference()) {  
  42.         &#43;&#43;pos;  
  43.     }  
  44.     if (pos !&#61; 0) {           
  45.         compactSelection.clearSubList(0, pos);  
  46.     }        
  47.     if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {  
  48.         return;   
  49.     }  
  50.       
  51.     //计算出sumSize数组&#xff0c;数组大小就是Store中的文件数量  
  52.     //sumSize数组中每个元素的大小是根据StroeFile的大小再加上 sumSize[i&#43;1](或者0)  
  53.     //然后减去fileSizes[tooFar](或者0)  
  54.     //sumSize的内容跟元素的fileSizes数组应该差别不大  
  55.     int countOfFiles &#61; compactSelection.getFilesToCompact().size();  
  56.     long [] fileSizes &#61; new long[countOfFiles];  
  57.     long [] sumSize &#61; new long[countOfFiles];  
  58.     for (int i &#61; countOfFiles-1; i >&#61; 0; --i) {  
  59.         StoreFile file &#61; compactSelection.getFilesToCompact().get(i);  
  60.         fileSizes[i] &#61; file.getReader().length();  
  61.         // calculate the sum of fileSizes[i,i&#43;maxFilesToCompact-1) for algo  
  62.         int tooFar &#61; i &#43; this.maxFilesToCompact - 1;  
  63.         sumSize[i] &#61; fileSizes[i] &#43; ((i&#43;1    < countOfFiles) ? sumSize[i&#43;1]      : 0)  
  64.             - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);  
  65.     }  
  66.       
  67.     //如果fileSize[start] > Math.max(minCompactSize,sumSize[start&#43;1] * r)  
  68.     //则下标&#43;&#43;&#xff0c;这里的操作是过滤掉过大的文件&#xff0c;以免影响合并时间  
  69.     while(countOfFiles - start >&#61; this.minFilesToCompact && fileSizes[start] >  
  70.         Math.max(minCompactSize, (long)(sumSize[start&#43;1] * r))) {  
  71.         &#43;&#43;start;  
  72.     }  
  73.     int end &#61; Math.min(countOfFiles, start &#43; this.maxFilesToCompact);  
  74.     long totalSize &#61; fileSizes[start] &#43; ((start&#43;1 < countOfFiles) ? sumSize[start&#43;1] : 0);  
  75.         compactSelection &#61; compactSelection.getSubList(start, end);  
  76.           
  77.     //如果是major compact&#xff0c;并且需要执行的文件数量过多&#xff0c;则去掉一些    
  78.     if(majorcompaction && compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {  
  79.         int pastMax &#61; compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;  
  80.         compactSelection.getFilesToCompact().subList(0, pastMax).clear();         
  81.     }         
  82. }  

 

 

 

 

 

CompactionRequest线程(用于执行major和minor合并)

压缩相关的类图如下:



major和minor合并的差别其实很小&#xff0c;如果最后待合并的总大小 > 2*minFilesToCompact*memstoreFlushSize

则认为这次是一个major合并&#xff0c;方到major线程池中执行&#xff0c;否则认为是一次minor合并

另外在创建StoreScanner构造函数时&#xff0c;会根据ScanType来判断是major还是minor合并&#xff0c;之后在

ScanQueryMathcer中根据ScanType的不同(有用户类型&#xff0c;minor和major三种类型)来决定返回的不同值的

主要逻辑如下:

  1. //在单独的线程中执行合并  
  2. CompactionRequest#run() {  
  3.     boolean completed &#61; HRegion.compact(this);  
  4.     if (completed) {  
  5.         if (s.getCompactPriority() <&#61; 0) {  
  6.             server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue"null);  
  7.         } else {  
  8.             // see if the compaction has caused us to exceed max region size  
  9.             server.getCompactSplitThread().requestSplit(r);  
  10.         }         
  11.     }  
  12. }  
  13.   
  14. //这里会调用Store&#xff0c;来执行compact  
  15. HRegion#compact() {  
  16.     Preconditions.checkArgument(cr.getHRegion().equals(this));  
  17.     lock.readLock().lock();  
  18.     CompactionRequest.getStore().compact(cr);  
  19.     lock.readLock().unlock();         
  20. }  
  21.   
  22. //完成合并&#xff0c;调用Compactor#compact()完成最核心的compact逻辑  
  23. //将合并后的文件移动到最终目录下并删除掉旧的文件  
  24. Store#compact() {  
  25.     List filesToCompact &#61; request.getFiles();  
  26.     StoreFile.Writer writer &#61; this.compactor.compact(cr, maxId);  
  27.     if (this.conf.getBoolean("hbase.hstore.compaction.complete"true)) {  
  28.         sf &#61; completeCompaction(filesToCompact, writer);      
  29.     }else {  
  30.         // Create storefile around what we wrote with a reader on it.  
  31.         sf &#61; new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,  
  32.           this.family.getBloomFilterType(), this.dataBlockEncoder);  
  33.         sf.createReader();  
  34.     }  
  35. }  
  36.   
  37. //将 /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1  
  38. //移动到  
  39. // /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/value/9c8614a6bd0d4833b419a13abfde5ac1  
  40. //再对新的目标文件创建一个StroeFile对象包装  
  41. //将旧的文件(这些底层的HFile都已经合并成一个文件了)删除  
  42. //最后计算新的StoreFile文件大小等信息并返回  
  43. Store#completeCompaction() {  
  44.     Path origPath &#61; compactedFile.getPath();  
  45.     Path destPath &#61; new Path(homedir, origPath.getName());        
  46.     HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath);  
  47.     StoreFile result &#61; new StoreFile(this.fs, destPath, this.conf, this.cacheConf,  
  48.           this.family.getBloomFilterType(), this.dataBlockEncoder);  
  49.     passSchemaMetricsTo(result);  
  50.     result.createReader();    
  51. }  
  52.   
  53.   
  54. //compact的最核心逻辑!!  
  55. //对多个StoreFile进行合并&#xff0c;这里使用到了StoreScanner  
  56. //迭代读取所有的StroeFile然后使用堆排序输出&#xff0c;并写入到  
  57. //StoreFile$Writer#append()中  
  58. Compactor#compact() {  
  59.     for (StoreFile file : filesToCompact) {  
  60.         StoreFile.Reader r &#61; file.getReader();    
  61.         long keyCount &#61; (r.getBloomFilterType() &#61;&#61; store.getFamily()  
  62.           .getBloomFilterType()) ?  
  63.           r.getFilterEntries() : r.getEntries();      
  64.         maxKeyCount &#43;&#61; keyCount;              
  65.     }  
  66.       
  67.     int compactionKVMax &#61; getConf().getInt("hbase.hstore.compaction.kv.max"10);  
  68.     Compression.Algorithm compression &#61; store.getFamily().getCompression();  
  69.     List scanners &#61; StoreFileScanner  
  70.       .getScannersForStoreFiles(filesToCompact, falsefalsetrue);  
  71.     Scan scan &#61; new Scan();  
  72.     scan.setMaxVersions(store.getFamily().getMaxVersions());       
  73.       
  74.         //这里会根据当前合并的类型选择ScanType的类型&#xff0c;之后ScanQueryMatcher根据ScanType的  
  75.         //的类型返回不同的值  
  76.         InternalScanner scanner &#61; new StoreScanner(store, store.getScanInfo(), scan, scanne        rs,majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,  
  77.         smallestReadPoint, earliestPutTs);        
  78.           
  79.     do {  
  80.         hasMore &#61; scanner.next(kvs, compactionKVMax);  
  81.         if (writer &#61;&#61; null && !kvs.isEmpty()) {  
  82.             //在tmp目录下创建一个临时文件&#xff0c;路径类似  
  83.             // /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1  
  84.             writer &#61; store.createWriterInTmp(maxKeyCount, compactionCompression, true,  
  85.               maxMVCCReadpoint >&#61; smallestReadPoint);  
  86.         }  
  87.         for (KeyValue kv : kvs) {  
  88.             writer.append(kv);    
  89.         }         
  90.     }while(hasMore);  
  91.       
  92.     scanner.close();  
  93.     StoreFile$Writer.appendMetadata(maxId, majorCompaction);  
  94.     StoreFile$Writer.close();     
  95. }  

 

压缩算法和的核心逻辑演示类图

根据由新到老排序文件&#xff0c;选择出合适的文件

这里的滑动窗口是从0下标开始过滤掉size过大的文件&#xff0c;这样可以提高合并效率


 

 

 

 

 

使用到的一些重要类

其中内部scan的时候使用到的相关类图如下


相关重要的类:

  1. Hbase在实现该算法的过程中重要的是下面这五个类。   
  2. 1.org.apache.hadoop.hbase.regionserver.Store   
  3. 2.org.apache.hadoop.hbase.regionserver.StoreScanner   
  4. 3.org.apache.hadoop.hbase.regionserver.StoreFileScanner   
  5. 4.org.apache.hadoop.hbase.regionserver.KeyValueHeap   
  6. 5.org.apache.hadoop.hbase.regionserver.ScanQueryMatcher   
  7.   
  8. 这五个类的关系是   
  9. 1.Store类调用StoreScanner的next方法&#xff0c;并循环输出kv到合并文件&#xff1b;   
  10. 2.StoreScanner的作用是负责创建并持有多个输入文件的StoreFileScanner&#xff0c;  
  11.     内部遍历这些StoreFileScanner并通过KeyValueHeap来排序这些输入文件的首条记录&#xff1b;   
  12. 3.StoreFileScanner的作用是遍历单个输入文件&#xff0c;管理并提供单个输入文件的首条记录&#xff1b;   
  13. 4.KeyValueHeap的作用就是通过堆来排序每个输入文件的首条记录。   
  14. 5.ScanQueryMatcher的作用是当输入文件的首条记录来的时候&#xff0c;根据一定的策略判断这条记录到底是该输出还是该跳过。   

 

StoreScanner及相关类的主要逻辑如下:

  1. //内部应用StoreFileScanner列表&#xff0c;创建ScanQueryMatcher用来判断是过滤还是输出  
  2. //创建KeyValueHeap用于堆排序&#xff0c;根据堆的结构每次从堆顶拿出一个  
  3. //注意这个构造函数中有一个参数ScanType&#xff0c;是扫描的类型&#xff0c;包括MAJOR_COMPACT&#xff0c;MINOR_COMPACT&#xff0c;  
  4. //USER_COMPACT来返回不同的值&#xff0c;以达到major或minor的效果  
  5. StoreScanner#构造函数() {  
  6.     ScanQueryMatcher matcher &#61; new ScanQueryMatcher(scan, scanInfo, null, scanType,  
  7.         smallestReadPoint, earliestPutTs, oldestUnexpiredTS);     
  8.     Listextends KeyValueScanner> scanners &#61; selectScannersFrom(scanners);  
  9.     for(KeyValueScanner scanner : scanners) {  
  10.         scanner.seek(matcher.getStartKey());  
  11.     }     
  12.     KeyValueHeap heap &#61; new KeyValueHeap(scanners, store.comparator);       
  13. }  
  14.   
  15. //选择性的创建布隆过滤器&#xff0c;调用HFileWriterv2的append()  
  16. //写入KeyValue信息  
  17. StoreFile$Writer#append() {  
  18.       appendGeneralBloomfilter(kv);  
  19.       appendDeleteFamilyBloomFilter(kv);  
  20.       HFileWriterV2.append(kv);  
  21.       trackTimestamps(kv);    
  22. }  
  23.   
  24. //这个方法封装了处理heap取出的记录值的逻辑&#xff0c;  
  25. //根据matcher对该值的判断来决定这个值是输出还是跳过  
  26. StoreSanner#next() {  
  27.  KeyValue peeked &#61; this.heap.peek();  
  28.     if (peeked &#61;&#61; null) {  
  29.       close();  
  30.       return false;  
  31.     }     
  32.     LOOP:   
  33.     while((kv &#61; this.heap.peek()) !&#61; null) {      
  34.         ScanQueryMatcher.MatchCode qcode &#61; matcher.match(kv);  
  35.             switch(qcode) {  
  36.                 case INCLUDE:  
  37.                 case INCLUDE_AND_SEEK_NEXT_ROW:  
  38.                 case INCLUDE_AND_SEEK_NEXT_COL:  
  39.                     Filter f &#61; matcher.getFilter();  
  40.                     outResult.add(f &#61;&#61; null ? kv : f.transform(kv));  
  41.                     count&#43;&#43;;  
  42.                     if (qcode &#61;&#61; ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {  
  43.                         if (!matcher.moreRowsMayExistAfter(kv)) {  
  44.                             return false;  
  45.                         }  
  46.                         reseek(matcher.getKeyForNextRow(kv));  
  47.                     } else if (qcode &#61;&#61; ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {  
  48.                         reseek(matcher.getKeyForNextColumn(kv));  
  49.                     } else {  
  50.                         this.heap.next();  
  51.                     }                                         
  52.                     cumulativeMetric &#43;&#61; kv.getLength();  
  53.                     if (limit > 0 && (count &#61;&#61; limit)) {  
  54.                         break LOOP;  
  55.                     }  
  56.                     continue;  
  57.                 case DONE:  
  58.                     return true;  
  59.                 case DONE_SCAN:  
  60.                     close();  
  61.                     return false;         
  62.                 case SEEK_NEXT_ROW:    
  63.                     if (!matcher.moreRowsMayExistAfter(kv)) {  
  64.                         return false;  
  65.                     }  
  66.                     reseek(matcher.getKeyForNextRow(kv));  
  67.                     break;    
  68.                 case SEEK_NEXT_COL:  
  69.                     reseek(matcher.getKeyForNextColumn(kv));  
  70.                     break;  
  71.                 case SKIP:  
  72.                     this.heap.next();  
  73.                     break;  
  74.                 case SEEK_NEXT_USING_HINT:  
  75.                     KeyValue nextKV &#61; matcher.getNextKeyHint(kv);  
  76.                     if (nextKV !&#61; null) {  
  77.                         reseek(nextKV);  
  78.                     } else {  
  79.                         heap.next();  
  80.                     }  
  81.                     break;  
  82.                 default:  
  83.                     throw new RuntimeException("UNEXPECTED");                 
  84.     }//end while  
  85. }  
  86.   
  87. //KeyValueHeap使用堆排序输出结果  
  88. //内部使用了优先队列&#xff0c;再用KVScannerComparator  
  89. //作为比较工具  
  90. KeyValueHeap#构造函数() {  
  91.     this.comparator &#61; new KVScannerComparator(comparator);  
  92.     heap &#61; new PriorityQueue(scanners.size(),  
  93.           this.comparator);  
  94.     for (KeyValueScanner scanner : scanners) {  
  95.         if (scanner.peek() !&#61; null) {  
  96.             this.heap.add(scanner);  
  97.         } else {  
  98.             scanner.close();  
  99.         }  
  100.     }  
  101.     this.current &#61; pollRealKV();            
  102. }  
  103.   
  104. //堆里面最重要的方法其实就是next&#xff0c;不过看这个方法的主要功能不是  
  105. //为了算出nextKeyValue&#xff0c;而主要是为了算出nextScanner&#xff0c;然后需在外部  
  106. //再次调用peek方法来取得nextKeyValue&#xff0c;不是很简练。  
  107. KeyValueHeap#next() {  
  108.     InternalScanner currentAsInternal &#61; (InternalScanner)this.current;  
  109.     boolean mayContainMoreRows &#61; currentAsInternal.next(result, limit, metric);  
  110.     KeyValue pee &#61; this.current.peek();  
  111.     if (pee &#61;&#61; null || !mayContainMoreRows) {  
  112.         this.current.close();  
  113.     } else {  
  114.         this.heap.add(this.current);  
  115.     }  
  116.     this.current &#61; pollRealKV();  
  117.     return (this.current !&#61; null);      
  118. }  
  119.   
  120. //这里省略了其他部分&#xff0c;注意这里有两个赋值  
  121. //对于compact来说如果是minor类型的则不会删除掉DELETE类型的KeyValue  
  122. //而major类型在最终输出的时候会删除掉DELETE类型的KeyValue标记  
  123. ScanQueryMatcher#构造函数() {  
  124.     //.....  
  125.     /* how to deal with deletes */  
  126.     this.isUserScan &#61; scanType &#61;&#61; ScanType.USER_SCAN;  
  127.     this.retainDeletesInOutput &#61; scanType &#61;&#61; ScanType.MINOR_COMPACT || scan.isRaw();      
  128.     //..  
  129. }  

 

 

 

 

 

HRegionServer调用split请求


执行逻辑如下:

  1. //切分region  
  2. HRegionServer#splitRegion() {  
  3.     HRegion region &#61; getRegion(regionInfo.getRegionName());  
  4.     region.flushcache();  
  5.     region.forceSplit(splitPoint);  
  6.     compactSplitThread.requestSplit(region, region.checkSplit());         
  7. }  
  8.   
  9. //创建SplitRequest对象&#xff0c;放到线程池中执行  
  10. CompactSplitThread#requestSplit() {  
  11.     ThreadPoolExecutor#execute(new SplitRequest(r, midKey, HRegionServer.this));      
  12. }  

  

 

 

 

 

split线程执行过程


 

META表更新的瞬间

主要逻辑如下:

  1. //在单线中执行  
  2. SplitRequest#run() {  
  3.     SplitTransaction st &#61; new SplitTransaction(parent, midKey);  
  4.     if (!st.prepare()) {  
  5.         return;   
  6.     }  
  7.     st.execute(this.server, this.server);  
  8. }  
  9.   
  10. //核心逻辑&#xff0c;先创建两个子region&#xff0c;再创建临时的ZK节点  
  11. //将父region切分&#xff0c;创建临时目录&#xff0c;将region关闭  
  12. //开始切分&#xff0c;将storefile放到目录中  
  13. //创建子regionA和B&#xff0c;同时open这两个region&#xff0c;更新META信息  
  14. //更新ZK信息&#xff0c;将原region下线  
  15. SplitTransaction#execute() {  
  16.     PairOfSameType regions &#61; createDaughters(server, services);  
  17.     openDaughters(server, services, regions.getFirst(), regions.getSecond());  
  18.     transitionZKNode(server, services, regions.getFirst(), regions.getSecond());          
  19. }  
  20.   
  21.   
  22. //预先创建两个子region  
  23. SplitTransaction#prepare() {  
  24.     HRegionInfo hri &#61; parent.getRegionInfo();  
  25.     hri_a &#61; new HRegionInfo(hri.getTableName(), startKey, splitrow, false, rid);  
  26.     hri_b &#61; new HRegionInfo(hri.getTableName(), splitrow, endKey, false, rid);  
  27. }  
  28.   
  29. SplitTransaction#createDaughters() {  
  30.     //创建类似 /hbase/unassigned/fad11edf1e6e0a842b7fd3ad87f25053  
  31.     //这样的节点&#xff0c;其中的编码后的region就是待split的region  
  32.     createNodeSplitting();  
  33.     //用于记录事务的处理进展  
  34.     this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);  
  35.       
  36.     //将这个节点作为事务节点&#xff0c;待任务处理完后会删除这个节点  
  37.     transitionNodeSplitting();  
  38.       
  39.     //创建类似 /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits  
  40.     //的HDFS节点&#xff0c;用于临时处理split文件  
  41.     createSplitDir();     
  42.       
  43.     //关闭待处理的region  
  44.     List hstoreFilesToSplit &#61; this.parent.close(false);  
  45.     HRegionServer.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());  
  46.     splitStoreFiles(this.splitdir, hstoreFilesToSplit);  
  47.     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);  
  48.     HRegion a &#61; createDaughterRegion(this.hri_a, this.parent.rsServices);  
  49.     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);  
  50.     HRegion b &#61; createDaughterRegion(this.hri_b, this.parent.rsServices);  
  51.       
  52.     //更新META表信息  
  53.     MetaEditor.offlineParentInMeta(server.getCatalogTracker(),  
  54.         this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());  
  55.               
  56.     //返回两个子region A和B  
  57.     return new PairOfSameType(a, b);  
  58. }  
  59.   
  60. SplitTransaction#splitStoreFiles() {  
  61.     for (StoreFile sf: hstoreFilesToSplit) {  
  62.         //splitStoreFile(sf, splitdir);  
  63.         StoreFileSplitter sfs &#61; new StoreFileSplitter(sf, splitdir);  
  64.         futures.add(threadPool.submit(sfs));  
  65.     }     
  66.     //等待线程池中的任务执行完后返回  
  67. }  
  68.   
  69. //开始分割文件  
  70. SplitTransaction$StoreFileSplitter#call() {  
  71.     splitStoreFile(sf, splitdir);  
  72. }  
  73.   
  74. SplitTransaction#splitStoreFile() {  
  75.     FileSystem fs &#61; this.parent.getFilesystem();  
  76.     byte [] family &#61; sf.getFamily();  
  77.     String encoded &#61; this.hri_a.getEncodedName();      
  78.     //地址类似  
  79.     // /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits/1977310abc183fac9aba3dc626b01a2d  
  80.     //    /value/92e897822d804d3bb4805548e9a80bd2.fad11edf1e6e0a842b7fd3ad87f25053  
  81.     Path storedir &#61; Store.getStoreHomedir(splitdir, encoded, family);      
  82.     //这里会根据splitRow分别创建两个文件&#xff0c;一个是从最开始到splitRow  
  83.     //还有一个是从splitRow到文件最后  
  84.     //这里是直接调用HDFS的API写入到底层文件系统中的  
  85.     StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);  
  86.     encoded &#61; this.hri_b.getEncodedName();  
  87.     storedir &#61; Store.getStoreHomedir(splitdir, encoded, family);  
  88.     StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);          
  89. }  
  90.   
  91. //这里会根据传入的参数&#xff0c;是从开始到splitRow  
  92. //还是从splitRow到文件结束  
  93. //如果是从开始到splitRow&#xff0c;那么判断第一个key如果splitRow大则这个  
  94. //文件就不需要分割了&#xff0c;直接返回即可  
  95. StoreFile#split() {  
  96.     if (range &#61;&#61; Reference.Range.bottom) {  
  97.         KeyValue splitKey &#61; KeyValue.createLastOnRow(splitRow);  
  98.         byte[] firstKey &#61; f.createReader().getFirstKey();  
  99.         if (f.getReader().getComparator().compare(splitKey.getBuffer(),   
  100.             splitKey.getKeyOffset(), splitKey.getKeyLength(),   
  101.             firstKey, 0, firstKey.length) < 0) {  
  102.             return null;  
  103.         }             
  104.     } else {  
  105.         KeyValue splitKey &#61; KeyValue.createFirstOnRow(splitRow);  
  106.         byte[] lastKey &#61; f.createReader().getLastKey();        
  107.         if (f.getReader().getComparator().compare(splitKey.getBuffer(),   
  108.             splitKey.getKeyOffset(), splitKey.getKeyLength(),   
  109.             lastKey, 0, lastKey.length) > 0) {  
  110.             return null;  
  111.         }         
  112.     }     
  113.     Reference r &#61; new Reference(splitRow, range);  
  114.     String parentRegionName &#61; f.getPath().getParent().getParent().getName();  
  115.     Path p &#61; new Path(splitDir, f.getPath().getName() &#43; "." &#43; parentRegionName);  
  116.     return r.write(fs, p);  
  117. }  
  118.   
  119. //创建一个HRegion  
  120. SplitTransaction#createDaughterRegion() {  
  121.     FileSystem fs &#61; this.parent.getFilesystem();  
  122.     Path regionDir &#61; getSplitDirForDaughter(this.parent.getFilesystem(),  
  123.       this.splitdir, hri);  
  124.     HRegion r &#61; HRegion.newHRegion(this.parent.getTableDir(),  
  125.       this.parent.getLog(), fs, this.parent.getBaseConf(),  
  126.       hri, this.parent.getTableDesc(), rsServices);  
  127.     long halfParentReadRequestCount &#61; this.parent.getReadRequestsCount() / 2;  
  128.     r.readRequestsCount.set(halfParentReadRequestCount);  
  129.     r.setOpMetricsReadRequestCount(halfParentReadRequestCount);  
  130.     long halfParentWriteRequest &#61; this.parent.getWriteRequestsCount() / 2;  
  131.     r.writeRequestsCount.set(halfParentWriteRequest);  
  132.     r.setOpMetricsWriteRequestCount(halfParentWriteRequest);      
  133.     HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());  
  134.     return r;     
  135. }  
  136.   
  137. //设置region的info:regioninfo列为下线状态  
  138. //再增加两个列info:splitA和info:splitB  
  139. MetaEditor#offlineParentInMeta() {  
  140.     HRegionInfo copyOfParent &#61; new HRegionInfo(parent);  
  141.     copyOfParent.setOffline(true);  
  142.     copyOfParent.setSplit(true);  
  143.     Put put &#61; new Put(copyOfParent.getRegionName());  
  144.     addRegionInfo(put, copyOfParent);  
  145.     put.add("info""splitA",Writables.getBytes(a));  
  146.     put.add("info""splitB",Writables.getBytes(b));  
  147.     putToMetaTable(catalogTracker, put);      
  148. }  
  149.   
  150.   
  151.   
  152. //这里的DaughterOpener是对HRegion的封装  
  153. //会在新线程中启动HRegion#open()  
  154. //之后会更新META表信息&#xff0c;之后META表在很短的时间内  
  155. //会同时存在父region信息(已下线)和两个子region信息  
  156. SplitTransaction#openDaughters() {  
  157.     DaughterOpener aOpener &#61; new DaughterOpener(server, a);  
  158.     DaughterOpener bOpener &#61; new DaughterOpener(server, b);  
  159.     aOpener.start();  
  160.     bOpener.start();  
  161.     aOpener.join();  
  162.     bOpener.join();      
  163.       
  164.     HRegionServer.postOpenDeployTasks(b, server.getCatalogTracker(), true);  
  165.     // Should add it to OnlineRegions  
  166.     HRegionServer.addToOnlineRegions(b);  
  167.     HRegionServer.postOpenDeployTasks(a, server.getCatalogTracker(), true);  
  168.     HRegionServer.addToOnlineRegions(a);          
  169. }  
  170.   
  171. //如果StoreFile超过一定数量了会执行compact  
  172. //然后更新ZK或者ROOT和META表  
  173. HRegionServer#postOpenDeployTasks() {  
  174.     for (Store s : r.getStores().values()) {  
  175.         if (s.hasReferences() || s.needsCompaction()) {  
  176.             getCompactionRequester().requestCompaction(r, s, "Opening Region"null);  
  177.         }  
  178.     }  
  179.     //更新ZK或者ROOT和META表  
  180.     if (r.getRegionInfo().isRootRegion()) {  
  181.         RootLocationEditor.setRootLocation(getZooKeeper(),  
  182.         this.serverNameFromMasterPOV);  
  183.     } else if (r.getRegionInfo().isMetaRegion()) {  
  184.         MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),  
  185.         this.serverNameFromMasterPOV);  
  186.     } else {  
  187.         if (daughter) {  
  188.             // If daughter of a split, update whole row, not just location.  
  189.             MetaEditor.addDaughter(ct, r.getRegionInfo(),  
  190.             this.serverNameFromMasterPOV);  
  191.         } else {  
  192.             MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),  
  193.             this.serverNameFromMasterPOV);  
  194.         }  
  195.     }         
  196. }  
  197.   
  198. //将ZK中 /hbase/unassigned 节点下的  
  199. //fad11edf1e6e0a842b7fd3ad87f25053(待处理的region)  
  200. //删除  
  201. SplitTransaction#transitionZKNode() {  
  202.     transitionNodeSplit();  
  203.     tickleNodeSplit();    
  204. }  

 

 

 

 

 

一些辅助逻辑:

  1. //等待压缩完成&#xff0c;然后刷新数据  
  2. //最后再线程池中关闭所有的Store  
  3. HRegion#close() {  
  4.     waitForFlushesAndCompactions();  
  5.     internalFlushcache();  
  6.     ThreadPoolExecutor storeCloserThreadPool &#61;  
  7.           getStoreOpenAndCloseThreadPool("StoreCloserThread-"  
  8.             &#43; this.regionInfo.getRegionNameAsString());  
  9.     CompletionService> completionService &#61;  
  10.           new ExecutorCompletionService>(  
  11.             storeCloserThreadPool);   
  12.               
  13.     for (final Store store : stores.values()) {  
  14.         completionService.submit(new Callable>() {  
  15.             public ImmutableList call() throws IOException {  
  16.                 return store.close();  
  17.             }  
  18.         });  
  19.     }              
  20. }  
  21.   
  22. //提交到线程池中关闭所有打开的StoreFile  
  23. Store#close() {  
  24.     for (final StoreFile f : result) {  
  25.         completionService.submit(new Callable() {  
  26.             public Void call() throws IOException {  
  27.               f.closeReader(true);  
  28.               return null;  
  29.         }  
  30.     }  
  31. }  

 

 

 

 

 

compactionChecker线程

这个类是用于定期检查region server下的region是否需要做compact

主要逻辑如下:

  1. //不停的遍历当前RegionServer下的所有Region  
  2. //然后检查是否需要做compact  
  3. CompactionChecker#chore() {  
  4.     for (HRegion r : this.instance.onlineRegions.values()) {  
  5.         for (Store s : r.getStores().values()) {  
  6.             if (s.needsCompaction()) {  
  7.                 // Queue a compaction. Will recognize if major is needed.  
  8.                 this.instance.compactSplitThread.requestCompaction(r, s, getName());  
  9.             } else if (s.isMajorCompaction()) {  
  10.                 if (majorCompactPriority &#61;&#61; DEFAULT_PRIORITY  
  11.                 || majorCompactPriority > r.getCompactPriority()) {  
  12.                     this.instance.compactSplitThread.requestCompaction(r, s, getName());  
  13.                 } else {  
  14.                     this.instance.compactSplitThread.requestCompaction(r, s, getName());      
  15.                 }  
  16.             }  
  17.         }     
  18.     }     
  19. }  

 

 

 

 

 

参考

深入分析HBase Compaction机制

Hbase的Region Compact算法实现分析

深入分析HBase RPC(Protobuf)实现机制

HBase region split源码分析




推荐阅读
  • 本文详细介绍了在PHP中如何获取和处理HTTP头部信息,包括通过cURL获取请求头信息、使用header函数发送响应头以及获取客户端HTTP头部的方法。同时,还探讨了PHP中$_SERVER变量的使用,以获取客户端和服务器的相关信息。 ... [详细]
  • 利用Python在DragonBoard 410c上解析GPS数据获取位置信息
    本文介绍了如何在DragonBoard 410c开发板上使用Python脚本来解析GPS报文,从而获取精确的位置信息。DragonBoard 410c集成了GPS、Wi-Fi和高性能GPU,非常适合用于各种物联网项目。 ... [详细]
  • 深入解析 RuntimeClass 及多容器运行时应用
    本文旨在探讨RuntimeClass的起源、功能及其在多容器运行时环境中的实际应用。通过详细的案例分析,帮助读者理解如何在Kubernetes集群中高效管理不同类型的容器运行时。 ... [详细]
  • 本文详细介绍了PHP中的几种超全局变量,包括$GLOBAL、$_SERVER、$_POST、$_GET等,并探讨了AJAX的工作原理及其优缺点。通过具体示例,帮助读者更好地理解和应用这些技术。 ... [详细]
  • 本文详细介绍了如何在PHP中使用Memcached进行数据缓存,包括服务器连接、数据操作、高级功能等。 ... [详细]
  • 在CentOS 7中部署Nginx并配置SSL证书
    本文详细介绍了如何在CentOS 7操作系统上安装Nginx服务器,并配置SSL证书以增强网站的安全性。适合初学者和中级用户参考。 ... [详细]
  • 本文介绍了如何通过安装和配置php_uploadprogress扩展来实现文件上传时的进度条显示功能。通过一个简单的示例,详细解释了从安装扩展到编写具体代码的全过程。 ... [详细]
  • Python网络编程:深入探讨TCP粘包问题及解决方案
    本文详细探讨了TCP协议下的粘包现象及其产生的原因,并提供了通过自定义报头解决粘包问题的具体实现方案。同时,对比了TCP与UDP协议在数据传输上的不同特性。 ... [详细]
  • 本文详细介绍如何在SSM(Spring + Spring MVC + MyBatis)框架中实现分页功能。包括分页的基本概念、数据准备、前端分页栏的设计与实现、后端分页逻辑的编写以及最终的测试步骤。 ... [详细]
  • 为什么会崩溃? ... [详细]
  • 本文详细介绍了Golang中string类型的内部结构及其特性,包括字符串的定义、表示方式、数据结构以及相关的操作方法,如字符串拼接和类型转换等。 ... [详细]
  • 一键LNMP配置SSL证书实现全站HTTPS访问
    许多网站搭建者选择了便捷的一键LNMP安装包,但在网站部署完成后,配置SSL证书以支持HTTPS访问是一个不可或缺的步骤。本文将详细介绍如何通过简单的步骤完成这一过程。 ... [详细]
  • 华为云openEuler环境下的Web应用部署实践
    本文详细记录了在华为云openEuler系统上进行Web应用部署的具体步骤,包括配置yum源、安装Apache、MariaDB、PHP及其相关组件,并完成WordPress的安装与配置过程。 ... [详细]
  • Kubernetes 实践指南:初次体验
    本文介绍了如何通过官方提供的简易示例,快速上手 Kubernetes (K8S),并深入理解其核心概念和操作流程。 ... [详细]
  • 本文探讨了使用Python实现监控信息收集的方法,涵盖从基础的日志记录到复杂的系统运维解决方案,旨在帮助开发者和运维人员提升工作效率。 ... [详细]
author-avatar
我们的生活小窍门
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有