Preface

MIT 6.824 has long been regarded as one of the best courses on distributed systems, and many experienced software developers on Zhihu recommend it. Its strength is the way it combines the professor's lectures with papers that students read before class, in-class discussion of those papers, and labs after class. In the labs, students largely implement a programming model themselves from zero to one, rather than just using an existing vendor framework.

I followed the Spring 2020 offering of the course. The course address is: pdos.csail.mit.edu/6.824/sched…

  • Note: Go is actually easy to learn. I spent 4 days learning it during my internship and joined a project the following week. If you have not learned Go yet, the short Tour of Go course is a good way to get familiar with it.

How MapReduce works

If you haven’t read the MapReduce paper, I recommend reading it first so that you can grasp the idea quickly. I’ll summarize it here.

First of all, MapReduce is a distributed programming model for processing and generating large data sets. Its core idea is to give developers two functions, Map and Reduce: Map generates intermediate (k, v) results, and Reduce summarizes and merges the values that share the same key. Because of this split, the Map and Reduce functions can run on different machines, which improves the efficiency of the computation.
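To make the model concrete, here is a minimal single-process sketch of word count written in the Map/Reduce style. The KeyValue type and the mapf/reducef signatures mirror the ones used later in this post; runSequential is a hypothetical helper made up for this illustration and is not part of the lab code.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is one intermediate (key, value) pair emitted by Map.
type KeyValue struct {
	Key   string
	Value string
}

// mapf splits the contents into words and emits (word, "1") for each one.
func mapf(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kvs := []KeyValue{}
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// reducef receives every value emitted for one key and returns the count.
func reducef(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// runSequential (hypothetical) applies mapf to every input, sorts the
// intermediate pairs by key, and calls reducef once per distinct key.
func runSequential(inputs map[string]string) map[string]string {
	intermediate := []KeyValue{}
	for name, contents := range inputs {
		intermediate = append(intermediate, mapf(name, contents)...)
	}
	sort.Slice(intermediate, func(i, j int) bool {
		return intermediate[i].Key < intermediate[j].Key
	})

	result := map[string]string{}
	for i := 0; i < len(intermediate); {
		// Find the run [i, j) of pairs that share the same key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		result[intermediate[i].Key] = reducef(intermediate[i].Key, values)
		i = j
	}
	return result
}

func main() {
	out := runSequential(map[string]string{
		"a.txt": "the quick fox",
		"b.txt": "the lazy dog the end",
	})
	fmt.Println(out["the"], out["fox"]) // prints "3 1"
}
```

In the distributed version, the calls to mapf and reducef are what get spread across workers; the grouping by key stays the same.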

In the paper, the most important part is the following picture:

There are a few important parts to this picture:

  • The Master process

    It assigns tasks to the worker processes and decides whether each worker executes a Map or a Reduce function.

  • Worker processes

    If the worker process executes the Map function, it reads its split of the input file, processes the contents, and writes the results to intermediate files as (k, v) pairs. If it is told to execute the Reduce function, the worker reads the intermediate files (in the paper this is done through RPC), sorts them by key, and then merges the values for each key in sorted order.

In addition, pay attention to the number of intermediate partitions R, which is specified by the user program, and to how the reduce task for each key is chosen: the formula hash(key) mod R.
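The partitioning rule can be sketched in a few lines. The code later in this post calls an ihash function without showing it; the version below assumes it is an FNV-1a hash masked to a non-negative int, and partition is a hypothetical wrapper added only for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash maps a key to a non-negative int (assumed implementation: FNV-1a).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition (hypothetical) returns the reduce task index in [0, nReduce)
// that is responsible for key.
func partition(key string, nReduce int) int {
	return ihash(key) % nReduce
}

func main() {
	const nReduce = 3
	for _, key := range []string{"apple", "banana", "apple"} {
		// The same key always lands on the same reduce task.
		fmt.Printf("%s -> reduce task %d\n", key, partition(key, nReduce))
	}
}
```

Because the hash depends only on the key, every map task sends a given key to the same reduce task, which is what lets Reduce see all values for that key.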


The experiment

src/main/mrsequential.go is a non-distributed version of the code that we need to implement. Specifically, we need to complete master.go, worker.go and rpc.go under src/mr.

First of all, I will start with the worker. Each worker communicates with the master through RPC to apply for a task (the master decides whether it is a Map or a Reduce task).

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	// uncomment to send the Example RPC to the master.
	// CallExample()

}

This is the Worker function provided in the lab, and our worker logic goes inside it. The idea is:

  • Request a task from the master
  • Handle the returned task according to its type
    • If the task type is map: call the Map function
    • If the task type is reduce: call the Reduce function
    • There are also two special cases, task wait and task terminate: sleep for a while, and return directly, respectively
    • Any other type panics by default

So this is what the code looks like:

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	// uncomment to send the Example RPC to the master.
	// CallExample()
	for {
		// Request a task (RPC call)
		taskInfo := CallTask()
		switch taskInfo.State {
		case 0: // map
			Map(mapf, taskInfo)
		case 1: // reduce
			Reduce(reducef, taskInfo)
		case 2: // wait, then ask again
			time.Sleep(10 * time.Second)
		case 3: // all tasks are done
			fmt.Println("all tasks have completed, nothing to do...")
			return
		default:
			panic("Invalid State of Worker, Please try again....")
		}
	}
}

Notice that we use CallTask() to request a task from the master. Inside it we use the call() function that the lab provides, and we return a TaskInfo object; the worker then branches on the State field of that object. So we need to implement the CallTask() method in worker.go:

func CallTask() *TaskInfo {
	args := ExampleArgs{}
	reply := TaskInfo{}
	// Call Master's RequireTask method via RPC
	call("Master.RequireTask", &args, &reply)
	return &reply
}

func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	// the masterSock() method is in rpc.go and returns the name of the UNIX-domain socket
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
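To see the RPC round trip in isolation, here is a minimal sketch that uses Go's net/rpc over an in-memory pipe instead of the lab's UNIX-domain socket. The Master, TaskArgs and TaskReply types and the requestTask helper are simplified stand-ins invented for this example, and the file name is made up.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// TaskArgs and TaskReply are simplified stand-ins for the lab's RPC types.
// gob needs at least one exported field, so TaskArgs carries a WorkerID.
type TaskArgs struct {
	WorkerID int
}

type TaskReply struct {
	State    int
	FileName string
}

// Master is a toy RPC receiver for this example only.
type Master struct{}

// RequireTask has the shape net/rpc requires: an exported method with two
// arguments (the second a pointer) and an error return value.
func (m *Master) RequireTask(args *TaskArgs, reply *TaskReply) error {
	reply.State = 0
	reply.FileName = "pg-example.txt" // made-up file name
	return nil
}

// requestTask (hypothetical) serves the Master on one end of an in-memory
// pipe and performs a single call from the other end.
func requestTask() (TaskReply, error) {
	server := rpc.NewServer()
	if err := server.Register(&Master{}); err != nil {
		return TaskReply{}, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply TaskReply
	err := client.Call("Master.RequireTask", &TaskArgs{WorkerID: 1}, &reply)
	return reply, err
}

func main() {
	reply, err := requestTask()
	if err != nil {
		fmt.Println("call failed:", err)
		return
	}
	fmt.Println(reply.State, reply.FileName) // prints "0 pg-example.txt"
}
```

The method name string passed to Call follows the "Type.Method" convention, which is why the worker above uses "Master.RequireTask".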

In addition to these two functions, we also need to define TaskInfo, the struct that carries the task information, in rpc.go:

type TaskInfo struct {
    // State values:
    //   0 --> map
    //   1 --> reduce
    //   2 --> wait
    //   3 --> nothing to do
    State        int
    // The file name to read
    FileName     string
    // Index of the input file, used in the intermediate file names --> for map
    FileIdx      int
    // Which output file to write to --> for reduce
    OutFileIdx   int
    // Number of reduce tasks
    ReduceNum    int
    // Number of input files, i.e. the number of map tasks
    FileNum      int
}

After completing the above steps, we return to the Worker function. Looking at the switch, two functions are still unimplemented: Map and Reduce. For Map, besides the mapf function, the task information must also be passed in, because it contains the name and index of the file to read.

func Map(mapf func(string, string) []KeyValue, taskInfo *TaskInfo) {
    // Read the file named in the task and keep the kv output of mapf in an intermediate slice
    interFile := []KeyValue{}
    file, err := os.Open(taskInfo.FileName)
    if err != nil {
        panic(err)
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        panic(err)
    }
    file.Close()
    kvs := mapf(taskInfo.FileName, string(content))
    interFile = append(interFile, kvs...)

    // Prepare the output files. Final names are mr-tmp/mr-<FileIdx>-<reduce index>,
    // the same pattern that Reduce reads back. The mr-tmp folder must already exist.
    outPrefix := "mr-tmp/mr-"
    outPrefix += strconv.Itoa(taskInfo.FileIdx)
    outPrefix += "-"
    outFiles := make([]*os.File, taskInfo.ReduceNum)
    outFilesEncode := make([]*json.Encoder, taskInfo.ReduceNum)
    for idx := 0; idx < taskInfo.ReduceNum; idx++ {
        // Create empty temporary files in the mr-tmp folder and attach a JSON encoder
        outFiles[idx], err = ioutil.TempFile("mr-tmp", "mr-tmp-*")
        if err != nil {
            panic(err)
        }
        outFilesEncode[idx] = json.NewEncoder(outFiles[idx])
    }

    // Route each pair to a file according to its key
    for _, kv := range interFile {
        outputIdx := ihash(kv.Key) % taskInfo.ReduceNum
        if err := outFilesEncode[outputIdx].Encode(&kv); err != nil {
            panic(err)
        }
    }

    // Save the files: close each temporary file and rename it to its final name
    for idx, file := range outFiles {
        outName := outPrefix + strconv.Itoa(idx)
        oldPath := filepath.Join(file.Name())
        file.Close()
        os.Rename(oldPath, outName)
    }

    // Tell the master that the map is done
    TaskDone(taskInfo)
}
func Reduce(reducef func(string, []string) string, taskInfo *TaskInfo) {
    outName := "mr-out-" + strconv.Itoa(taskInfo.OutFileIdx)
    interPrefix := "mr-tmp/mr-"
    interSuffix := "-" + strconv.Itoa(taskInfo.OutFileIdx)

    // Read this reduce task's partition from the output of every map task
    interFile := []KeyValue{}
    for idx := 0; idx < taskInfo.FileNum; idx++ {
        inname := interPrefix + strconv.Itoa(idx) + interSuffix
        file, err := os.Open(inname)
        if err != nil {
            fmt.Printf("Open intermediate file %v failed: %v\n", inname, err)
            panic("Open file error")
        }
        dec := json.NewDecoder(file)
        for {
            var kv KeyValue
            if err := dec.Decode(&kv); err != nil {
                break
            }

            interFile = append(interFile, kv)
        }
        file.Close()
    }

    // ByKey is the sort helper from mrsequential.go; copy it into worker.go
    sort.Sort(ByKey(interFile))

    ofile, err := ioutil.TempFile("mr-tmp", "mr-*")
    if err != nil {
        fmt.Printf("Create output file %v failed: %v\n", outName, err)
        panic("Create file error")
    }

    // Call reducef once per distinct key, on the run of equal keys in the sorted slice
    i := 0
    for i < len(interFile) {
        j := i + 1
        for j < len(interFile) && interFile[j].Key == interFile[i].Key {
            j++
        }
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, interFile[k].Value)
        }
        output := reducef(interFile[i].Key, values)

        fmt.Fprintf(ofile, "%v %v\n", interFile[i].Key, output)

        i = j
    }
    ofile.Close()
    os.Rename(filepath.Join(ofile.Name()), outName)
    // acknowledge master
    TaskDone(taskInfo)
}
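Both Map and Reduce above write to a temporary file first and rename it afterwards, so that a reader never sees a half-written file. The pattern, together with the JSON encoding of the intermediate pairs, can be sketched on its own. This sketch assumes Go 1.16 or later (for os.CreateTemp and os.MkdirTemp); writeAtomically, readAll and roundTrip are hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

type KeyValue struct{ Key, Value string }

// writeAtomically (hypothetical) encodes kvs as a stream of JSON objects
// into a temp file in dir, then renames it to name.
func writeAtomically(dir, name string, kvs []KeyValue) error {
	tmp, err := os.CreateTemp(dir, "mr-tmp-*")
	if err != nil {
		return err
	}
	enc := json.NewEncoder(tmp)
	for _, kv := range kvs {
		if err := enc.Encode(&kv); err != nil {
			tmp.Close()
			os.Remove(tmp.Name())
			return err
		}
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// readAll (hypothetical) decodes the JSON stream back into a slice and
// stops cleanly at end of file.
func readAll(path string) ([]KeyValue, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	dec := json.NewDecoder(f)
	kvs := []KeyValue{}
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err == io.EOF {
			return kvs, nil
		} else if err != nil {
			return nil, err
		}
		kvs = append(kvs, kv)
	}
}

// roundTrip writes kvs into a scratch directory and reads them back.
func roundTrip(kvs []KeyValue) ([]KeyValue, error) {
	dir, err := os.MkdirTemp("", "mr-example")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	if err := writeAtomically(dir, "mr-0-0", kvs); err != nil {
		return nil, err
	}
	return readAll(filepath.Join(dir, "mr-0-0"))
}

func main() {
	got, err := roundTrip([]KeyValue{{"a", "1"}, {"b", "2"}})
	fmt.Println(len(got), err) // prints "2 <nil>"
}
```

Unlike the Reduce loop above, readAll distinguishes io.EOF from other decode errors, which makes a corrupted intermediate file visible instead of silently truncating the input.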

Now that we’re done with worker.go, we need to consider the master.go structure.

Workers only see a task through the TaskInfo struct defined in rpc.go, but the master has to manage every task, so we need a struct to hold the state of each task:

type TaskStat struct{
    fileName       string
    startTime      time.Time
    fileIndex      int
    outFileIndex   int
    reduceNum      int
    fileNum        int
}

Then we define two structs that embed the task state struct (Go has no inheritance, so embedding plays that role here):

type MapTask struct {
    TaskStat
} 

type ReduceTask struct {
    TaskStat
}

With the concrete types in place, we also need to define an interface through which task information can be obtained:

type TaskInfoInterface interface {
    GenerateTaskInfo()  TaskInfo
    OutOfTime()         bool
    GetFileIndex()      int
    GetOutFileIndex()   int
    SetTime()
}

All task types implement the above methods:

func (this *MapTask) GenerateTaskInfo() TaskInfo {
    return TaskInfo{
        State:      0,
        FileName:   this.fileName,
        FileIdx:    this.fileIndex,
        OutFileIdx: this.outFileIndex,
        ReduceNum:  this.reduceNum,
        FileNum:    this.fileNum,
    }
}

func (this *ReduceTask) GenerateTaskInfo() TaskInfo {
    return TaskInfo{
        State:      1,
        FileName:   this.fileName,
        FileIdx:    this.fileIndex,
        OutFileIdx: this.outFileIndex,
        ReduceNum:  this.reduceNum,
        FileNum:    this.fileNum,
    }
}

// OutOfTime reports whether the task has been running for more than 60 seconds
func (this *TaskStat) OutOfTime() bool {
    return time.Now().Sub(this.startTime) > time.Duration(time.Second*60)
}

func (this *TaskStat) GetFileIndex() int {
    return this.fileIndex
}

func (this *TaskStat) GetOutFileIndex() int {
    return this.outFileIndex
}

func (this *TaskStat) SetTime() {
    this.startTime = time.Now()
}
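The reason the two task types can share one interface is Go's method promotion: methods defined on the embedded struct become methods of the outer struct. The sketch below shows the mechanism with reduced types; the Task interface, the Kind method and describe are hypothetical names used only here.

```go
package main

import (
	"fmt"
	"time"
)

// TaskStat is a reduced version of the task state struct.
type TaskStat struct {
	fileIndex int
	startTime time.Time
}

func (t *TaskStat) GetFileIndex() int { return t.fileIndex }
func (t *TaskStat) SetTime()          { t.startTime = time.Now() }

// MapTask and ReduceTask embed TaskStat, so its methods are promoted to them.
type MapTask struct{ TaskStat }
type ReduceTask struct{ TaskStat }

// Kind (hypothetical) is the only method each concrete type defines itself.
func (t *MapTask) Kind() string    { return "map" }
func (t *ReduceTask) Kind() string { return "reduce" }

// Task (hypothetical) is satisfied by *MapTask and *ReduceTask: Kind comes
// from the type itself, the other two methods come from the embedded TaskStat.
type Task interface {
	Kind() string
	GetFileIndex() int
	SetTime()
}

// describe works on any Task without knowing the concrete type.
func describe(t Task) string {
	t.SetTime()
	return fmt.Sprintf("%s task %d", t.Kind(), t.GetFileIndex())
}

func main() {
	tasks := []Task{
		&MapTask{TaskStat{fileIndex: 0}},
		&ReduceTask{TaskStat{fileIndex: 1}},
	}
	for _, t := range tasks {
		fmt.Println(describe(t)) // prints "map task 0", then "reduce task 1"
	}
}
```

Because the methods use pointer receivers, it is the pointer types (*MapTask, *ReduceTask) that satisfy the interface, which is why the tasks are stored as pointers.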

In addition, let’s define the Master struct and the two methods that workers call through RPC:

type Master struct {
    fileNames []string

    // Buffered channels used as task queues. They hold TaskInfoInterface
    // values so that both *MapTask and *ReduceTask can be queued.
    mapRunningTaskChannel    chan TaskInfoInterface
    mapWaitingTaskChannel    chan TaskInfoInterface
    reduceRunningTaskChannel chan TaskInfoInterface
    reduceWaitingTaskChannel chan TaskInfoInterface

    // RPC handlers run concurrently, so a complete implementation should
    // guard these fields with a lock.
    isDone    bool
    reduceNum int
}

func (this *Master) RequireTask(args *ExampleArgs, reply *TaskInfo) error {
    if this.isDone {
        reply.State = 3
        return nil
    }

    // Hand out a waiting map task if there is one. The default branch
    // keeps the receive from blocking when the queue is empty.
    select {
    case mapTask := <-this.mapWaitingTaskChannel:
        mapTask.SetTime()
        this.mapRunningTaskChannel <- mapTask
        *reply = mapTask.GenerateTaskInfo()
        return nil
    default:
    }

    // Otherwise hand out a waiting reduce task, again without blocking
    select {
    case reduceTask := <-this.reduceWaitingTaskChannel:
        reduceTask.SetTime()
        this.reduceRunningTaskChannel <- reduceTask
        *reply = reduceTask.GenerateTaskInfo()
        return nil
    default:
    }

    // Nothing to hand out, but some tasks are still running: tell the worker to wait
    if len(this.mapRunningTaskChannel) > 0 || len(this.reduceRunningTaskChannel) > 0 {
        reply.State = 2
        return nil
    }

    reply.State = 3
    this.isDone = true
    return nil
}

func (this *Master) TaskDone(args *TaskInfo, reply *ExampleReply) error {
    switch args.State {
    case 0:
        // One map task finished: remove one entry from the running queue
        <-this.mapRunningTaskChannel
        if len(this.mapRunningTaskChannel) == 0 && len(this.mapWaitingTaskChannel) == 0 {
            // All map tasks are done, so the reduce tasks can be queued
            this.distributeReduce()
        }
    case 1:
        <-this.reduceRunningTaskChannel
    default:
        panic("task error")
    }
    return nil
}

func (this *Master) distributeReduce() {
    reduceTask := ReduceTask{
        TaskStat{
            fileIndex:    0,
            outFileIndex: 0,
            reduceNum:    this.reduceNum,
            fileNum:      len(this.fileNames),
        },
    }
    for reduceIndex := 0; reduceIndex < this.reduceNum; reduceIndex++ {
        task := reduceTask
        task.outFileIndex = reduceIndex
        this.reduceWaitingTaskChannel <- &task
    }
}
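The OutOfTime method defined earlier is not called anywhere in the code above. One way it could be used, sketched here under my own assumptions rather than taken from the referenced solution, is a periodic pass that moves timed-out tasks from the running queue back to the waiting queue. The task type and requeueTimedOut are hypothetical, and the sketch assumes a single goroutine touches the channels while it runs.

```go
package main

import (
	"fmt"
	"time"
)

// task is a reduced stand-in for the task state kept by the master.
type task struct {
	id        int
	startTime time.Time
}

func (t *task) outOfTime(limit time.Duration) bool {
	return time.Since(t.startTime) > limit
}

// requeueTimedOut (hypothetical) inspects each task currently in running
// exactly once. Tasks past the limit go back to waiting; the rest return
// to running. Both channels must be buffered with room for every task.
func requeueTimedOut(running, waiting chan *task, limit time.Duration) int {
	moved := 0
	for n := len(running); n > 0; n-- {
		t := <-running
		if t.outOfTime(limit) {
			waiting <- t
			moved++
		} else {
			running <- t
		}
	}
	return moved
}

// demo queues one stale and one fresh task and runs a single pass.
func demo() (moved, stillRunning, requeued int) {
	running := make(chan *task, 4)
	waiting := make(chan *task, 4)
	running <- &task{id: 0, startTime: time.Now().Add(-2 * time.Minute)} // stale
	running <- &task{id: 1, startTime: time.Now()}                        // fresh

	moved = requeueTimedOut(running, waiting, 60*time.Second)
	return moved, len(running), len(waiting)
}

func main() {
	fmt.Println(demo()) // prints "1 1 1"
}
```

In a real master this pass would have to be coordinated with the RPC handlers, for example with a mutex, since they read and write the same queues.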

That covers most of the lab. The main idea follows zhuanlan.zhihu.com/p/260752052…


Reference links:

zhuanlan.zhihu.com/p/260752052

zhuanlan.zhihu.com/p/54243727