热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Lab2B.Raft日志复制实现

总结下MIT6.824Lab2BLogReplication的实验笔记。Lecture参考:2A部分完成了基础的LeaderElection和Heartbeat机制,2B部分要完成LogReplication,同时实现论文中5.4.1节选举限制机制来保证选举的安全性。本节实验目标是通过

总结下 MIT6.824 Lab2B Log Replication 的实验笔记。Lecture 参考: lab-raft.html

Lab2B

测试用例

2A 部分完成了基础的 Leader Election 和 Heartbeat 机制,2B 部分要完成 Log Replication,同时实现论文中 5.4.1 节选举限制机制来保证选举的安全性。 本节实验目标是通过 test_test.go 中的 *2B 测试用例:

  • TestBasicAgree2B:实现最简单的日志复制
    对 leader 请求执行 3 个命令,五个节点均正常的情况下日志要能达成一致。
  • TestFailAgree2B:处理少部分节点失效
    三个节点组成的集群中,某个普通节点发生了网络分区后,剩余两个节点要能继续 commit 和 apply 命令,当该该节点的网络恢复后,要能正确处理它的 higher term
  • TestFailNoAgree2B:处理大部分节点失效
    在五个节点组成的集群中,若有三个节点失效,则 leader 处理的新命令都是 uncommit 的状态,也就不会 apply,但当三个节点的网络恢复后,要能根据日志新旧正确处理选举。
  • TestConcurrentStarts2B:处理并发的命令请求
    在多个命令并发请求时,leader 要保证每次只能完整处理一条命令,不能因为并发导致有命令漏处理。
  • TestRejoin2B:处理过期 leader 提交的命令
    过期 leader 本地有 uncommit 的旧日志,在 AppendEntries RPC 做日志一致性检查时进行日志的强制同步。这是最棘手的测试,其流程如下:
    Lab2B. Raft 日志复制实现
  • TestBackup2B:性能测试
    在少部分节点失效、多部分节点失效环境下,尽快完成两百个命令的正确处理。
  • TestCount2B:检查无效通信的次数
    正常情况下,超时无效的 RPC 调用不能过多。

测试均通过:

Lab2B. Raft 日志复制实现

实现思路

Lab2B. Raft 日志复制实现

整体流程

  • client 将命令发送给 leader 后,leader 先本地 append 日志后立刻响应(lab 与 paper 此处有差异),随后广播给所有其他节点的 sync trigger,主动触发日志复制。
  • follower 收到日志后进行一致性检查,强制覆写冲突日志并 append 新日志,通知 leader 复制成功。
  • leader 在后台统计当前任期的日志复制成功的节点数量,若达到 majority 则将日志标记为 commit 状态并通知 apply
  • 在之后的心跳请求中,leader 将自己的 commitIndex 一并同步,follower 发现自己的 commitIndex 落后,随即更新,通知 apply

关键点

  • 正如 lecture 的提示,实现时需要大量的同步触发机制,可选择 Go 的阻塞 channel 或 sync 包的条件变量。其中,阻塞 channel 使用不当可能会造成死锁或资源泄漏,而且触发点很多,会造成一个 channel 满天飞的情况,遂选择条件变量做同步。
  • leader 要实现上图三个 daemon 机制,而 follower 只需要实现 apply checker
    • leader 的 sync trigger:新日志 append 或心跳通信都会触发
    • leader 的 commit checker:一直通过 matchIndex 检测日志的 commit 状态
    • leader、follower 的 apply checker:当确定某条日志未 commit 状态时触发 apply 执行
  • 充分理解论文的图 2:lastApplied 和 commitIndex,nextIndex[] 和 matchIndex[] 都将用于复制机制。

日志复制

日志结构

Raft 的目标是在大多数节点都可用且能相互通信的前提下,保证多个节点上日志的一致性。日志的存储结构:

type LogEntry struct {
	Term    int          
	Command interface{}
}

Term 是 Raft 协议的逻辑时钟,用于检查日志的一致性。它有三种状态:

  • commit / committed:当日志被成功 replicated 到大多数节点后的状态
  • apply / applied:日志已处于 commit 状态后,即可直接 apply 执行
  • uncommit:日志因网络分区等原因未成功复制到大多数节点,停留在 leader 内的状态

AppendEntries RPC

leader 通过 AppendEntries RPC 与各节点进行日志的同步。请求参数和响应参数如下:

type AppendEntriesArgs struct {
	Term         int        // leader term
	LeaderID     int        // so follower can redirect clients
	PrevLogIndex int        // index of log entry immediately preceding new ones
	PrevLogTerm  int        // term of prevLogIndex entry
	Entries      []LogEntry // log entries to store (empty for heartbeat;may send more than one for efficiency)
	LeaderCommit int        // leader’s commitIndex
}

