MapReduce is a distributed programming model for large-scale data processing, proposed by Google in the 2004 paper MapReduce: Simplified Data Processing on Large Clusters.

Principle

MapReduce divides data processing into two steps, Map and Reduce. Map turns the input data set into a batch of KV pairs and outputs them: for each <k1, v1>, Map outputs a batch of <k2, v2> pairs. Reduce then summarizes the results produced by the Map tasks: for each <k2, list(v2)> (where list(v2) is the list of all values associated with key k2), Reduce outputs an aggregated result.

Take the word-count program as an example. Map outputs a <word, 1> pair for each word in the document, while Reduce computes the length of the list associated with each word and outputs <word, count>:

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
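For comparison, here are the same word-count functions in Go, written in the style of the MIT 6.824 lab's wc plugin; this is only a sketch, and it assumes KeyValue is the {Key, Value string} struct also used in the implementation section below.

import (
	"strconv"
	"strings"
	"unicode"
)

// Map emits a <word, "1"> pair for every word in the document.
func Map(filename string, contents string) []KeyValue {
	isNotLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, isNotLetter)
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every "1" emitted for a word; the count is the list length.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}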

Process

The MapReduce process is as follows:

  1. Split the input into M pieces, producing M Map tasks and R Reduce tasks.
  2. Create one master and N workers; the master assigns Map and Reduce tasks to workers for execution.
  3. A worker assigned a Map task reads and parses KV pairs from its input split, passes them to the user-provided Map function, and obtains a batch of intermediate KV pairs.
  4. The intermediate KV pairs are partitioned into R regions by the partition function and saved to disk. When the Map task completes, the locations of the saved files are reported to the master.
  5. A Reduce worker reads the data from the file system according to the locations passed by the master, parses the KV pairs, and groups values with the same key to produce <k2, list(v2)>. If the data does not fit in memory, an external sort is used.
  6. For each unique key, <k2, list(v2)> is passed to the user-provided Reduce function, and the function's return value is appended to the output file.
  7. When all tasks are complete, the MapReduce call returns.

The MapReduce flow is not complicated: the input is split and handed to the Map tasks, and the intermediate results are partitioned and passed to the Reduce tasks, which produce the final output.

Fault tolerance

When a worker fails, the failure can be detected through heartbeats or similar mechanisms; once detected, the worker's tasks are reassigned to other workers and re-executed.

When the master fails, it could be recovered from periodic checkpoints. However, since there is only a single master, recovery is troublesome, so a simpler approach is to let the user detect the failure and re-run the MapReduce job.

For output files, it is necessary to ensure that files still being written are never read, that is, to guarantee the atomicity of the operation. This can be achieved through the atomicity of file-system rename operations: results are written to a temporary file and renamed into place once execution completes. This approach turns a write with side effects into an idempotent one (an idempotent operation produces the same result no matter how many times it runs; for example, a = 2 is idempotent while a += 2 is not).
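A minimal sketch of this pattern in Go; the same idea appears in doMap and doReduce in the implementation section below, and the file names used here are placeholders.

// Write the output into a temporary file first.
tmp, err := ioutil.TempFile("./", "mr-out-tmp-*")
if err != nil {
	log.Fatalf("create tmp file failed: %v", err)
}
fmt.Fprintf(tmp, "%v %v\n", "some-key", "some-value") // placeholder output
tmp.Close()

// Atomically publish it once it is complete: a reader either sees the file
// missing or fully written, never a partial write.
if err := os.Rename(tmp.Name(), "mr-out-0"); err != nil {
	log.Fatalf("rename failed: %v", err)
}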

Stragglers

An important factor affecting the total execution time of a job is the straggler: a machine that takes unusually long to finish the last few tasks of the operation, increasing the total execution time. To handle this, when the job is close to completion, the remaining in-progress tasks are also assigned to backup workers; whichever copy finishes first, the task is marked as completed.

The partition function

For the results generated by Map, KV pairs with the same key are assigned to the same Reduce task by the partition function. The default partition function is hash(key) % R, but other partition functions can be chosen in some cases. For example, if the key is a URL and you want results for the same host to end up in the same output file, hash(hostname(key)) % R can be used as the partition function.
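A sketch of both partition functions in Go, reusing the ihash helper that appears in the implementation below; the URL variant is only an illustration and assumes the key parses as a URL.

import (
	"hash/fnv"
	"net/url"
)

// ihash maps a key to a non-negative int, as in the 6.824 lab code below.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// Default partitioning: spread keys evenly across the R reduce tasks.
func defaultPartition(key string, r int) int {
	return ihash(key) % r
}

// Host-based partitioning: keep all URLs from the same host in one reduce task.
func hostPartition(rawURL string, r int) int {
	u, err := url.Parse(rawURL)
	if err != nil {
		return defaultPartition(rawURL, r)
	}
	return ihash(u.Hostname()) % r
}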

Implementation

The implementation below is based on the MIT 6.824 lab.

type Coordinator struct {
	mapJobs      []Job
	reduceJobs   []Job
	status       int
	nMap         int
	remainMap    int
	nReduce      int
	remainReduce int
	lock         sync.Mutex
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	c.status = MAP
	c.nMap = len(files)
	c.remainMap = c.nMap
	c.nReduce = nReduce
	c.remainReduce = c.nReduce
	c.mapJobs = make([]Job, len(files))
	c.reduceJobs = make([]Job, nReduce)
	for idx, file := range files {
		c.mapJobs[idx] = Job{[]string{file}, WAITTING, idx}
	}
	for idx := range c.reduceJobs {
		c.reduceJobs[idx] = Job{[]string{}, WAITTING, idx}
	}
	c.server()
	return &c
}

func (c *Coordinator) timer(status *int) {
	time.Sleep(time.Second * 10)

	c.lock.Lock()
	if *status == RUNNING {
		log.Printf("timeout\n")
		*status = WAITTING
	}
	c.lock.Unlock()
}

func (c *Coordinator) AcquireJob(args *AcquireJobArgs, reply *AcquireJobReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	fmt.Printf("Acquire: %+v\n", args)
	if args.CommitJob.Index >= 0 {
		if args.Status == MAP {
			if c.mapJobs[args.CommitJob.Index].Status == RUNNING {
				c.mapJobs[args.CommitJob.Index].Status = FINISHED
				for idx, file := range args.CommitJob.Files {
					c.reduceJobs[idx].Files = append(c.reduceJobs[idx].Files, file)
				}
				c.remainMap--
			}
			if c.remainMap == 0 {
				c.status = REDUCE
			}
		} else {
			if c.reduceJobs[args.CommitJob.Index].Status == RUNNING {
				c.reduceJobs[args.CommitJob.Index].Status = FINISHED
				c.remainReduce--
			}
			if c.remainReduce == 0 {
				c.status = FINISH
			}
		}
	}
	if c.status == MAP {
		for idx := range c.mapJobs {
			if c.mapJobs[idx].Status == WAITTING {
				reply.NOther = c.nReduce
				reply.Status = MAP
				reply.Job = c.mapJobs[idx]
				c.mapJobs[idx].Status = RUNNING
				go c.timer(&c.mapJobs[idx].Status)
				return nil
			}
		}
		reply.NOther = c.nReduce
		reply.Status = MAP
		reply.Job = Job{Files: make([]string, 0), Index: -1}
	} else if c.status == REDUCE {
		for idx := range c.reduceJobs {
			if c.reduceJobs[idx].Status == WAITTING {
				reply.NOther = c.nMap
				reply.Status = REDUCE
				reply.Job = c.reduceJobs[idx]
				c.reduceJobs[idx].Status = RUNNING
				go c.timer(&c.reduceJobs[idx].Status)
				return nil
			}
		}
		reply.NOther = c.nMap
		reply.Status = REDUCE
		reply.Job = Job{Files: make([]string, 0), Index: -1}
	} else {
		reply.Status = FINISH
	}
	return nil
}

The Coordinator stores all task information and execution status. Workers submit finished tasks and request new ones through AcquireJob. Reduce tasks can run only after all Map tasks have completed. Here each input file is simply treated as one Map task.
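The Job struct, the status constants, and the RPC argument/reply types are not shown above; a minimal definition consistent with how the code uses them might look like this (an assumption, not the lab's exact code):

const (
	MAP = iota // coordinator / worker phase
	REDUCE
	FINISH
)

const (
	WAITTING = iota // job states; spelling follows the constants used above
	RUNNING
	FINISHED
)

type Job struct {
	Files  []string // input files (Map) or intermediate files (Reduce)
	Status int
	Index  int
}

type AcquireJobArgs struct {
	CommitJob Job // the job just finished (Index == -1 if none)
	Status    int // the worker's current phase
}

type AcquireJobReply struct {
	NOther int // nReduce for Map jobs, nMap for Reduce jobs
	Status int
	Job    Job // Index == -1 means no job is available right now
}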

func doMap(mapf func(string, string) []KeyValue, job *Job, nReduce int) (files []string) {
	outFiles := make([]*os.File, nReduce)
	for idx := range outFiles {
		outFile, err := ioutil.TempFile("./", "mr-tmp-*")
		if err != nil {
			log.Fatalf("create tmp file failed: %v", err)
		}
		defer outFile.Close()
		outFiles[idx] = outFile
	}
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(content))
		for _, kv := range kva {
			hash := ihash(kv.Key) % nReduce
			js, _ := json.Marshal(kv)
			outFiles[hash].Write(js)
			outFiles[hash].WriteString("\n")
		}
	}
	for idx := range outFiles {
		filename := fmt.Sprintf("mr-%d-%d", job.Index, idx)
		os.Rename(outFiles[idx].Name(), filename)
		files = append(files, filename)
	}
	return
}

func doReduce(reducef func(string, []string) string, job *Job, nMap int) {
	log.Printf("Start reduce %d", job.Index)
	outFile, err := ioutil.TempFile("./", "mr-out-tmp-*")
	if err != nil {
		log.Fatalf("create tmp file failed: %v", err)
	}
	defer outFile.Close()
	m := make(map[string] []string)
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			kv := KeyValue{}
			if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
				log.Fatalf("read kv failed: %v", err)
			}
			m[kv.Key] = append(m[kv.Key], kv.Value)
		}
		if err := scanner.Err(); err != nil {
			log.Fatal(err)
		}
		file.Close()
	}
	for key, value := range m {
		output := reducef(key, value)
		fmt.Fprintf(outFile, "%v %v\n", key, output)
	}
	os.Rename(outFile.Name(), fmt.Sprintf("mr-out-%d", job.Index))
	log.Printf("End reduce %d", job.Index)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	CallExample()
	var status int = MAP
	args := AcquireJobArgs{Job{Index: -1}, MAP}
	for {
		args.Status = status
		reply := AcquireJobReply{}
		call("Coordinator.AcquireJob", &args, &reply)
		fmt.Printf("AcReply: %+v\n", reply)
		if reply.Status == FINISH {
			break
		}
		status = reply.Status
		if reply.Job.Index >= 0 {
			// get a job, do it
			commitJob := reply.Job
			if status == MAP {
				commitJob.Files = doMap(mapf, &reply.Job, reply.NOther)
			} else {
				doReduce(reducef, &reply.Job, reply.NOther)
				commitJob.Files = make([]string, 0)
			}
			// job finished
			args = AcquireJobArgs{commitJob, status}
		} else {
			// no job, sleep to wait
			time.Sleep(time.Second)
			args = AcquireJobArgs{Job{Index: -1}, status}
		}
	}
}

The worker requests and submits tasks through an RPC call to Coordinator.AcquireJob, and then executes doMap or doReduce depending on the task type.
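The call helper is part of the lab's worker skeleton rather than this implementation; a typical version dials the coordinator's UNIX socket and issues the RPC (coordinatorSock() is the socket-name helper provided by the skeleton).

func call(rpcname string, args interface{}, reply interface{}) bool {
	// Connect to the coordinator over the UNIX socket set up by c.server().
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	if err := c.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}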

The doMap function reads the target file and passes <filename, content> to the map function, then writes the returned KV pairs to the intermediate files according to hash(key) % R.

The doReduce function reads KV pairs from the target files and loads them into memory, merging values with the same key (I used a map to do this, but later found that the reference implementation sorts the pairs, which also guarantees that the keys in each output file come out in order). After the merge, each <key, list(values)> is handed to the Reduce function and the return value is written to the result file.
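For reference, a sketch of the sort-based approach mentioned above; it assumes kva is the slice of all intermediate KeyValue pairs read from the Reduce task's input files, and reducef and outFile are the same as in doReduce.

// Sort all intermediate pairs by key, then walk runs of equal keys.
sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
i := 0
for i < len(kva) {
	// Find the run [i, j) of pairs that share the same key.
	j := i + 1
	for j < len(kva) && kva[j].Key == kva[i].Key {
		j++
	}
	values := []string{}
	for k := i; k < j; k++ {
		values = append(values, kva[k].Value)
	}
	// Keys are processed in sorted order, so the output file is ordered too.
	output := reducef(kva[i].Key, values)
	fmt.Fprintf(outFile, "%v %v\n", kva[i].Key, output)
	i = j
}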