Preface
As a course on distributed systems, MIT 6.824 has long been regarded as one of the best of its kind at home and abroad, and many experienced software developers on Zhihu recommend it. Its strength is how the lectures are combined with reading the papers before class, discussing the ideas in class, and doing the labs afterwards: students largely get to implement a programming model independently, from zero to one, instead of just using the various vendor frameworks.
The version I follow is the 2020 spring semester; the course page is pdos.csail.mit.edu/6.824/sched…
- Note: Go is actually very easy to learn. I picked it up in four days during my internship and joined a project the following week. If you have not learned Go yet, the short Tour of Go course is enough to get familiar with it.
How MapReduce works
If you haven’t read the MapReduce paper, it’s a good idea to read it first so you can quickly grasp the idea; I’ll summarize it here.
MapReduce is a distributed programming model for processing and generating large data sets. The key idea is that the developer only provides two functions, Map and Reduce: Map produces intermediate (key, value) pairs, and Reduce merges all intermediate values that share the same key. Because of this split, the Map and Reduce functions can run on different machines, which improves the efficiency of the computation.
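As an illustration of my own (not taken from the paper), the classic word-count job can be expressed with just these two functions, using the same mapf/reducef signatures the lab expects:
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue has the same shape as the intermediate-pair type the lab skeleton uses.
type KeyValue struct {
	Key   string
	Value string
}

// Map emits ("word", "1") for every word in one input split.
func Map(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives all values that share one key and counts them.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kva := Map("input.txt", "the quick fox and the lazy dog and the cat")
	fmt.Println(len(kva), "intermediate pairs")           // 10 intermediate pairs
	fmt.Println(Reduce("the", []string{"1", "1", "1"}))   // 3
}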
In the paper, the most important part is the following picture:
There are a few important parts to this picture:
- The master process: the role that assigns tasks to the worker processes and decides whether a worker executes a map or a reduce function.
- The worker processes: a worker assigned a map task reads the input file split it was given, processes the file line by line, and converts it into intermediate files of (k, v) pairs. A worker assigned a reduce task reads the intermediate files (this step goes through RPC), sorts them by key, and then merges the values according to the sorted order.
In addition, pay attention to the number of intermediate files, which is specified by the user program, and to how an intermediate key is assigned to a reduce task using the hash(key) mod R formula, as sketched below.
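As a small sketch of that partitioning rule: the course skeleton ships an ihash helper essentially like the one below (built on FNV hashing); taking hash(key) mod R picks which of the R reduce tasks a key belongs to.
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the skeleton's helper: hash the key and mask off the sign bit
// so the modulo below is always non-negative.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	R := 10 // number of reduce tasks, chosen by the user program
	for _, key := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%q -> reduce task %d\n", key, ihash(key)%R)
	}
}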
The experiment
src/main/mrsequential.go is the sequential (non-distributed) version of what we need to implement; specifically, we need to complete master.go, worker.go and rpc.go under src/mr.
First I will start with the worker. Each worker needs to communicate with the master through RPC to request a task (whether it is a map or a reduce task is decided by the master).
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
// Your worker implementation here.
// uncomment to send the Example RPC to the master.
// CallExample()
}
This is the Worker function given in the assignment; our implementation goes inside it. The idea is:
- Request a task from the master
- Do different processing depending on the type of task returned:
  - If the task type is map: call the Map function
  - If the task type is reduce: call the Reduce function
  - There are also two special cases, task wait and task terminate: delay for a period of time, or return directly
  - Throw an exception (panic) by default
So this is what the code looks like
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.
	for {
		// Request a task from the master (RPC call)
		taskInfo := CallTask()
		switch taskInfo.State {
		case 0:
			Map(mapf, taskInfo)
		case 1:
			Reduce(reducef, taskInfo)
		case 2:
			// No task available right now: wait for a while and ask again
			time.Sleep(10 * time.Second)
		case 3:
			fmt.Println("all of the tasks have completed, nothing to do...")
			return
		default:
			panic("Invalid State of Worker, please try again...")
		}
	}
}
Notice that we use CallTask() to request a task from the master. Inside it we need to use the call() function provided by the skeleton, and it returns a *TaskInfo whose State field drives the switch above. So we need to implement the CallTask() method in worker.go:
func CallTask() *TaskInfo {
args := ExampleArgs{}
reply := TaskInfo{}
// Call Master's RequireTask method via RPC
call("Master.RequireTask", &args, &reply)
return &reply
}
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	// the masterSock() method is in rpc.go and returns the name of the UNIX-domain socket
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
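For reference, the masterSock() helper already ships with the skeleton's rpc.go; reproduced roughly from memory, it builds a per-user UNIX-domain socket name so different users on the same machine don't collide:
// Provided by the course skeleton in rpc.go (shown here only for context):
// a UNIX-domain socket name that is unique per user.
func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}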
In addition to these two functions, we also need to define the TaskInfo struct in rpc.go:
type TaskInfo struct {
	// State value: 0 --> map, 1 --> reduce, 2 --> wait, 3 --> nothing to do
	State int
	// The name of the input file to read
	FileName string
	// Index of the map task --> decides the names of its intermediate files
	FileIdx int
	// Index of the reduce task --> which output file to write to
	OutFileIdx int
	// How many reduce tasks the job is divided into
	ReduceNum int
	// How many input files (map tasks) there are
	FileNum int
}
After completing the above steps, we return to the Worker function. Inside the switch there are still two unimplemented functions, Map and Reduce. For the former, besides the mapf function, we need to pass in the task information, because it contains the name and index of the file to read:
func Map(mapf func(string, string) []KeyValue, taskInfo *TaskInfo) {
	// Read the input file and collect the kv pairs produced by mapf
	interFile := []KeyValue{}
	file, err := os.Open(taskInfo.FileName)
	if err != nil {
		panic(err)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		panic(err)
	}
	file.Close()
	kvs := mapf(taskInfo.FileName, string(content))
	interFile = append(interFile, kvs...)

	// Prepare the output files
	outPrefix := "mr-tmp/mr-"
	outPrefix += strconv.Itoa(taskInfo.FileIdx)
	outFiles := make([]*os.File, taskInfo.ReduceNum)
	outFilesEncode := make([]*json.Encoder, taskInfo.ReduceNum)
	for idx := 0; idx < taskInfo.ReduceNum; idx++ {
		// Create empty temporary files in the mr-tmp folder and attach a JSON encoder
		outFiles[idx], _ = ioutil.TempFile("mr-tmp", "mr-tmp-*")
		outFilesEncode[idx] = json.NewEncoder(outFiles[idx])
	}

	// Route each kv pair to a different file according to its key
	for _, kv := range interFile {
		outputIdx := ihash(kv.Key) % taskInfo.ReduceNum
		outFilesEncode[outputIdx].Encode(&kv)
	}

	// Rename the temporary files to their final names mr-tmp/mr-X-Y
	for idx, file := range outFiles {
		outName := outPrefix + "-" + strconv.Itoa(idx)
		oldPath := filepath.Join(file.Name())
		os.Rename(oldPath, outName)
		file.Close()
	}

	// Tell the master that the map task is done
	TaskDone(taskInfo)
}
func Reduce(reducef func(string, []string) string, taskInfo *TaskInfo) {
	outName := "mr-out-" + strconv.Itoa(taskInfo.OutFileIdx)
	interPrefix := "mr-tmp/mr-"
	interSuffix := "-" + strconv.Itoa(taskInfo.OutFileIdx)
	// Read every intermediate file that belongs to this reduce partition
	interFile := []KeyValue{}
	for idx := 0; idx < taskInfo.FileNum; idx++ {
		inName := interPrefix + strconv.Itoa(idx) + interSuffix
		file, err := os.Open(inName)
		if err != nil {
			fmt.Printf("Open intermediate file %v failed: %v\n", inName, err)
			panic("Open file error")
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			interFile = append(interFile, kv)
		}
		file.Close()
	}
	// Sort by key, then merge the values of each key and call reducef
	sort.Sort(ByKey(interFile))
	ofile, err := ioutil.TempFile("mr-tmp", "mr-*")
	if err != nil {
		fmt.Printf("Create output file %v failed: %v\n", outName, err)
		panic("Create file error")
	}
	i := 0
	for i < len(interFile) {
		j := i + 1
		for j < len(interFile) && interFile[j].Key == interFile[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, interFile[k].Value)
		}
		output := reducef(interFile[i].Key, values)
		fmt.Fprintf(ofile, "%v %v\n", interFile[i].Key, output)
		i = j
	}
	os.Rename(filepath.Join(ofile.Name()), outName)
	ofile.Close()
	// Tell the master that the reduce task is done
	TaskDone(taskInfo)
}
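Two helpers used above are not shown: the ByKey sort type (the same one used in mrsequential.go, which worker.go needs its own copy of) and the worker-side TaskDone call. A minimal sketch, assuming the Master.TaskDone RPC handler that we define below in master.go:
// ByKey implements sort.Interface to sort KeyValue slices by key
// (borrowed from mrsequential.go).
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// TaskDone tells the master over RPC that this task has finished.
// This is my own sketch; it assumes the Master.TaskDone handler shown later.
func TaskDone(taskInfo *TaskInfo) {
	reply := ExampleReply{}
	call("Master.TaskDone", taskInfo, &reply)
}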
Now that worker.go is done, we need to consider the structure of master.go. The task information we hand out is the TaskInfo struct from rpc.go, but inside the master we also have to manage each task, so we need a struct that holds the state of a task:
type TaskStat struct {
fileName string
startTime time.Time
fileIndex int
outFileIndex int
reduceNum int
fileNum int
}
Then we define two structs that embed TaskStat (Go's substitute for inheritance):
type MapTask struct {
TaskStat
}
type ReduceTask struct {
TaskStat
}
With the two concrete task types in place, we also define an interface that exposes the task-information operations:
type TaskInfoInterface interface {
	GenerateTaskInfo() TaskInfo
OutOfTime() bool
GetFileIndex() int
GetOutFileIndex() int
SetTime()
}
Both task structs implement the methods above (most of them via the embedded TaskStat):
func (this *MapTask) GenerateTaskInfo() TaskInfo {
	return TaskInfo{
		State:      0,
		FileName:   this.fileName,
		FileIdx:    this.fileIndex,
		OutFileIdx: this.outFileIndex,
		ReduceNum:  this.reduceNum,
		FileNum:    this.fileNum,
	}
}

func (this *ReduceTask) GenerateTaskInfo() TaskInfo {
	return TaskInfo{
		State:      1,
		FileName:   this.fileName,
		FileIdx:    this.fileIndex,
		OutFileIdx: this.outFileIndex,
		ReduceNum:  this.reduceNum,
		FileNum:    this.fileNum,
	}
}

func (this *TaskStat) OutOfTime() bool {
	// A task is considered timed out after running for more than 60 seconds
	return time.Now().Sub(this.startTime) > 60*time.Second
}

func (this *TaskStat) GetFileIndex() int {
	return this.fileIndex
}

func (this *TaskStat) GetOutFileIndex() int {
	return this.outFileIndex
}

func (this *TaskStat) SetTime() {
	this.startTime = time.Now()
}
In addition, let’s define the Master struct and the two RPC methods that the worker calls:
type Master struct {
	fileNames                []string
	mapRunningTaskChannel    chan MapTask
	mapWaitingTaskChannel    chan MapTask
	reduceRunningTaskChannel chan ReduceTask
	reduceWaitingTaskChannel chan ReduceTask
	isDone                   bool
	reduceNum                int
}

func (this *Master) RequireTask(args *ExampleArgs, reply *TaskInfo) error {
	if this.isDone {
		reply.State = 3
		return nil
	}
	// Try to hand out a waiting map task first (non-blocking receive)
	select {
	case mapTask := <-this.mapWaitingTaskChannel:
		mapTask.SetTime()
		this.mapRunningTaskChannel <- mapTask
		*reply = mapTask.GenerateTaskInfo()
		return nil
	default:
	}
	// Then try a waiting reduce task
	select {
	case reduceTask := <-this.reduceWaitingTaskChannel:
		reduceTask.SetTime()
		this.reduceRunningTaskChannel <- reduceTask
		*reply = reduceTask.GenerateTaskInfo()
		return nil
	default:
	}
	// Tasks are still running: tell the worker to wait
	if len(this.mapRunningTaskChannel) > 0 || len(this.reduceRunningTaskChannel) > 0 {
		reply.State = 2
		return nil
	}
	// Nothing waiting and nothing running: the whole job is done
	reply.State = 3
	this.isDone = true
	return nil
}

func (this *Master) TaskDone(args *TaskInfo, reply *ExampleReply) error {
	switch args.State {
	case 0:
		// Remove one finished map task from the running channel
		<-this.mapRunningTaskChannel
		if len(this.mapRunningTaskChannel) == 0 && len(this.mapWaitingTaskChannel) == 0 {
			// All map tasks have finished: start distributing reduce tasks
			this.distributeReduce()
		}
	case 1:
		<-this.reduceRunningTaskChannel
	default:
		panic("task error")
	}
	return nil
}

func (this *Master) distributeReduce() {
	reduceTask := ReduceTask{
		TaskStat{
			fileIndex:    0,
			outFileIndex: 0,
			reduceNum:    this.reduceNum,
			fileNum:      len(this.fileNames),
		},
	}
	for reduceIndex := 0; reduceIndex < this.reduceNum; reduceIndex++ {
		task := reduceTask
		task.outFileIndex = reduceIndex
		this.reduceWaitingTaskChannel <- task
	}
}
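The code above never shows how the Master and its channels get created, or the Done() method that main/mrmaster.go polls. A minimal sketch of my own, assuming the skeleton's MakeMaster(files, nReduce) entry point and its server() helper; the channel capacities and the map-task loop are my own choices, not from the original post:
// MakeMaster builds the Master, enqueues one map task per input file,
// and starts the RPC server. This is a sketch under the assumptions above.
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{
		fileNames:                files,
		mapRunningTaskChannel:    make(chan MapTask, len(files)),
		mapWaitingTaskChannel:    make(chan MapTask, len(files)),
		reduceRunningTaskChannel: make(chan ReduceTask, nReduce),
		reduceWaitingTaskChannel: make(chan ReduceTask, nReduce),
		reduceNum:                nReduce,
	}
	// One map task per input file
	for idx, name := range files {
		m.mapWaitingTaskChannel <- MapTask{
			TaskStat{
				fileName:  name,
				fileIndex: idx,
				reduceNum: nReduce,
				fileNum:   len(files),
			},
		}
	}
	m.server() // start the RPC server (provided by the course skeleton)
	return &m
}

// main/mrmaster.go calls Done() periodically to find out whether the job has finished.
func (m *Master) Done() bool {
	return m.isDone
}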
So far I have finished most of the lab. The main ideas are borrowed from zhuanlan.zhihu.com/p/260752052…
Reference links:
zhuanlan.zhihu.com/p/260752052
zhuanlan.zhihu.com/p/54243727