我们都知道,HDFS 是大数据存储的基石,所有的离线数据都存储在 HDFS 上,而 NameNode 是存储所有元数据的地方(所谓元数据就是描述数据的数据,比如文件的大小,文件都存储在哪些 DataNode 上,文件在目录树的位置等),所以 NameNode 便成为了 HDFS 最关键的部分。
在离线数仓中,会存在很多离线任务,这些离线任务都要往 HDFS 写数据,每次写数据都会经过 NameNode 来保存元数据信息,那么 NameNode 势必会承担非常多的请求。NameNode 作为 HDFS 的核心,肯定自身要保证高可用,数据不能一直在内存中,要写到磁盘里。
所以一个关键的问题来了,NameNode 是用了什么神秘的技术,在保证自身高可用的同时,还能承担巨额的读写请求?
下面直接来一个 NameNode 高可用的架构图:
然后解释下如何保证高可用的:
如果我们只部署了一个 NameNode,那么这个 NameNode 是有单点故障的问题的。如何解决,再加一个 NameNode 即可;
两个 NameNode 一起工作,某一个 NameNode 挂掉了,另一个 NameNode 接替工作,这件事成立的必要前提是,两个 NameNode 的数据得时时刻刻保持一致。
那么如何保持数据一致,是不是可以在两个 NameNode 之间搞个共享的文件系统?仔细想想也不行,这样的话,单点故障问题就转移到这个文件系统上了。
这里引入了 JournalNode 集群,JournalNode 的每个节点的数据都是一样的,并且时刻保持一致。并且只要超过半数的节点存活,整个 JournalNode 集群都可以正常提供服务。
所以,一般会使用奇数个节点来搭建。 (为什么一般不用偶数个呢?因为 3 个节点构成的集群,可以容忍挂掉一台机器;而 4 个节点构成的集群,也只能容忍挂掉一台机器。同样是只能挂掉一台,为何不选 3 个节点的呢,还能节省资源)。
使用 JournalNode 集群,一个 NameNode 实时的往集群写,另一个 NameNode 也实时的读集群数据,这样两个 NameNode 数据就可以保持一致了。
首先不能人工参与切换。那如何实时监听呢?
首先要再引入一个关键组件:Zookeeper。当两个 NameNode 同时启动后,他们都会去 Zookeeper 上注册,谁注册成功了,谁就能获得锁,成为 Active 状态的 NameNode。
另外还需要一个组件:ZKFC,它会实时监控 Zookeeper 进程状态,并且会以心跳的方式实时的告诉 Zookeeper NameNode 的状态。 如果一个 NameNode 进程挂了,就会把 Zookeeper 上的锁给另外一个 NameNode,让它成为 Active 的来工作。
NameNode 为了实现高可用,首先自己内存里的数据需要写到磁盘,然后还需要往 JournalNode 里写数据。
所以既然要写磁盘,还是往两个地方写磁盘,那必然性能会跟不上的。
所以这里 NameNode 引入了一个技术,也是本篇文章的重点:双缓冲技术。
双缓冲的设计理念如下图:
客户端不是直接写磁盘,而是往一个内存结构(Buffer1)里面写数据。当 Buffer1 达到一定阈值后,Buffer 1 和 Buffer 2 交换内存数据。此时 Buffer1 数据为空,Buffer2 开始在后台默默写磁盘。
这样的好处很明显的,前端只需要进行内存写 Buffer1 就行,性能特别高;而 Buffer2 在后台默默的同步日志到磁盘即可。
这样磁盘写,就转化成为了内存写,速度大大提高了。
然而,在真实环境不只一个客户端是这样子的:
大数据情况下是 N 个客户端同时并发写的,在高并发的情况下,我们必然要去协调多个线程动作的一致性,比如往 Buffer1 的写动作,Buffer1 与 Buffer2 数据交换的动作,Buffer2 写磁盘的动作。
那么我们该如何实现这样一个巧妙的双缓冲呢?下面的代码是我从 Hadoop 源码里抽离出来的关键实现:
package org.apache.hadoop;import java.util.LinkedList;public class FSEditLog2 {private long txid&#61;0L;private DoubleBuffer editLogBuffer&#61;new DoubleBuffer();//是否正在刷写磁盘private volatile Boolean isSyncRunning &#61; false;private volatile Boolean isWaitSync &#61; false;private volatile Long syncMaxTxid &#61; 0L;//每个线程都对应自己的一个副本private ThreadLocal<Long> localTxid&#61;new ThreadLocal<Long>();public void logEdit(String content){//mkdir /asynchronized (this){//加锁的目的就是为了事务ID的唯一&#xff0c;而且是递增txid&#43;&#43;;localTxid.set(txid);EditLog log &#61; new EditLog(txid, content);editLogBuffer.write(log);}logSync();}private void logSync(){synchronized (this){if(isSyncRunning){ //是否有人正在把数据同步到磁盘上面long txid &#61; localTxid.get();if(txid <&#61; syncMaxTxid){//直接return&#xff0c;不接着干了&#xff1f;return;}if(isWaitSync){return;}isWaitSync &#61; true;while(isSyncRunning){try {wait(2000);}catch (Exception e){e.printStackTrace();}}isWaitSync &#61; false;}editLogBuffer.setReadyToSync();if(editLogBuffer.syncBuffer.size() > 0) {syncMaxTxid &#61; editLogBuffer.getSyncMaxTxid();}isSyncRunning &#61; true;} //释放锁editLogBuffer.flush();synchronized (this) {isSyncRunning &#61; false;notify();} //释放锁}/*** 把日志抽象成类*/class EditLog{//顺序递增long txid;//操作内容 mkdir /aString content;//构造函数public EditLog(long txid,String content){this.txid &#61; txid;this.content &#61; content;}//为了测试方便&#64;Overridepublic String toString() {return "EditLog{" &#43;"txid&#61;" &#43; txid &#43;", content&#61;&#39;" &#43; content &#43; &#39;\&#39;&#39; &#43;&#39;}&#39;;}}/*** 双缓存方案*/class DoubleBuffer{//内存1LinkedList<EditLog> currentBuffer &#61; new LinkedList<EditLog>();//内存2LinkedList<EditLog> syncBuffer&#61; new LinkedList<EditLog>();/*** 把数据写到当前内存1* &#64;param log*/public void write(EditLog log){currentBuffer.add(log);}/*** 交换内存*/public void setReadyToSync(){LinkedList<EditLog> tmp&#61; currentBuffer;currentBuffer &#61; syncBuffer;syncBuffer &#61; tmp;}/*** 获取内存2里面的日志的最大的事务编号* &#64;return*/public Long getSyncMaxTxid(){return syncBuffer.getLast().txid;}/*** 刷写磁盘*/public void flush(){for(EditLog log:syncBuffer){//把数据写到磁盘上System.out.println("存入磁盘日志信息&#xff1a;"&#43;log);}//把内存2里面的数据要清空syncBuffer.clear();}}
}
主要的业务逻辑就是 40 行&#xff0c;但是真的很巧妙。
我们先看这个 EditLog 内部类&#xff0c;这是对 EditLog 日志的一个封装&#xff0c;就两个属性 txid 和 content&#xff0c;分别是日志的事务id&#xff08;保证唯一性&#xff09;和 内容。
再看这个 DoubleBuffer 双缓冲类&#xff0c;很简单&#xff0c;就是在内存里面维护了两个有序的 LinkedList&#xff0c;分别是当前写编辑日志的缓冲和同步到磁盘的缓冲&#xff0c;其中的元素就是 EditLog 类。
write 方法就是把一条编辑日志写到当前缓冲里。
setReadyToSync 方法&#xff0c;就是交换两个缓冲&#xff0c;也是最简单的刚学习 Java 就学习过的两个变量交换值的方法。
getSyncMaxTxid 方法&#xff0c;获得正在同步的那个缓冲去里的最大的事务id。
flush 方法&#xff0c;遍历同步的缓冲的每一条编辑日志&#xff0c;写到磁盘&#xff0c;并最终清空缓冲区内容。
&#xff08;1&#xff09;全局的事务id
private long txid&#61;0L;
&#xff08;2&#xff09;双缓冲结构
private DoubleBuffer editLogBuffer&#61;new DoubleBuffer();
&#xff08;3&#xff09;控制变量
private volatile Boolean isSyncRunning &#61; false; // 是否正在同步数据到磁盘
private volatile Boolean isWaitSync &#61; false; // 是否有线程在等待同步数据到磁盘完成
private volatile Long syncMaxTxid &#61; 0L; // 当前同步的最大日志事务id
private ThreadLocal localTxid&#61;new ThreadLocal(); // 每个线程的线程副本&#xff0c;用来放本线程当前写入的日志事务id
&#xff08;4&#xff09;主逻辑 logEdit 方法
这个方法是对外暴露的方法&#xff0c;客户端往双缓冲写数据就是用的这个方法。
假设当前有一个线程1 进到了 logEdit 方法&#xff0c;首先直接把当前类实例加锁&#xff0c;避免别的线程进来&#xff0c;以此来保证编辑日志事务id的唯一自增性。
// 全局事务递增
txid&#43;&#43;;
// 往线程本身的变量里设置事务id值
localTxid.set(txid);
// 构造 EditLog 变量
EditLog log &#61; new EditLog(txid, content);
// 写入当前的 Buffer
editLogBuffer.write(log);
当它执行完了这些之后&#xff0c;释放锁&#xff0c;开始执行 logSync() 方法。此时由于释放了锁&#xff0c;于是很多线程开始拿到锁&#xff0c;进入了这个方法中。
假设有五个线程进来了分别写了一条日志&#xff0c;于是现在双缓冲是这样子的&#xff1a;
好&#xff0c;然后线程1 开始进入 logSync 方法&#xff0c;第一步就是使用当前类的实例加了锁&#xff0c;保证只有一个线程进来。
检查 isSyncRunning 变量是否为 true&#xff0c;目前是 false&#xff0c;跳过这个方法。
开始执行这个 editLogBuffer.setReadyToSync(); 方法&#xff0c;于是双缓冲的数据直接被交换了。
然后获得了全局最大的id&#xff0c;当前是 5&#xff0c;赋值给了 syncMaxTxid 变量
if(editLogBuffer.syncBuffer.size() > 0) {syncMaxTxid &#61; editLogBuffer.getSyncMaxTxid();
}
然后 isSyncRunning &#61; true; 把这个变量置为 true&#xff0c;表示正在同步数据到磁盘。此时释放锁。
然后 线程 1 开始执行数据同步到磁盘的动作&#xff1a; editLogBuffer.flush() ,这个动作肯定耗费的时间比较久&#xff0c;基本是在 ms 级别。
此时我们假设 线程2 争抢到了锁&#xff0c;进入到了 logSync 方法。
// 线程2 判断 是否有人正在把数据同步到磁盘上面&#xff0c;这个值被线程 1 改为 true 了
// 进入到 if 方法
if(isSyncRunning){ // 获得到自己线程的事务id&#xff0c;为 2long txid &#61; localTxid.get();// 2 是否小于 5 &#xff1f;小于&#xff0c;直接返回&#xff0c;因为此时 5 已经正在被同步到磁盘了if(txid <&#61; syncMaxTxid){return;}if(isWaitSync){return;}isWaitSync &#61; true;while(isSyncRunning){try {wait(2000);}catch (Exception e){e.printStackTrace();}}isWaitSync &#61; false;
}
线程2 由于自身的编辑日志的事务id 小于当前正在同步的最大的事务id&#xff0c;所以直接返回了&#xff0c;然后线程3 &#xff0c;线程4&#xff0c;线程5 进来都是这样&#xff0c;直接 return 返回。
假设线程6 此时进来&#xff0c;当前双缓冲状态是这样的
下面线程 6 干的活&#xff0c;参考下面代码里的注释&#xff1a;
// 线程6 判断是否有人正在把数据同步到磁盘上面&#xff0c;这个值被线程 1 改为 true 了
// 进入到 if 方法
if(isSyncRunning){ // 获得到自己线程的事务id&#xff0c;为 6long txid &#61; localTxid.get();// 6 是否小于 5 &#xff0c;不小于继续执行if(txid <&#61; syncMaxTxid){return;}// 这个值为 false&#xff0c;继续执行if(isWaitSync){return;}// 把 isWaitSync 设置为 trueisWaitSync &#61; true;// 这个值被线程1置为了 true&#xff0c;所以这里在死循环while(isSyncRunning){try {// 等待 2s&#xff0c;wait 会释放锁&#xff0c;同时线程 6 进入睡眠中wait(2000);}catch (Exception e){e.printStackTrace();}}isWaitSync &#61; false;
}
可以看到 线程 6 在 while 循环里无限等待数据同步到磁盘完毕。然后由于线程 6 把 isWaitSync 值改为了 true&#xff0c;线程 6 在等待期间释放锁&#xff0c;被其他线程抢到之后&#xff0c;其他线程由于 isWaitSync 为true&#xff0c;直接返回了。
当过了一会儿&#xff0c;线程1 把第二个 Buffer 同步到磁盘完毕后&#xff0c;线程1 会执行这些代码
synchronized (this) {isSyncRunning &#61; false;notify();
} //释放锁
把 isSyncRunning 变量置为 false&#xff0c;同时调用 notify()&#xff0c;通知线程 6 &#xff0c;你可以继续参与锁的竞争了。
然后线程6 &#xff0c;从 wait 中醒来&#xff0c;重新参与锁竞争&#xff0c;继续执行接下来的代码。此时 isSyncRunning 已经为 false&#xff0c;所以它跳出了 while 循环&#xff0c;把 isWaitSync 置为了 false。
然后它开始执行&#xff1a;交换缓冲区&#xff0c;给最大的事务id&#xff08;此时为6 &#xff09;赋值&#xff0c;把 isSyncRunning 赋值为 true。
editLogBuffer.setReadyToSync();
if(editLogBuffer.syncBuffer.size() > 0) {syncMaxTxid &#61; editLogBuffer.getSyncMaxTxid();
}isSyncRunning &#61; true;
执行完了之后&#xff0c;释放锁&#xff0c;开始执行Buffer2 的同步。然后所有的线程就按照上面的方式有序的工作。
这段几十行的代码很精炼&#xff0c;值得反复推敲&#xff0c;总结下来如下&#xff1a;
&#xff08;1&#xff09;写缓冲到内存 和 同步数据到磁盘分开&#xff0c;互不影响和干扰&#xff1b;
&#xff08;2&#xff09;使用 synchronize &#xff0c;wait 和 notify 来保证多线程有序进行工作&#xff1b;
&#xff08;3&#xff09;当在同步数据到磁盘中的时候&#xff0c;其他争抢到锁进来准备同步数据的线程只能等待&#xff1b;
&#xff08;4&#xff09;线程使用 ThreadLocal 变量&#xff0c;来记录自身当前的事务id&#xff0c;如果小于当前正在同步的最大事务id&#xff0c;则不同步&#xff1b;
&#xff08;5&#xff09;有线程在等待同步数据的时候&#xff0c;其他线程写完 editLog 到内存后直接返回&#xff1b;
本文详细探讨了 HDFS 在大数据中基石的地位&#xff0c;以及如何保障 NameNode 高可用的运行。
NameNode 在高可用运行时&#xff0c;同时是如何保证高并发读写操作的。双缓冲在其中起到了核心的作用&#xff0c;把写数据和同步数据到磁盘分离开&#xff0c;互不影响。
同时我们还剥离了一段核心双缓冲的实现代码&#xff0c;仔细分析了实现原理。这短短的几十行代码&#xff0c;可谓综合利用了多线程高并发的知识&#xff0c;耐人寻味。