作者:李新绿寧惠_330 | 来源:互联网 | 2023-09-08 03:59
环境指导书,认真看https:pdos.csail.mit.edu6.824labslab-mr.html要求golang1.15那就用那个版本我开始尝试使用1.1
环境
指导书,认真看 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
要求golang1.15 那就用那个版本
我开始尝试使用1.17发现gobuild不通过
在windows的goland上编代码, 在远程linux服务器(或者虚拟机)上执行。没有mac的可以尝试远程目录挂载。
windows挂载远程目录
实验是做完了,这里写一写过程,整理一下思路
过程
-
首先认真读了mapreduce的论文,因为以前本科也上过课,研一也上过,所以对于mr的过程还是算熟悉。
-
认真读指导书
-
按指导书跑一遍 sequencial-mr的例子
-
根据指导书和论文,需要做的事情是实现coordinator和worker的通信,(blacklive matters!)这里没用master了。coordinator里面用一些数据结构mutex或者channel控制不同worker的互斥访问,控制map任务和reduce任务的状态,map任务产生的中间文件的位置等。根据论文里面说的3种状态,Idle,inprogress,complete.这里我用了比较简单的数据结构。因为这个实验的输入文件都不是很大,不像gfs实验中需要split为16-64MB,所以直接一个map任务对应一个输入文件,因此将文件名作为了任务名,map的key。reduce任务名则根据mrcoordinator.go指定的nreduce数量决定,用数字代替。TaskStatus是枚举,三种任务状态。结构比较简单,刚开始想着要是数据结构不够用后面再加,但是好像做下来都没改过。
修改这三个文件
type Coordinator struct {mapTasks map[string]TaskStatusreduceTasks map[string]TaskStatusreduceFiles map[string][]string mapCompleted bool MapCompleteNum int nReduce int done boolmutex sync.Mutex
}
master初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {c :&#61; Coordinator{map[string]TaskStatus{},map[string]TaskStatus{},map[string][]string{},false,0,nReduce,false,sync.Mutex{},}for _, fileName :&#61; range files {c.mapTasks[fileName] &#61; Idle}for i :&#61; 0; i < nReduce; i&#43;&#43; {c.reduceTasks[strconv.Itoa(i)] &#61; Idlec.reduceFiles[strconv.Itoa(i)] &#61; []string{}}c.server()return &c
}
worker申请任务
他给的demo里面有example rpc调用的例子&#xff0c;在rpc里定义自己的请求体和响应体的结构&#xff0c;这也是rpc通信的风格。grpc开发app就是先定义protobuf消息格式。
我做的过程中定义的函数没有返回error&#xff0c;好像不行&#xff0c;需要保持和example一样。
coordinator&#xff08;master&#xff09;分配任务的同时&#xff0c;给对应的任务加一个过期时间&#xff0c;实验要求的是10s。
超时处理的时候&#xff0c;刚开始没看指导书的hints&#xff0c;后来发现里面建议使用ioutils.TempFile&#xff0c;因为测试过程中会测试当某个worker crash掉的时候&#xff0c;程序会怎么办&#xff0c;A worker超时了&#xff0c;分给另一个worker B&#xff0c;A写了一半的文件就失效了&#xff0c;因此应该用临时文件&#xff0c;最后返回的时候写入成功了才改成本应的文件名。
这里map的中间产物我命名很随意&#xff0c;直接文件名*n&#xff0c;n代表作为哪一个reduce的输入&#xff0c;n是由key 哈希后 取模得到的。
func (c *Coordinator) GetTask(request *ExampleArgs, reply *GetTaskReply) error {c.mutex.Lock()defer c.mutex.Unlock()if !c.mapCompleted { for fileName, status :&#61; range c.mapTasks {if status &#61;&#61; Idle {c.mapTasks[fileName] &#61; InProgressreply.MapName &#61; fileNamereply.NReduce &#61; c.nReducereply.TaskType &#61; MapTypego c.HandleMapTimeout(fileName)return nil}}reply.TaskType &#61; Sleep} else {for reduceName, status :&#61; range c.reduceTasks {if status &#61;&#61; Idle{c.reduceTasks[reduceName] &#61; InProgressreply.ReduceFiles &#61; c.reduceFiles[reduceName]reply.TaskType &#61; ReduceTypereply.ReduceName &#61; reduceNamego c.HandleReduceTimeout(reduceName)return nil}}reply.TaskType &#61; Sleep}return nil}
worker上报任务完成
map和reduce完成后同样rpc调用&#xff0c;报告master完成了任务&#xff0c;每次上报master都判断map阶段和reduce阶段是否完成&#xff0c;主程序中有一个判断整个程序是否完成的逻辑&#xff0c;done&#xff08;&#xff09;
func (c *Coordinator) ReduceReport(req *ReduceCompleteReq,reply *ExampleReply) error{c.mutex.Lock()defer c.mutex.Unlock()c.reduceTasks[req.ReduceName] &#61; Completedfmt.Fprintf(os.Stderr, "####reduce完成&#xff0c; "&#43;req.ReduceName&#43;"############### \n")done :&#61; truefor _,status :&#61; range c.reduceTasks{if status !&#61; Completed{done &#61; falsebreak}}c.done &#61; donereturn nil
}
差不多就这些&#xff0c;实现进程同步还可以用channel
遇到的两个坑就是&#xff0c;一个rpc调用函数需要返回error&#xff0c;另一个就是crash测试&#xff0c;如果不做临时文件处理&#xff0c;就可能通不过。