热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HDFS双缓冲技术核心源码剖析

本文大纲一、HDFS是大数据的基石我们都知道,HDFS是大数据存储的基石,所有的离线数据都存储在HDFS上,而NameNode是存储所有

本文大纲

image-20210327004152777


一、HDFS 是大数据的基石

我们都知道,HDFS 是大数据存储的基石,所有的离线数据都存储在 HDFS 上,而 NameNode 是存储所有元数据的地方(所谓元数据就是描述数据的数据,比如文件的大小,文件都存储在哪些 DataNode 上,文件在目录树的位置等),所以 NameNode 便成为了 HDFS 最关键的部分。

在离线数仓中,会存在很多离线任务,这些离线任务都要往 HDFS 写数据,每次写数据都会经过 NameNode 来保存元数据信息,那么 NameNode 势必会承担非常多的请求。NameNode 作为 HDFS 的核心,肯定自身要保证高可用,数据不能一直在内存中,要写到磁盘里。

所以一个关键的问题来了,NameNode 是用了什么神秘的技术,在保证自身高可用的同时,还能承担巨额的读写请求?


二、NameNode 高可用是如何实现的

下面直接来一个 NameNode 高可用的架构图:

image-20210322221217639

然后解释下如何保证高可用的:


(1)NameNode 只有一个时的单点故障问题

如果我们只部署了一个 NameNode,那么这个 NameNode 是有单点故障的问题的。如何解决,再加一个 NameNode 即可;


(2)当有两个 NameNode ,切换时,数据如何保持同步

两个 NameNode 一起工作,某一个 NameNode 挂掉了,另一个 NameNode 接替工作,这件事成立的必要前提是,两个 NameNode 的数据得时时刻刻保持一致。

那么如何保持数据一致,是不是可以在两个 NameNode 之间搞个共享的文件系统?仔细想想也不行,这样的话,单点故障问题就转移到这个文件系统上了。


(3)使用多节点的 JournalNode 作为主备 NameNode 的数据同步介质

这里引入了 JournalNode 集群,JournalNode 的每个节点的数据都是一样的,并且时刻保持一致。并且只要超过半数的节点存活,整个 JournalNode 集群都可以正常提供服务。

所以,一般会使用奇数个节点来搭建。 (为什么一般不用偶数个呢?因为 3 个节点构成的集群,可以容忍挂掉一台机器;而 4 个节点构成的集群,也只能容忍挂掉一台机器。同样是只能挂掉一台,为何不选 3 个节点的呢,还能节省资源)。

使用 JournalNode 集群,一个 NameNode 实时的往集群写,另一个 NameNode 也实时的读集群数据,这样两个 NameNode 数据就可以保持一致了。


(4)一个 NameNode 挂掉,另一个 NameNode 如何立马感知并接替工作

首先不能人工参与切换。那如何实时监听呢?

首先要再引入一个关键组件:Zookeeper。当两个 NameNode 同时启动后,他们都会去 Zookeeper 上注册,谁注册成功了,谁就能获得锁,成为 Active 状态的 NameNode。

另外还需要一个组件:ZKFC,它会实时监控 Zookeeper 进程状态,并且会以心跳的方式实时的告诉 Zookeeper NameNode 的状态。 如果一个 NameNode 进程挂了,就会把 Zookeeper 上的锁给另外一个 NameNode,让它成为 Active 的来工作。


三、NameNode 如何既高可用,还能高并发


1、双缓冲技术

NameNode 为了实现高可用,首先自己内存里的数据需要写到磁盘,然后还需要往 JournalNode 里写数据。

所以既然要写磁盘,还是往两个地方写磁盘,那必然性能会跟不上的。

所以这里 NameNode 引入了一个技术,也是本篇文章的重点:双缓冲技术

双缓冲的设计理念如下图:

image-20210325201042643

客户端不是直接写磁盘,而是往一个内存结构(Buffer1)里面写数据。当 Buffer1 达到一定阈值后,Buffer 1 和 Buffer 2 交换内存数据。此时 Buffer1 数据为空,Buffer2 开始在后台默默写磁盘。

这样的好处很明显的,前端只需要进行内存写 Buffer1 就行,性能特别高;而 Buffer2 在后台默默的同步日志到磁盘即可。

这样磁盘写,就转化成为了内存写,速度大大提高了。


2、如何实现双缓冲

然而,在真实环境不只一个客户端是这样子的:

image-20210325201357704