type AppendEntriesReply struct {
	Term          int  // currentTerm, for leader to update itself
	Succ          bool // true if follower contained entry matching prevLogIndex and prevLogTerm
}

被调用方(Servers)

参考论文图 2,当节点收到此调用后,依次进行五个判断:

  • Reply false if term
  • Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm
  • If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it
  • Append any new entries not already in the log
  • If leaderCommit > commitIndex, set commitIndex =min(leaderCommit, index of last new entry)
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	reply.Term = rf.curTerm
	reply.Succ = false

	if args.Term  rf.curTerm {
		rf.curTerm = args.Term
		rf.back2Follower(args.Term, VOTE_NIL)
	}
	// now terms are same
	rf.resetElectTimer()

	// consistency check
	last := len(rf.logs) - 1
	if last  last {
			rf.logs = append(rf.logs, e) // new log, just append
			committed = len(rf.logs) - 1
		}
	}

	// if leaderCommit > commitIndex, set commitIndex =min(leaderCommit, index of last new entry)
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = min(committed, args.LeaderCommit) // need to commit
		rf.applyCond.Broadcast() // trigger apply
	}

	rf.back2Follower(args.Term, args.LeaderID)
	reply.Succ = true
}

调用方(Leader)

// leader replicate logs or send heartneat to other nodes
func (rf *Raft) sync() {
	for i := range rf.peers {
		if i == rf.me {
			rf.resetElectTimer()
			continue
		}

		go func(server int) {
			for {
				if !rf.isLeader() {
					return
				}

				rf.mu.Lock()
				rf.syncConds[server].Wait() // wait for heartbeat or Start to trigger

				// sync new log or missing logs to server
				next := rf.nextIndex[server]
				args := AppendEntriesArgs{
					Term:         rf.curTerm,
					LeaderID:     rf.me,
					Entries:      nil,
					LeaderCommit: rf.commitIndex,
				}
				if next  rf.curTerm { // higher term
						rf.back2Follower(reply.Term, VOTE_NIL)
						return
					}
					continue
				}

				// append succeed
				rf.nextIndex[server] += len(args.Entries)
				rf.matchIndex[server] = rf.nextIndex[server] - 1 // replicate succeed
			}
		}(i)
	}
}

Daemon goroutines

Apply Checker

每个节点在 Make 初始化时会启动两个后台 goroutine:

lastApplied  
 
