1. MapReduce paper reading

1.1 What is MapReduce

MapReduce paper address: pdos.csail.mit.edu/6.824/paper…

I first came across MapReduce while studying MIT 6.824, the distributed systems course. At first glance, it feels like the idea of divide and conquer.

To take a simple example: as a Manchester United fan, I wanted to learn the club's history, so I collected 30 years of Manchester United news reports to find out which player was talked about the most, by counting how often each player's name appears in the reports. But reading report after report by myself would be overwhelming, so I decided to invite my friends to help.

Say I invite N friends and split the 30 years of news stories into N pieces, one per person. When each person finishes counting their own stories, we have N tables, each listing how many times each player's name appears. Then I invite another M friends, each of whom totals one player's appearances across the N tables: friend A totals Beckham's appearances, friend B totals Lu Xiaopang's (Rooney's nickname), and so on. The result is a table of how often Manchester United stars' names appeared over the past 30 years.

From the example above, you can see that to do MapReduce you need enough friends (just kidding). Looking at it more formally, MapReduce is a programming model.

In a real scenario, we could write a program to scan the news reports, or scan them with multiple threads, but a single machine's CPU limits how fast this can go. So we deploy the scanning program on N machines, and then we naturally need another program on M machines to combine the output from those N machines. Since all of this is such a hassle, let MapReduce do it for us!

MapReduce abstracts away the things we don't want to think about when writing the program: how to split the input data, how to schedule work across a cluster, how to handle machine and hardware failures, and how machines communicate. It hides the details of parallelization, fault tolerance, data distribution, load balancing, and so on. This way, we only need to write the program as if it ran on a single machine; splitting the input and combining the results are no longer our concern.

1.2 Programming Model

As you can see in the example above, we have an input set (news stories), an output set (tabular data on the frequency of name occurrences), and an intermediate result set (tabular data for each of N friends). The process from the input set to the intermediate result set is a MAP process, while the process from the intermediate result set to the output set is a Reduce process.

We treat the input set as a set of key/value pairs, where the key is the name of each news document and the value is its content. The intermediate result set is also a set of key/value pairs: after the map step, the key is a player's name and the value is a count for that name. Because there are N intermediate result sets, the values for the same key are gathered into list(value), a list of values. Feeding that list into reduce produces the final result: the total number of occurrences of each player. To sum up:

Input1 -> Map -> a,1 b,1 c,1
Input2 -> Map -> b,1
Input3 -> Map -> a,1 c,1
              |   |   |
              |   |   -> Reduce -> c,2
              |   -----> Reduce -> b,2
              ---------> Reduce -> a,2

map(k1, v1)          -> list(k2, v2)
reduce(k2, list(v2)) -> list(v2)
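In Go, these two steps correspond to two user-supplied functions. As a reference point for the lab code later in this post, their signatures look roughly like this (KeyValue is a simple key/value struct; the MapFunc/ReduceFunc type names are only for illustration):

// KeyValue is the record type produced by map and consumed by reduce.
type KeyValue struct {
	Key   string
	Value string
}

// map(k1, v1) -> list(k2, v2)
// e.g. (document name, document contents) -> [("Beckham", "1"), ...]
type MapFunc func(filename string, contents string) []KeyValue

// reduce(k2, list(v2)) -> v2
// e.g. ("Beckham", ["1", "1", "1"]) -> "3"
type ReduceFunc func(key string, values []string) string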

1.3 Framework Implementation

The MapReduce paper's execution-overview figure illustrates this process. UserProgram is the program written by the user, and the master can be understood as a scheduler. From left to right are the input set, the Map operation, the intermediate result set, the Reduce operation, and the output set. The numbers (1) to (6) in the figure trace the whole scheduling process of MapReduce.

(0) First, the MapReduce library in the user program splits the input file set into N pieces (the split function and the value of N can be specified by the user program), each typically 16MB to 64MB (again controllable via an optional parameter).

(1)(2) Then many copies of the program are started on the cluster. One copy is the master and the rest are workers. The master is the foreman: it assigns the N map tasks and M reduce tasks to workers.

(3) A worker assigned a map task reads the corresponding input split, parses key/value pairs from it, passes them to the user's map function, and buffers the resulting intermediate key/value pairs in memory.

(4) The intermediate results buffered in memory are periodically written to local disk, partitioned into M regions by the partition function. The locations of these M regions are passed back to the master, which forwards them to the workers assigned reduce tasks.

(5) A worker assigned a reduce task reads the intermediate results via RPC and sorts them so that entries with the same key are grouped together.

(6) After sorting, each unique key and its corresponding value list (list(value)) are passed to the user's reduce function, and the results are appended to the final output files.
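Step (4) relies on a partition function to decide which of the M regions an intermediate key belongs to. The paper's default is hash(key) mod R, where R is the number of reduce tasks; a minimal Go sketch of such a partitioner, along the lines of the ihash helper used later in this post, might look like this:

package main

import (
	"fmt"
	"hash/fnv"
)

// partition chooses which of the nReduce regions a key belongs to,
// following the paper's default rule: hash(key) mod R.
func partition(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()&0x7fffffff) % nReduce
}

func main() {
	// The same key always lands in the same region, so a single
	// reduce task sees every value for that key.
	fmt.Println(partition("Beckham", 10))
	fmt.Println(partition("Beckham", 10)) // same region as above
}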

1.4 Fault-tolerant processing

MapReduce is built for large scale. With hundreds of workers processing large amounts of data, it is inevitable that some workers will go "on strike" (the server crashes or goes down), or that the master itself will fail. These are questions the design of MapReduce has to consider.

1.4.1 worker fault

Since the master assigns tasks to workers, it needs to know how they are doing, so it pings each worker periodically. If no response is received within a certain time, the worker is marked as failed. Tasks previously assigned to that worker are considered unfinished and reset to the initial idle state, so they can be reassigned to other workers.

Suppose there are workers A and B, and worker A crashes partway through; its tasks are taken over by worker B. The intermediate results produced by worker A's map tasks are stored on A's local disk, which can no longer be reached, so worker B must re-execute those map tasks to regenerate the intermediate results, and the workers running reduce are told to stop reading data from worker A and read from worker B instead. Completed reduce tasks from worker A, however, have already written their output to the global file system, so they do not need to be re-executed.

1.4.2 master fault

The master can write periodic checkpoints. If the current master fails, a new master process can be started from the last checkpoint. Of course, since there is only one master, the paper's implementation simply aborts the MapReduce computation when the master fails and informs the client, which can retry the MapReduce operation if it wishes.

1.5 storage

In the implementation above, intermediate results and output sets need to be stored. In a large cluster, network bandwidth is a scarce resource, so local disks are used wherever possible to conserve it. MapReduce is built on top of the Google File System (GFS): files are divided into 64MB blocks, and several copies of each block are stored on different machines for safety. When scheduling a map task, the master tries to run it on a machine that holds a replica of the task's input data, or failing that on a machine near one, which keeps most input reads local and saves network bandwidth.

2. Write MapReduce

MIT 6.824 Lab1 (MapReduce) address: pdos.csail.mit.edu/6.824/labs/…

2.1 Environment Construction

2.1.1 Linux environment

The environment used is Windows Subsystem for Linux (WSL) with Ubuntu 16.04, plus Xshell.

Reference article for configuring and using WSL: zhuanlan.zhihu.com/p/90173113

Reference article for using Xshell with WSL: www.jianshu.com/p/039411d2c…

If Xshell fails to connect to Ubuntu after a restart, restart SSH and try again; that is, start Ubuntu and enter:

sudo service ssh --full-restart

2.1.2 GO Language Environment

Download Golang from golang.org/dl/

1) Open the official website and download the corresponding version. I downloaded 1.14.4 here

2) Create a go folder under the Ubuntu home directory (~) and download the package

mkdir ~/go
cd ~/go
wget https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz

3) Decompress the package to the /usr/local directory

sudo tar -C /usr/local -zxvf go1.14.4.linux-amd64.tar.gz

4) Add the /usr/local/go/bin directory to the PATH variable. You can add it to /etc/profile (applies to every user on the system) or to $HOME/.profile (applies only to the current user); here it is added to ~/.bashrc.

vim ~/.bashrc
# Add the following lines at the end
export GOROOT=/usr/local/go
export PATH=$PATH:$GOROOT/bin
# Save the file, then reload it so the changes take effect
source ~/.bashrc
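After reloading the configuration, you can check that Go is installed and on the PATH:

$ go version
go version go1.14.4 linux/amd64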

2.2 WordCount — The first MapReduce program

According to the basic understanding of MapReduce, a MapReduce program consists of three parts: the Map program, the Reduce program, and the program that schedules MapReduce.

We’ll start writing our first MapReduce program with the code examples provided by the MIT6.824 course.

2.2.1 Example code — serial implementation

1) We can clone the sample code first

$ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
$ cd 6.824

2) Look at the directory structure

src
├── mrapps
│   ├── wc.go
│   ├── indexer.go
│   └── ...
├── main
│   ├── mrsequential.go
│   ├── mrmaster.go
│   ├── mrworker.go
│   ├── pg*.txt
│   └── ...
└── mr
    ├── master.go
    ├── worker.go
    └── rpc.go
  • mrapps: sample map and reduce programs that are already written, such as wc.go, a word-count program, and indexer.go, a text indexer
  • main: mrsequential.go, a program that schedules MapReduce sequentially, plus the input data files pg*.txt
  • mr: master.go, worker.go, and rpc.go — the parallel version of the MapReduce scheduler that we need to implement

3) Try running the word-count program with the serial scheduler implementation

$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0

// go build -buildmode=plugin ../mrapps/wc.go builds the plug-in wc.so for mrsequential.go to call.
// go run mrsequential.go wc.so pg*.txt compiles and runs mrsequential.go, where wc.so is the plug-in and pg*.txt is the input data set.
// more mr-out-0 shows the output data set, where mr-out-0 is the output file.

PS: For a quick look at Go, visit tour.go-zh.org/list — a simple Go guide that takes about two hours to work through.

4) After running the commands above, you are probably curious how this is implemented at the code level. Let's look at the Map and Reduce implementations in the wc.go word-count program.

// wc.go

// The map program
// Map executes once for each input file
// The first argument is the name of the input file, and the second argument is the full content of the input file
// The output is a sliced collection of key/value pairs (equivalent to an array, where each object is a key/value pair)
func Map(filename string, contents string) []mr.KeyValue {
    // Define a function that reports whether r is NOT a letter,
    // used to split the text into words (it returns true for spaces, punctuation, etc.)
    // The input is a rune (equivalent to int32, commonly used for Unicode/UTF-8 characters)
    ff := func(r rune) bool { return !unicode.IsLetter(r) }

    // Split the article content into a word array
    // For example "hello world" will be split into ["hello", "world"]
    // strings.FieldsFunc splits a string according to a custom rule
    words := strings.FieldsFunc(contents, ff)
    
    // Generate a set of key/value pairs, where key is the word in the article and value is 1
    kva := []mr.KeyValue{}
    for _, w := range words {
        kv := mr.KeyValue{w, "1"}
        kva = append(kva, kv)
    }
    return kva
}
// The reduce program
// The set of intermediate results generated by all map tasks is executed once as input arguments to reduce
// Key is a word, values is a set of key occurrences, each value is 1, so the size of values is the number of occurrences of the word key
// Please refer to the following figure:
// Input1 -> Map -> a,1 b,1 c,1
// Input2 -> Map -> b,1
// Input3 -> Map -> a,1 c,1
//               |   |   |
//               |   |   -> Reduce -> c,2
//               |   -----> Reduce -> b,2
//               ---------> Reduce -> a,2
func Reduce(key string, values []string) string {
    // The strconv function converts an int to a string
    return strconv.Itoa(len(values))
}


Next, let's look at the mrsequential.go scheduler.

// mrsequential.go

// ByKey is used to sort intermediate result sets
// The same keys are sorted together, which can be used as reduce input parameters
type ByKey []mr.KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func main() {

    // If the number of arguments is less than 3, print an error and exit
    // e.g. go run mrsequential.go wc.so pg*.txt
    // Argument 0: mrsequential.go
    // Argument 1: wc.so
    // Argument 2: pg*.txt
    if len(os.Args) < 3 {
        fmt.Fprintf(os.Stderr, "Usage: mrsequential ../mrapps/xxx.so inputfiles...\n")
        os.Exit(1)
    }

    // -------------------------- map task start --------------------------
    // Read map and reduce programs from parameter 1 (wc.so)
    mapf, reducef := loadPlugin(os.Args[1])
    
    // Read each file as an entry to the map program and print intermediate results
    intermediate := []mr.KeyValue{}
    for _, filename := range os.Args[2:] {
        // Open the file
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("cannot open %v", filename)
        }
        // Read the contents of the file
        content, err := ioutil.ReadAll(file)
        if err != nil {
            log.Fatalf("cannot read %v", filename)
        }
        file.Close()
        // Enter the map program
        kva := mapf(filename, string(content))
        // Intermediate result
        intermediate = append(intermediate, kva...)
    }
    // -------------------------- map task end --------------------------

    // Sort the intermediate results
    sort.Sort(ByKey(intermediate))
    
    // Create the output file mr-out-0
    oname := "mr-out-0"
    ofile, _ := os.Create(oname)
    
    // -------------------------- reduce task start --------------------------
    // Since the intermediate result set is ordered, the same key/value pairs are placed consecutively together
    // Just take the intermediate result set with the same key as input to reduce
    i := 0
    for i < len(intermediate) {
        // i is the index of the first entry with the current key
        // j ends up one past the last entry with the same key
        j := i + 1
        for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
            j++
        }
        values := []string{}
        // Iterate over the key/value pairs from I to j, all with the same key and value 1
        // as an input parameter to reduce
        for k := i; k < j; k++ {
            values = append(values, intermediate[k].Value)
        }
        output := reducef(intermediate[i].Key, values)

        // Output reduce results to the MR-OUT -0 file
        fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

        i = j
	}
    // -------------------------- reduce task end --------------------------

    ofile.Close()
}

// Load the Map and Reduce methods from the plug-in
// Input: the plug-in file name
// Returns the map function and the reduce function
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
    p, err := plugin.Open(filename)
    if err != nil {
        log.Fatalf("cannot load plugin %v", filename)
    }
    // Find the Map method in the plug-in and assign it to mapf
    xmapf, err := p.Lookup("Map")
    if err != nil {
        log.Fatalf("cannot find Map in %v", filename)
    }
    mapf := xmapf.(func(string, string) []mr.KeyValue)

    // Find the Reduce method in the plug-in and assign it to reducef
    xreducef, err := p.Lookup("Reduce")
    if err != nil {
        log.Fatalf("cannot find Reduce in %v", filename)
    }
    reducef := xreducef.(func(string, []string) string)

    // Return the map and reduce functions
    return mapf, reducef
}

2.2.2 Distributed scenario implementation — Master and worker implementation

As you can see, the code above is a simple serial implementation. However, the power of MapReduce lies in the implementation of master and worker in distributed scenarios. Next, we will demonstrate the implementation of master and worker.

The basic idea of the master/worker interaction is:

  • The worker and the master communicate via RPC. The interactions are:
    • ReqTask <–> HandleTaskReq: the worker requests a task from the master. The task should include the task phase (map/reduce), the task number, the input file name, the total number of map tasks, and the number of reduce tasks (i.e., how many reduce tasks the intermediate result set is split into).
    • ReportTask <–> HandleTaskReport: after processing a map/reduce task, the worker reports the task status to the master. The report only needs to include the task number and a completion flag.
  • The master starts with a series of initialization operations:
    • Example Initialize the task information.
    • Put the task into the task queue.
    • Enable RPC listening.
  • The master listens for the workers' RPC requests and handles them:
    • HandleTaskReq: handles a worker's task request RPC. It takes a task from the task queue and returns it to the worker, and records locally that the task is running and when it started, so that the task can be put back into the queue if the worker times out.
    • HandleTaskReport: handles a worker's task report RPC and checks whether the task completed. If so, the locally recorded task status is set to finished; otherwise the task is put back into the task queue and its status is set to queued.
  • The master also needs a periodic check of the tasks' running status, including:
    • Check whether the task times out. If the task times out, add the task to the task queue again.
    • Check whether all tasks in the Map phase are completed. If yes, enter the Reduce phase and initialize the Reduce phase.
    • Check whether all tasks in the Reduce phase are completed. If yes, no further action is required.

2.2.3 Specific code implementation

Let's start writing the program. It spans several files:

  • /mr/rpc.go — definitions of the structures transferred in RPC requests

  • /main/mrmaster.go — the program that starts the master

  • /main/mrworker.go — the program that starts the worker

  • /mr/master.go — the master implementation

  • /mr/worker.go — the worker implementation

First, the RPC message structures need to be defined. There are three parts: the task itself, the request-task messages, and the report-task messages, defined as follows:

// rpc.go

type TaskStatus string
type TaskPhase string
type TimeDuration time.Duration

// Task status constants
const (
	// ready to be scheduled
	TaskStatusReady   TaskStatus = "ready"
	// waiting in the task queue
	TaskStatusQueue   TaskStatus = "queue"
	// currently running
	TaskStatusRunning TaskStatus = "running"
	// finished
	TaskStatusFinish  TaskStatus = "finish"
	// failed with an error
	TaskStatusErr     TaskStatus = "error"
)

// Task phase constants
const (
	MapPhase    TaskPhase = "map"
	ReducePhase TaskPhase = "reduce"
)

// Task definition
type Task struct {
	// Task phase: map or reduce
	TaskPhase TaskPhase
	// Total number of map tasks
	MapNum int
	// Total number of reduce tasks
	ReduceNum int
	// Task number
	TaskIndex int
	// Input file name (for map tasks)
	FileName string
	// Whether to complete
	IsDone bool
}

// Request task parameters
type ReqTaskArgs struct {
	// The current worker is alive and can execute the task
	WorkerStatus bool
}

// Request the task return value
type ReqTaskReply struct {
	// Return a task
	Task Task
	// Complete all tasks
	TaskDone bool
}

// Report task parameters
type ReportTaskArgs struct {
	// The current worker is alive and can execute the task
	WorkerStatus bool
	// Task number
	TaskIndex int
	// Whether to complete
	IsDone bool
}

// Report the task return value
type ReportTaskReply struct {
	// Check whether the master response is processed successfully
	MasterAck bool
}


mrmaster.go and mrworker.go are the entry points that start the master and the worker respectively:

  • mrmaster.go calls MakeMaster() in /mr/master.go to start the master, passing in the collection of input file names and nReduce (the number of reduce tasks the map phase should split the intermediate keys into). After starting the master, it calls Done() in a loop to determine whether all tasks are complete.
  • mrworker.go loads the Map and Reduce functions from the plug-in and passes them to Worker() to start the worker.

Master implementation idea:

  • Define the structure of the master
    • Task queue, used to control requests for tasks
    • Record a collection of input files
    • Maintains the number of Map/Reduce operations
    • Task stage: indicates the task progress stage
    • Task status, used to record the global task status
    • Mutex, used for resource locking
  • Understand how RPC works in Go: a handler is defined as func (m *Master) Method(args *Args, reply *Reply) error; when an RPC request arrives, it is dispatched to the corresponding method by name. Implement the task request and task report handlers this way.
  • Implement the per-state handling in the Done() function. Note that switch in Go does not fall through by default, so there is no need to write break.

Worker implementation idea:

  • Implement the main loop: request a task, execute it, and report the result.
  • Implement the RPC calls for requesting tasks and reporting results.
  • Implement the map task execution routine: write the output to nReduce intermediate files based on the key. The file name format is mr-X-Y, where X is the map task index and Y is ihash(key) % nReduce. The intermediate files can be stored in JSON format (a sketch of these naming helpers is given after the worker code below).
  • Implement the reduce task execution routine, which mirrors the map side: read the intermediate file contents, sort the keys, and run the reduce function.

The core implementation is as follows:

// master.go

// Task status definition
type TaskState struct {
	// status
	Status TaskStatus
	// Start execution time
	StartTime time.Time
}

// Master structure definition
type Master struct {
	// Task queue
	TaskChan chan Task
	// Enter the file
	Files []string
	// Total number of map tasks
	MapNum int
	// Total number of reduce tasks
	ReduceNum int
	// Task phase
	TaskPhase TaskPhase
	// Per-task state
	TaskState []TaskState
	// Mutex to protect shared state
	Mutex sync.Mutex
	// Whether to complete
	IsDone bool
}

// Start the Master
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{}

	// Initialize the Master
	m.IsDone = false
	m.Files = files
	m.MapNum = len(files)
	m.ReduceNum = nReduce
	m.TaskPhase = MapPhase
	m.TaskState = make([]TaskState, m.MapNum)
	m.TaskChan = make(chan Task, 10)
	for k := range m.TaskState {
		m.TaskState[k].Status = TaskStatusReady
	}

	// Enable thread listening
	m.server()

	return &m
}

// Start a thread to listen for worker.go's RPC requests
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	// l, e := net.Listen("tcp", "127.0.0.1:1234")
	os.Remove("mr-socket")
	l, e := net.Listen("unix", "mr-socket")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

// Process a task request from a worker
func (m *Master) HandleTaskReq(args *ReqTaskArgs, reply *ReqTaskReply) error {
	fmt.Println("Start processing task request...")
	if !args.WorkerStatus {
		return errors.New("the current worker is offline")
	}
	// Take a task from the task queue
	task, ok := <-m.TaskChan
	if ok == true {
		reply.Task = task
		// The task status is set to executing
		m.TaskState[task.TaskIndex].Status = TaskStatusRunning
		// Record the start time of the task
		m.TaskState[task.TaskIndex].StartTime = time.Now()
	} else {
		// If there are no tasks in the queue, all tasks are complete
		reply.TaskDone = true
	}
	return nil
}

// Process task reports
func (m *Master) HandleTaskReport(args *ReportTaskArgs, reply *ReportTaskReply) error {
	fmt.Println("Start processing mission reports...")
	if! args.WorkerStatus { reply.MasterAck =false
		return errors.New("The current worker is offline")}if args.IsDone == true {
		// The task is complete
		m.TaskState[args.TaskIndex].Status = TaskStatusFinish
	} else {
		// Task execution error
		m.TaskState[args.TaskIndex].Status = TaskStatusErr
	}
	reply.MasterAck = true
	return nil
}

// Loop Done() to determine if the task is complete
func (m *Master) Done() bool {
	ret := false

	finished := true
	m.Mutex.Lock()
	defer m.Mutex.Unlock()
	for key, ts := range m.TaskState {
		switch ts.Status {
		case TaskStatusReady:
			// The task is ready
			finished = false
			m.addTask(key)
		case TaskStatusQueue:
			// In the task queue
			finished = false
		case TaskStatusRunning:
			// The task is in progress
			finished = false
			m.checkTask(key)
		case TaskStatusFinish:
			// The task is complete
		case TaskStatusErr:
			// Task error
			finished = false
			m.addTask(key)
		default:
			panic("Abnormal task status...")}}// Task completed
	if finished {
		// Judgment phase
		// map initializes the Reduce phase
		// reduce ends
		if m.TaskPhase == MapPhase {
			m.initReduceTask()
		} else {
			m.IsDone = true
			close(m.TaskChan)
		}
	} else {
		m.IsDone = false
	}
	ret = m.IsDone
	return ret
}

// Initialize the Reduce phase
func (m *Master) initReduceTask() {
	m.TaskPhase = ReducePhase
	m.IsDone = false
	m.TaskState = make([]TaskState, m.ReduceNum)
	for k := range m.TaskState {
		m.TaskState[k].Status = TaskStatusReady
	}
}

// Put the task into the task queue
func (m *Master) addTask(taskIndex int) {
	// Construct task information
	m.TaskState[taskIndex].Status = TaskStatusQueue
	task := Task{
		FileName:  "",
		MapNum:    len(m.Files),
		ReduceNum: m.ReduceNum,
		TaskIndex: taskIndex,
		TaskPhase: m.TaskPhase,
		IsDone:    false,
	}
	if m.TaskPhase == MapPhase {
		task.FileName = m.Files[taskIndex]
	}
	// Put it into the task queue
	m.TaskChan <- task
}

// Check whether the task processing times out
func (m *Master) checkTask(taskIndex int) {
	timeDuration := time.Now().Sub(m.TaskState[taskIndex].StartTime)
	if timeDuration > MaxTaskRunTime {
		// The task times out and joins the queue again
		m.addTask(taskIndex)
	}
}

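One note: master.go above refers to a MaxTaskRunTime constant that is not shown in the snippet. A minimal definition would look like the following; the 10-second timeout is an assumed value, not taken from the original code:

// Maximum time a task may run before the master assumes the worker has
// failed and re-queues the task (the 10s value here is an assumption).
const MaxTaskRunTime = time.Second * 10
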
// worker.go

type KeyValue struct {
	Key   string
	Value string
}

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by MapPhase.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// Worker main thread: loop requesting tasks, executing them, and reporting results
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		// Request a task
		reply := ReqTaskReply{}
		reply = reqTask()
		if reply.TaskDone {
			break
		}
		// Execute the task
		err := doTask(mapf, reducef, reply.Task)
		if err != nil {
			// Report failure so the master can re-queue the task
			reportTask(reply.Task.TaskIndex, false)
			continue
		}
		// Report that the task completed successfully
		reportTask(reply.Task.TaskIndex, true)
	}
}

// Request a task
func reqTask() ReqTaskReply {
	// Declare parameters and assign values
	args := ReqTaskArgs{}
	args.WorkerStatus = true

	reply := ReqTaskReply{}

	// RPC call
	if ok := call("Master.HandleTaskReq", &args, &reply); !ok {
		log.Fatal("Request task failed...")
	}
	return reply
}

// Report the results of the task
func reportTask(taskIndex int, isDone bool) ReportTaskReply {
	// Declare parameters and assign values
	args := ReportTaskArgs{}
	args.IsDone = isDone
	args.TaskIndex = taskIndex
	args.WorkerStatus = true

	reply := ReportTaskReply{}

	// RPC call
	if ok := call("Master.HandleTaskReport", &args, &reply); !ok {
		log.Fatal("Report task failed...")
	}
	return reply

}

// Execute the task
func doTask(mapf func(string, string) []KeyValue, reducef func(string, []string) string, task Task) error {
	if task.TaskPhase == MapPhase {
		err := DoMapTask(mapf, task.FileName, task.TaskIndex, task.ReduceNum)
		return err
	} else if task.TaskPhase == ReducePhase {
		err := DoReduceTask(reducef, task.MapNum, task.TaskIndex)
		return err
	} else {
		log.Fatal("Abnormal task phase return value of request task...")
		return errors.New("Task phase return value of request task is abnormal")}return nil
}

// Execute the map task
func DoMapTask(mapf func(string, string) []KeyValue, fileName string, mapTaskIndex int, reduceNum int) error {

	fmt.Println("Start processing Map task...")
	// Open the file
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
		return err
	}
	// Read the contents of the file
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
		return err
	}
	file.Close()
	// Enter the map program
	kva := mapf(fileName, string(content))
	for i := 0; i < reduceNum; i++ {
		// The intermediate output name is mr-x-y
		intermediateFileName := intermediateName(mapTaskIndex, i)
		fmt.Printf("DoMap file name %s created \n", intermediateFileName)
		// Create an intermediate output file and store it in JSON format
		file, _ := os.Create(intermediateFileName)
		enc := json.NewEncoder(file)
		for _, kv := range kva {
			if ihash(kv.Key)%reduceNum == i {
				enc.Encode(&kv)
			}
		}
		file.Close()
	}
	return nil
}

// Perform reduce tasks
func DoReduceTask(reducef func(string, []string) string, mapNum int, reduceTaskIndex int) error {
	fmt.Println("Start processing Reduce tasks...")
	// map:string->[]string
	res := make(map[string] []string)
	for i := 0; i < mapNum; i++ {
		// Open the intermediate file
		intermediateFileName := intermediateName(i, reduceTaskIndex)
		file, err := os.Open(intermediateFileName)
		if err != nil {
			log.Fatalf("cannot open %v", intermediateFileName)
			return err
		}
		// Deserialize a JSON file
		dec := json.NewDecoder(file)
		// Read the contents of the file
		for {
			var kv KeyValue
			err := dec.Decode(&kv)
			if err != nil {
				break
			}
			_, ok := res[kv.Key]
			if !ok {
				res[kv.Key] = make([]string, 0)
			}
			res[kv.Key] = append(res[kv.Key], kv.Value)
		}
		file.Close()
	}
	// Extract the key value for sorting
	var keys []string
	for k := range res {
		keys = append(keys, k)
	}
	// Sort key values
	sort.Strings(keys)
	outputFileName := outputName(reduceTaskIndex)
	fmt.Printf("DoReduce output %s file name \n", outputFileName)
	outputFile, _ := os.Create(outputFileName)
	for _, k := range keys {
		output := reducef(k, res[k])
		// Output reduce results to the MR-out -X file
		fmt.Fprintf(outputFile, "%v %v\n", k, output)
	}
	outputFile.Close()

	return nil
}
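worker.go also relies on three helpers that are not shown above: intermediateName, outputName, and the RPC client wrapper call. A minimal sketch of what they might look like, assuming the mr-X-Y / mr-out-Y naming convention described earlier and the same "mr-socket" unix socket that master.go listens on (only fmt, log, and net/rpc are needed):

// intermediateName builds the mr-X-Y file name for the intermediate
// output of map task mapTaskIndex destined for reduce task reduceTaskIndex.
func intermediateName(mapTaskIndex int, reduceTaskIndex int) string {
	return fmt.Sprintf("mr-%d-%d", mapTaskIndex, reduceTaskIndex)
}

// outputName builds the mr-out-Y file name for a reduce task's output.
func outputName(reduceTaskIndex int) string {
	return fmt.Sprintf("mr-out-%d", reduceTaskIndex)
}

// call sends an RPC request to the master over the "mr-socket" unix
// socket opened in master.go, waits for the response, and reports
// whether the call succeeded.
func call(rpcname string, args interface{}, reply interface{}) bool {
	c, err := rpc.DialHTTP("unix", "mr-socket")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err != nil {
		fmt.Println(err)
		return false
	}
	return true
}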

Finally, you can verify the program by testing the script (/main/test-mr.sh) or by running the program.

// Script validation
$ cd ~/6.824
$ cd src/main
$ sh test-mr.sh
// Run program validation
$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrmaster.go pg*.txt
$ go run mrworker.go wc.so

// Check the result after the program runs
$ cat mr-out-* | sort | more

Conclusion

This wraps up my first pass at MapReduce: I read the paper, worked through the overall idea of MapReduce, got a taste of the appeal of distributed systems, picked up Golang step by step, and finally finished Lab1, which felt like quite an achievement. If you have any questions or spot any mistakes, feel free to point them out!

Attached code implementation address:

Github address: github.com/Januslll/mi…