大数据情况下是 N 个客户端同时并发写的,在高并发的情况下,我们必然要去协调多个线程动作的一致性,比如往 Buffer1 的写动作,Buffer1 与 Buffer2 数据交换的动作,Buffer2 写磁盘的动作。

那么我们该如何实现这样一个巧妙的双缓冲呢?下面的代码是我从 Hadoop 源码里抽离出来的关键实现:

package org.apache.hadoop;import java.util.LinkedList;public class FSEditLog2 {private long txid&#61;0L;private DoubleBuffer editLogBuffer&#61;new DoubleBuffer();//是否正在刷写磁盘private volatile Boolean isSyncRunning &#61; false;private volatile Boolean isWaitSync &#61; false;private volatile Long syncMaxTxid &#61; 0L;//每个线程都对应自己的一个副本private ThreadLocal<Long> localTxid&#61;new ThreadLocal<Long>();public void logEdit(String content){//mkdir /asynchronized (this){//加锁的目的就是为了事务ID的唯一&#xff0c;而且是递增txid&#43;&#43;;localTxid.set(txid);EditLog log &#61; new EditLog(txid, content);editLogBuffer.write(log);}logSync();}private void logSync(){synchronized (this){if(isSyncRunning){ //是否有人正在把数据同步到磁盘上面long txid &#61; localTxid.get();if(txid <&#61; syncMaxTxid){//直接return&#xff0c;不接着干了&#xff1f;return;}if(isWaitSync){return;}isWaitSync &#61; true;while(isSyncRunning){try {wait(2000);}catch (Exception e){e.printStackTrace();}}isWaitSync &#61; false;}editLogBuffer.setReadyToSync();if(editLogBuffer.syncBuffer.size() > 0) {syncMaxTxid &#61; editLogBuffer.getSyncMaxTxid();}isSyncRunning &#61; true;} //释放锁editLogBuffer.flush();synchronized (this) {isSyncRunning &#61; false;notify();} //释放锁}/*** 把日志抽象成类*/class EditLog{//顺序递增long txid;//操作内容 mkdir /aString content;//构造函数public EditLog(long txid,String content){this.txid &#61; txid;this.content &#61; content;}//为了测试方便&#64;Overridepublic String toString() {return "EditLog{" &#43;"txid&#61;" &#43; txid &#43;", content&#61;&#39;" &#43; content &#43; &#39;\&#39;&#39; &#43;&#39;}&#39;;}}/*** 双缓存方案*/class DoubleBuffer{//内存1LinkedList<EditLog> currentBuffer &#61; new LinkedList<EditLog>();//内存2LinkedList<EditLog> syncBuffer&#61; new LinkedList<EditLog>();/*** 把数据写到当前内存1* &#64;param log*/public void write(EditLog log){currentBuffer.add(log);}/*** 交换内存*/public void setReadyToSync(){LinkedList<EditLog> tmp&#61; currentBuffer;currentBuffer &#61; syncBuffer;syncBuffer &#61; tmp;}/*** 获取内存2里面的日志的最大的事务编号* &#64;return*/public Long getSyncMaxTxid(){return syncBuffer.getLast().txid;}/*** 刷写磁盘*/public void flush(){for(EditLog log:syncBuffer){//把数据写到磁盘上System.out.println("存入磁盘日志信息&#xff1a;"&#43;log);}//把内存2里面的数据要清空syncBuffer.clear();}}
}

主要的业务逻辑就是 40 行&#xff0c;但是真的很巧妙。


1、EditLog

我们先看这个 EditLog 内部类&#xff0c;这是对 EditLog 日志的一个封装&#xff0c;就两个属性 txid 和 content&#xff0c;分别是日志的事务id&#xff08;保证唯一性&#xff09;和 内容。


2、DoubleBuffer

再看这个 DoubleBuffer 双缓冲类&#xff0c;很简单&#xff0c;就是在内存里面维护了两个有序的 LinkedList&#xff0c;分别是当前写编辑日志的缓冲和同步到磁盘的缓冲&#xff0c;其中的元素就是 EditLog 类。

write 方法就是把一条编辑日志写到当前缓冲里。

setReadyToSync 方法&#xff0c;就是交换两个缓冲&#xff0c;也是最简单的刚学习 Java 就学习过的两个变量交换值的方法。

getSyncMaxTxid 方法&#xff0c;获得正在同步的那个缓冲去里的最大的事务id。

flush 方法&#xff0c;遍历同步的缓冲的每一条编辑日志&#xff0c;写到磁盘&#xff0c;并最终清空缓冲区内容。


3、主类的一些属性说明

