Preface

This post gets to the core idea of a distributed computing engine and then analyzes MapReduce parallelism.

1. Core ideas of MapReduce

1.1 Distributed Computing Engine

I have previously written an introductory post, "Take you to big data (3) – MapReduce introduction"; interested readers can jump over and take a look. Some time has passed since then, so here is a fresh look at the topic. The figure below depicts the core phases of MapReduce.

The core idea of distributed processing can be summarized in one sentence: divide and conquer plus parallel computing. Spark and Flink were later designed with the same ideas in mind. HDFS can also be summed up in one sentence: distributed storage plus redundant storage.

Complex problems that a single computer cannot solve can be handled by the power of many: we build a cluster of servers to perform distributed parallel computing. The core process is:

  1. Stage 1, the Mapper (feature extraction): large, complex tasks are broken down into many small tasks that are computed in parallel
  2. Stage 2, the Reducer (where the business logic is executed): the results of the small tasks run concurrently in stage 1 are aggregated
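To make the two stages concrete, here is a minimal word-count sketch in the classic Hadoop style (the class names and the whitespace-splitting rule are my own illustration, not something fixed by the framework): the Mapper tags every word with a Key and a count of 1, and the Reducer aggregates all counts that share a Key.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Stage 1: extract features -- each word becomes a Key, tagged with a count of 1 as the Value
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            if (token.isEmpty()) continue;
            word.set(token);
            context.write(word, ONE);               // emit (word, 1)
        }
    }
}

// Stage 2: all values that share the same Key are aggregated together
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
            sum += c.get();
        }
        context.write(word, new IntWritable(sum));  // emit (word, total)
    }
}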

However, turning a single-machine task into a distributed computing task raises a variety of problems of its own:

  1. Data storage: before anything else we need to solve the problem of storing massive amounts of data. That is what HDFS is for
  2. The computation logic has to be divided into at least two phases: first compute in parallel (map), then summarize the results (reduce)
  3. How are these two phases of computation started? How are they coordinated? Map must finish before Reduce
  4. How does the algorithm actually run? Does the data go to the program, or does the program go to the data? The data sits on the machines of the cluster, but the machine executing a task may not hold the data it needs. What then?
  5. How are the many computing tasks of the two phases allocated?
  6. How is the intermediate state of tasks managed, and how are faults tolerated? How should results be saved, and what if a machine suddenly crashes while executing a task?
  7. How is task execution monitored and tracked? Many machines produce intermediate results, but can a machine that finished a phase-one task be pulled into phase two?
  8. How are errors handled? Throw an exception? Retry?

MapReduce does all of this for us; the only thing left to us is the business logic.

1.2 MapReduce Architecture Design

MapReduce is also a large system built around the chain-of-responsibility pattern. It has to guarantee that no matter how complex the data, no matter how complex the computation, no matter how big the data is or where it is stored, the framework can still be used to run the job. That is what makes it general as a framework! The framework completes the whole distributed computing process and then defines a specification, so that users only need to write the corresponding business logic according to that specification.

  1. The first problem is connecting to the data source. The data-reading components of MapReduce are InputFormat + RecordReader

  2. Stage 1: the Mapper transforms the raw data, extracting the data to be computed (the Value) and attaching a feature to it (the Key)

  3. Intermediate stage: values with the same feature are aggregated together so that the second stage can summarize them logically

(this covers sorting, local aggregation, partitioning, and a subtle grouping step)

  4. Stage 2: the Reducer

  5. Data landing: the output basically goes back to where the input came from. The writing components are OutputFormat + RecordWriter

All of these components and processes serve something called a Job, and each component has a default implementation. For example, the default implementation of InputFormat + RecordReader is TextInputFormat + LineRecordReader.
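If you prefer to wire these defaults explicitly rather than rely on them implicitly, a minimal sketch looks like this (the paths and the helper method name are placeholders of my own):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IoFormatConfig {
    // Wire the (default) text input/output components explicitly onto a Job.
    // TextInputFormat uses LineRecordReader under the hood:
    // key = byte offset of the line (LongWritable), value = the line itself (Text).
    static void configureIo(Job job) throws IOException {
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/input/path"));     // placeholder path
        FileOutputFormat.setOutputPath(job, new Path("/output/path"));  // placeholder path
    }
}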

We also mentioned sorting, local aggregation, and partitioning (the Sorter, Combiner, and Partitioner).

Partitioner: the default component is HashPartitioner, though it is effectively useless if you have only one ReduceTask. Sorter and Combiner: we will expand on these when we get to the Shuffle phase.
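For reference, the default partitioning rule boils down to roughly the following (a simplified sketch of the HashPartitioner idea, not a verbatim copy of the Hadoop source). With a single ReduceTask, numReduceTasks is 1, every key lands in partition 0, and the partitioner indeed does nothing useful.

import org.apache.hadoop.mapreduce.Partitioner;

// Simplified sketch of what the default HashPartitioner does:
// the same key always hashes to the same partition, i.e. the same ReduceTask.
public class SketchHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask the sign bit so the result is non-negative, then bucket by the reduce count
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}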

1.3 The Job and the Context

We assemble our MapReduce program with a Job object and submit it for execution:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(xxx);
job.setReducerClass(xxx);
job.setPartitionerClass(xxxx);
...
job.submit();

Over the course of a Job's execution, two different kinds of tasks are generated, MapTask and ReduceTask, and both of them carry a global context object, the Context.

The Context is the context object of MapReduce. Simply put, whenever your MapReduce program needs to read its input or write its output in either phase, it goes through the Context. The following code is the classic way to use it:

boolean result = context.nextKeyValue();
inKey = context.getCurrentKey();
inValue = context.getCurrentValue();
// (inKey, inValue) ===> (outKey, outValue)
context.write(outKey, outValue);

The Context interface has two implementation classes, MapContext and ReduceContext
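To see where that pattern comes from, the run() method of Mapper is essentially that loop; here is a simplified sketch (the class name and the concrete types are my own illustration):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Rough sketch of how Mapper.run() drives the Context (simplified from the real source):
// the framework pulls records through the Context and hands them to map() one at a time.
public class RunLoopMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);                      // one-time initialization hook
        while (context.nextKeyValue()) {     // ask the RecordReader for the next record
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);                    // one-time teardown hook
    }
}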

1.4 How the program runs

First, let’s explain how the Job is executed. Suppose the Job is scheduled by Yarn

After the Job is submitted, ResourceManager determines whether it can be executed. If it can, ResourceManager finds an idle node and starts a component that is responsible for tracking task execution, scheduling, and fault tolerance. We call it the MRApplicationMaster, and it takes full responsibility for distributing the tasks of the two phases.

The MRApplicationMaster then has the NodeManagers start the two kinds of tasks, MapTask and ReduceTask; in fact they are launched at roughly the same time. After a MapTask finishes, it reports to the MRApplicationMaster, then the ReduceTask runs and reports when it completes. Once the whole Job has finished, the ApplicationMaster reports to the ResourceManager, and the job is done.

Of course, we can also check the progress of the Job from the Client, so the MRApplicationMaster reports progress not only to the ResourceManager but also to the Client.
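That client-side progress is what you see when you submit with waitForCompletion in verbose mode; a minimal sketch, assuming the mapper/reducer and paths are configured as in section 1.3:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitWithProgress {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "demo-job");
        // ... set mapper/reducer/input/output here as in section 1.3 ...
        // true = verbose: the client polls and prints map/reduce progress while the job runs
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}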

2. MapReduce parallelism analysis

2.1 MapTask parallelism mechanism

Let's say the default block size is 128MB and I have a 300MB file: how many MapTasks will be started? What if the file is 260MB, how many then?

The answer lies in the slicing logic of FileInputFormat (its getSplits() method).

I'll only pull out the important part here, and it isn't hard to follow. Simply put, the parameter bytesRemaining is the size left over after each slice is cut; as long as what remains is more than 1.1 times the specified split size, it keeps slicing. The specified split size is determined by the computeSplitSize method, which simply takes the median of its three parameters. And whether slicing happens at all is decided by the isSplitable method.
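Put together, the slicing loop behaves roughly like the following sketch (a simplification of the idea in getSplits(), not the verbatim source; the class and method names are mine):

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Same constant as in FileInputFormat
    private static final double SPLIT_SLOP = 1.1;

    // Keep cutting full splits off the file while more than 1.1 * splitSize remains;
    // whatever is left becomes the final split (at most 1.1 * splitSize in size).
    static List<Long> sliceSizes(long fileLength, long splitSize) {
        List<Long> sizes = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            sizes.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            sizes.add(bytesRemaining);
        }
        return sizes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(sliceSizes(300 * mb, 128 * mb)); // 128MB + 128MB + 44MB -> 3 MapTasks
        System.out.println(sliceSizes(260 * mb, 128 * mb)); // 128MB + 132MB       -> 2 MapTasks
    }
}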

So there you have it: 260MB = 128MB + 128MB + 4MB, and because 4MB is less than 0.1 of 128MB, it will not start another task; the leftover is simply folded into the last split.

Of course, you can also click into the SPLIT_SLOP constant to see where the 1.1 comes from.

If you adjust the parameters so that the file is not split, or make the isSplitable branch return false, only one Task will be started no matter how big the file is.
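A minimal sketch of the isSplitable route, assuming you are willing to extend TextInputFormat yourself (the class name is my own):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Force every file to be processed as a single split, regardless of size,
// by making the input format report its files as non-splittable.
public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;   // one file -> one split -> one MapTask
    }
}

You would then register it with job.setInputFormatClass(WholeFileTextInputFormat.class).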

This part of the source code is quite readable, so interested readers can download the source and look for themselves. Note that in newer source versions the three parameters of computeSplitSize are blockSize, minSize, and maxSize, where minSize defaults to 1 and maxSize defaults to the maximum value of Long. Personally I find these three parameters clearer than the goalSize used in the 2.7.0 source, which is a value derived from the size the user expects; the blockSize parameter is the same in both versions.

In conclusion, the two ways to influence the logical splits are the parameters of the computeSplitSize method and the logic of the isSplitable method.

2) goalSize, minSize, blockSize

Also note that changing the blocksize requires re-formatting (re-writing) the data in HDFS. If you want to change the splitsize, you should strictly keep it at blocksize * n; otherwise, think about what happens when a task has to fetch data from different blocks. For example, suppose I set splitsize = 150MB with 128MB blocks: the first Task reads all of the first block plus 22MB of the second block, the second Task reads the remaining 106MB of the second block plus 44MB of the third block, and so on. Every task ends up pulling data across blocks (and often across machines), which is obviously unreasonable.

We can see the definitions of these three parameters

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long blockSize = file.getBlockSize();
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

minSize defaults to 1. goalSize is the total size of the input files divided by the value set with org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n) when the job starts; if it is not set, the default is 1, so goalSize is usually simply the total size of the files to be processed. Many people assume setNumMapTasks sets exactly how many MapTasks will run. It does not; it is only a hint, not a hard constraint. Now let's move on.

blockSize is, of course, the size of an HDFS block. As we can see, computeSplitSize takes the median of its three parameters as the result, so even if we set goalSize larger than blockSize, the result will still be the blockSize; this guarantees (with the default minSize) that the splits we cut will never be larger than a block.

If goalSize ends up too small, you can also raise minSize so that minSize becomes the median; the splitSize then gets bigger, and the number of mappers naturally goes down.

Let's look at the computeSplitSize method itself.

The median is obtained by first taking the smaller of goalSize and blockSize, and then the larger of that and minSize.
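In code, that median calculation is simply the following (a sketch matching the shape of the method in the older mapred source):

public class ComputeSplitSizeSketch {
    // Take the smaller of goalSize and blockSize, then the larger of that and minSize:
    // the result is the "median" of the three values.
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }
}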

For the version that uses maxSize, minSize, and blockSize, the approach is the same: make the value you want become the median of the three. For example, if you want 256MB splits, set minSize to 256MB; if you want 64MB splits, set maxSize to 64MB. Isn't that simple?
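With the newer API, this tuning can be done through FileInputFormat's helper setters; a sketch, with the sizes purely as examples:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
    static void tune(Job job) {
        long mb = 1024L * 1024L;
        // Want 256MB splits with 128MB blocks? Raise minSize so it becomes the median.
        FileInputFormat.setMinInputSplitSize(job, 256 * mb);
        // Want 64MB splits instead? Lower maxSize so it becomes the median.
        // FileInputFormat.setMaxInputSplitSize(job, 64 * mb);
    }
}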

As for files that cannot be split, that usually comes down to the data being compressed, which is a property of the data source. For example, data compressed with LZO needs a separate index to be built before it can be sliced.

2.2 ReduceTask Parallelism decision mechanism

ReduceTask’s degree of parallelism is actually super simple

job.setNumReduceTasks(4);

That's it. Of course, it can also be set to 0, in which case the reduce stage is not run at all.

However, this parameter should not be set arbitrarily, because it determines how many partitions the data is divided into during the shuffle phase, and you should try not to run too many ReduceTasks. For most jobs, it is best to have the same number of ReduceTasks as slave nodes in the cluster. This is especially important for small clusters.

Remember that under the default partitioner, the HashPartitioner rule, you can use the code above to specify however many ReduceTasks you want. If you use a custom partitioner, make sure its logic matches the number of ReduceTasks.
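For example, a custom partitioner whose logic matches four ReduceTasks might look like this hypothetical sketch (the key type and the first-letter routing rule are invented purely for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical custom partitioner: route keys into 4 buckets by their first letter.
// The number of buckets produced here is meant to match job.setNumReduceTasks(4).
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        String s = key.toString();
        char first = s.isEmpty() ? 'a' : Character.toLowerCase(s.charAt(0));
        int bucket;
        if (first <= 'f')      bucket = 0;
        else if (first <= 'm') bucket = 1;
        else if (first <= 's') bucket = 2;
        else                   bucket = 3;
        return bucket % numReduceTasks;   // stays valid even if fewer reducers are configured
    }
}

You would pair it with job.setPartitionerClass(FirstLetterPartitioner.class) and job.setNumReduceTasks(4).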

3. Serialization and sorting

Writable is the serialization/deserialization interface, and Comparable is the sorting interface (the shuffle phase always sorts by key). Hadoop therefore combines the two into the WritableComparable interface.
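A minimal custom key sketch, assuming a made-up key that sorts words by length first (all field and class names are my own illustration):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A custom key must be serializable (write/readFields) AND sortable (compareTo),
// because the shuffle phase always sorts map output by key.
public class WordLengthKey implements WritableComparable<WordLengthKey> {
    private String word = "";
    private int length;

    public void set(String word) {
        this.word = word;
        this.length = word.length();
    }

    @Override
    public void write(DataOutput out) throws IOException {    // serialization
        out.writeUTF(word);
        out.writeInt(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialization
        word = in.readUTF();
        length = in.readInt();
    }

    @Override
    public int compareTo(WordLengthKey other) {                // sort order used by shuffle
        int byLength = Integer.compare(length, other.length);
        return byLength != 0 ? byLength : word.compareTo(other.word);
    }

    // hashCode/equals should be consistent with compareTo (hashCode is also used by HashPartitioner)
    @Override
    public int hashCode() { return word.hashCode(); }

    @Override
    public boolean equals(Object o) {
        return o instanceof WordLengthKey && ((WordLengthKey) o).word.equals(word);
    }

    @Override
    public String toString() { return word + ":" + length; }
}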

The source code also ships with demos, so if you want to understand it more deeply, you can read the source directly.

Finally

To keep the post from running too long, I'll stop here for now; related content will continue in later updates.