// apply (lastApplied, commitIndex]
func (rf *Raft) waitApply() {
	for {
		rf.mu.Lock()
		rf.applyCond.Wait() // wait for new commit log trigger

		var logs []LogEntry // un apply logs
		applied := rf.lastApplied
		committed := rf.commitIndex
		if applied  
  

Commit Checker

在设计实现时,leader 将日志的 replicate 和 commit 解耦,所以需要 leader 在后台循环检测本轮中哪些日志已被提交:

// leader daemon detect and commit log which has been replicated on majority successfully
func (rf *Raft) leaderCommit() {
	for {
		if !rf.isLeader() {
			return
		}

		rf.mu.Lock()
		majority := len(rf.peers)/2 + 1
		n := len(rf.logs)
		for i := n - 1; i > rf.commitIndex; i-- { // looking for newest commit index from tail to head
			// in current term, if replicated on majority, commit it
			replicated := 0
			if rf.logs[i].Term == rf.curTerm {
				for server := range rf.peers {
					if rf.matchIndex[server] >= i {
						replicated += 1
					}
				}
			}

			if replicated >= majority {
				// all (commitIndex, newest commitIndex] logs are committed
				// leader now apply them
				rf.applyCond.Broadcast()
				rf.commitIndex = i
				break
			}
		}
		rf.mu.Unlock()
	}
}

选举限制

参考论文 5.4.1 节,为保证选举安全,在投票环节限制:若 candidate 没有前任 leaders 已提交所有日志,就不能赢得选举。限制是通过比较 candidate 和 follower 的日志新旧实现的,Raft 对日志新旧的定义是,让两个节点比较各自的最后一条日志:

  • 若任期号不同,任期号大的节点日志最新
  • 若任期号相同,日志更长的节点日志最新
// election restrictions
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	reply.Term = rf.curTerm
	reply.VoteGranted = false

	if args.Term  rf.curTerm {
		rf.back2Follower(args.Term, VOTE_NIL)
	}
	// now the term are same

	// check up-to-date, from Paper:
	// if the logs have last entries with different terms, then the log with the later term is more up-to-date.
	// if the logs end with the same term, then whichever log is longer is more up-to-date.
	i := len(rf.logs) - 1
	lastTerm := rf.logs[i].Term
	if lastTerm > args.LastLogTerm {
		return
	}
	if lastTerm == args.LastLogTerm && i > args.LastLogIndex {
		return
	}
	// now last index and term both matched

	if rf.votedFor == VOTE_NIL || rf.votedFor == args.CandidateID {
		reply.VoteGranted = true
		rf.back2Follower(args.Term, args.CandidateID)
	}

	return
}

至此,梳理了 Lab2B 日志复制的设计流程、实现了选举限制 up-to-date。

总结

Lab2B 应该是三个部分最难的了,我前后折腾了两三个星期,从尝试到处飞的 channel 同步换到了 sync.Cond 才更易调试和实现。值得一提的是,文件结构上的解耦也是十分有必要的,比如我的:

➜  raft git:(master) tree
.
├── config.go
├── persister.go
├── raft.go          # 节点初始化,超时选举机制
├── raft_entry.go    # AppendEntries RPC 逻辑
├── raft_leader.go   # sync 日志,心跳通信等
├── raft_peer.go     # 定义超时时间
├── raft_vote.go     # RequestVote RPC 逻辑
├── test_test.go
└── util.go          # 自定义的调试函数等

为尊重课程的 Collaboration Policy,我把 GitHub repo 设为了 Private,由于经验有限,上述代码可能还有 bug,如您发现还望留言告知,感谢您的阅读。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 我们


推荐阅读
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本文详细介绍了在 Ubuntu 系统上搭建 Hadoop 集群时遇到的 SSH 密钥认证问题及其解决方案。通过本文,读者可以了解如何在多台虚拟机之间实现无密码 SSH 登录,从而顺利启动 Hadoop 集群。 ... [详细]
  • Tornado框架中模块与静态文件的应用
    本文详细介绍了Tornado框架中模块和静态文件的使用方法。首先明确模块与模板的区别,然后通过具体的代码示例展示如何在HTML文档中使用模块,并配置模块的路由。最后,提供了模块类中参数获取的示例。 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • 解决 Windows Server 2016 网络连接问题
    本文详细介绍了如何解决 Windows Server 2016 在使用无线网络 (WLAN) 和有线网络 (以太网) 时遇到的连接问题。包括添加必要的功能和安装正确的驱动程序。 ... [详细]
  • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • 本文探讨了如何通过编程手段在Linux系统中禁用硬件预取功能。基于Intel® Core™微架构的应用性能优化需求,文章详细介绍了相关配置方法和代码实现,旨在帮助开发人员有效控制硬件预取行为,提升应用程序的运行效率。 ... [详细]
  • Python全局解释器锁(GIL)机制详解
    在Python中,线程是操作系统级别的原生线程。为了确保多线程环境下的内存安全,Python虚拟机引入了全局解释器锁(Global Interpreter Lock,简称GIL)。GIL是一种互斥锁,用于保护对解释器状态的访问,防止多个线程同时执行字节码。尽管GIL有助于简化内存管理,但它也限制了多核处理器上多线程程序的并行性能。本文将深入探讨GIL的工作原理及其对Python多线程编程的影响。 ... [详细]
  • 用阿里云的免费 SSL 证书让网站从 HTTP 换成 HTTPS
    HTTP协议是不加密传输数据的,也就是用户跟你的网站之间传递数据有可能在途中被截获,破解传递的真实内容,所以使用不加密的HTTP的网站是不 ... [详细]
  • 本文详细解析了 Android 系统启动过程中的核心文件 `init.c`,探讨了其在系统初始化阶段的关键作用。通过对 `init.c` 的源代码进行深入分析,揭示了其如何管理进程、解析配置文件以及执行系统启动脚本。此外,文章还介绍了 `init` 进程的生命周期及其与内核的交互方式,为开发者提供了深入了解 Android 启动机制的宝贵资料。 ... [详细]
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
  • 在 Axublog 1.1.0 版本的 `c_login.php` 文件中发现了一个严重的 SQL 注入漏洞。该漏洞允许攻击者通过操纵登录请求中的参数,注入恶意 SQL 代码,从而可能获取敏感信息或对数据库进行未授权操作。建议用户尽快更新到最新版本并采取相应的安全措施以防止潜在的风险。 ... [详细]
  • 在 Linux 环境下,多线程编程是实现高效并发处理的重要技术。本文通过具体的实战案例,详细分析了多线程编程的关键技术和常见问题。文章首先介绍了多线程的基本概念和创建方法,然后通过实例代码展示了如何使用 pthreads 库进行线程同步和通信。此外,还探讨了多线程程序中的性能优化技巧和调试方法,为开发者提供了宝贵的实践经验。 ... [详细]
author-avatar
Hykun
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有