&#xff08;1&#xff09;全局的事务id

private long txid&#61;0L;

&#xff08;2&#xff09;双缓冲结构

private DoubleBuffer editLogBuffer&#61;new DoubleBuffer();

&#xff08;3&#xff09;控制变量

private volatile Boolean isSyncRunning &#61; false; // 是否正在同步数据到磁盘

private volatile Boolean isWaitSync &#61; false; // 是否有线程在等待同步数据到磁盘完成

private volatile Long syncMaxTxid &#61; 0L; // 当前同步的最大日志事务id

private ThreadLocal localTxid&#61;new ThreadLocal(); // 每个线程的线程副本&#xff0c;用来放本线程当前写入的日志事务id

&#xff08;4&#xff09;主逻辑 logEdit 方法

这个方法是对外暴露的方法&#xff0c;客户端往双缓冲写数据就是用的这个方法。

假设当前有一个线程1 进到了 logEdit 方法&#xff0c;首先直接把当前类实例加锁&#xff0c;避免别的线程进来&#xff0c;以此来保证编辑日志事务id的唯一自增性。

// 全局事务递增
txid&#43;&#43;;
// 往线程本身的变量里设置事务id值
localTxid.set(txid);
// 构造 EditLog 变量
EditLog log &#61; new EditLog(txid, content);
// 写入当前的 Buffer
editLogBuffer.write(log);

当它执行完了这些之后&#xff0c;释放锁&#xff0c;开始执行 logSync() 方法。此时由于释放了锁&#xff0c;于是很多线程开始拿到锁&#xff0c;进入了这个方法中。

假设有五个线程进来了分别写了一条日志&#xff0c;于是现在双缓冲是这样子的&#xff1a;

image-20210326235651691

好&#xff0c;然后线程1 开始进入 logSync 方法&#xff0c;第一步就是使用当前类的实例加了锁&#xff0c;保证只有一个线程进来。

检查 isSyncRunning 变量是否为 true&#xff0c;目前是 false&#xff0c;跳过这个方法。

开始执行这个 editLogBuffer.setReadyToSync(); 方法&#xff0c;于是双缓冲的数据直接被交换了。

image-20210326235912399

然后获得了全局最大的id&#xff0c;当前是 5&#xff0c;赋值给了 syncMaxTxid 变量

if(editLogBuffer.syncBuffer.size() > 0) {syncMaxTxid &#61; editLogBuffer.getSyncMaxTxid();
}

然后 isSyncRunning &#61; true; 把这个变量置为 true&#xff0c;表示正在同步数据到磁盘。此时释放锁。

然后 线程 1 开始执行数据同步到磁盘的动作&#xff1a; editLogBuffer.flush() ,这个动作肯定耗费的时间比较久&#xff0c;基本是在 ms 级别。

此时我们假设 线程2 争抢到了锁&#xff0c;进入到了 logSync 方法。

// 线程2 判断 是否有人正在把数据同步到磁盘上面&#xff0c;这个值被线程 1 改为 true 了
// 进入到 if 方法
if(isSyncRunning){ // 获得到自己线程的事务id&#xff0c;为 2long txid &#61; localTxid.get();// 2 是否小于 5 &#xff1f;小于&#xff0c;直接返回&#xff0c;因为此时 5 已经正在被同步到磁盘了if(txid <&#61; syncMaxTxid){return;}if(isWaitSync){return;}isWaitSync &#61; true;while(isSyncRunning){try {wait(2000);}catch (Exception e){e.printStackTrace();}}isWaitSync &#61; false;
}

线程2 由于自身的编辑日志的事务id 小于当前正在同步的最大的事务id&#xff0c;所以直接返回了&#xff0c;然后线程3 &#xff0c;线程4&#xff0c;线程5 进来都是这样&#xff0c;直接 return 返回。

假设线程6 此时进来&#xff0c;当前双缓冲状态是这样的

image-20210327000626162

下面线程 6 干的活&#xff0c;参考下面代码里的注释&#xff1a;

// 线程6 判断是否有人正在把数据同步到磁盘上面&#xff0c;这个值被线程 1 改为 true 了
// 进入到 if 方法
if(isSyncRunning){ // 获得到自己线程的事务id&#xff0c;为 6long txid &#61; localTxid.get();// 6 是否小于 5 &#xff0c;不小于继续执行if(txid <&#61; syncMaxTxid){return;}// 这个值为 false&#xff0c;继续执行if(isWaitSync){return;}// 把 isWaitSync 设置为 trueisWaitSync &#61; true;// 这个值被线程1置为了 true&#xff0c;所以这里在死循环while(isSyncRunning){try {// 等待 2s&#xff0c;wait 会释放锁&#xff0c;同时线程 6 进入睡眠中wait(2000);}catch (Exception e){e.printStackTrace();}}isWaitSync &#61; false;
}

可以看到 线程 6 在 while 循环里无限等待数据同步到磁盘完毕。然后由于线程 6 把 isWaitSync 值改为了 true&#xff0c;线程 6 在等待期间释放锁&#xff0c;被其他线程抢到之后&#xff0c;其他线程由于 isWaitSync 为true&#xff0c;直接返回了。

当过了一会儿&#xff0c;线程1 把第二个 Buffer 同步到磁盘完毕后&#xff0c;线程1 会执行这些代码

synchronized (this) {isSyncRunning &#61; false;notify();
} //释放锁

把 isSyncRunning 变量置为 false&#xff0c;同时调用 notify()&#xff0c;通知线程 6 &#xff0c;你可以继续参与锁的竞争了。

然后线程6 &#xff0c;从 wait 中醒来&#xff0c;重新参与锁竞争&#xff0c;继续执行接下来的代码。此时 isSyncRunning 已经为 false&#xff0c;所以它跳出了 while 循环&#xff0c;把 isWaitSync 置为了 false。

然后它开始执行&#xff1a;交换缓冲区&#xff0c;给最大的事务id&#xff08;此时为6 &#xff09;赋值&#xff0c;把 isSyncRunning 赋值为 true。

editLogBuffer.setReadyToSync();
if(editLogBuffer.syncBuffer.size() > 0) {syncMaxTxid &#61; editLogBuffer.getSyncMaxTxid();
}isSyncRunning &#61; true;

执行完了之后&#xff0c;释放锁&#xff0c;开始执行Buffer2 的同步。然后所有的线程就按照上面的方式有序的工作。

这段几十行的代码很精炼&#xff0c;值得反复推敲&#xff0c;总结下来如下&#xff1a;

&#xff08;1&#xff09;写缓冲到内存 和 同步数据到磁盘分开&#xff0c;互不影响和干扰&#xff1b;

&#xff08;2&#xff09;使用 synchronize &#xff0c;wait 和 notify 来保证多线程有序进行工作&#xff1b;

&#xff08;3&#xff09;当在同步数据到磁盘中的时候&#xff0c;其他争抢到锁进来准备同步数据的线程只能等待&#xff1b;

&#xff08;4&#xff09;线程使用 ThreadLocal 变量&#xff0c;来记录自身当前的事务id&#xff0c;如果小于当前正在同步的最大事务id&#xff0c;则不同步&#xff1b;

&#xff08;5&#xff09;有线程在等待同步数据的时候&#xff0c;其他线程写完 editLog 到内存后直接返回&#xff1b;


四、最后的总结

本文详细探讨了 HDFS 在大数据中基石的地位&#xff0c;以及如何保障 NameNode 高可用的运行。

NameNode 在高可用运行时&#xff0c;同时是如何保证高并发读写操作的。双缓冲在其中起到了核心的作用&#xff0c;把写数据和同步数据到磁盘分离开&#xff0c;互不影响。

同时我们还剥离了一段核心双缓冲的实现代码&#xff0c;仔细分析了实现原理。这短短的几十行代码&#xff0c;可谓综合利用了多线程高并发的知识&#xff0c;耐人寻味。


推荐阅读
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 本文介绍了 Go 语言中的高性能、可扩展、轻量级 Web 框架 Echo。Echo 框架简单易用,仅需几行代码即可启动一个高性能 HTTP 服务。 ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • 为什么多数程序员难以成为架构师?
    探讨80%的程序员为何难以晋升为架构师,涉及技术深度、经验积累和综合能力等方面。本文将详细解析Tomcat的配置和服务组件,帮助读者理解其内部机制。 ... [详细]
  • 本文详细介绍了如何在Linux系统(以CentOS为例)上彻底卸载Zimbra邮件系统,包括停止服务、删除文件和用户等步骤。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
  • packagecom.panchan.tsmese.utils;importjava.lang.reflect.ParameterizedType;importjava.lang. ... [详细]
  • 本文介绍了Java编程语言的基础知识,包括其历史背景、主要特性以及如何安装和配置JDK。此外,还详细讲解了如何编写和运行第一个Java程序,并简要介绍了Eclipse集成开发环境的安装和使用。 ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
author-avatar
飞航
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有