
1. Hadoop serialization

1. Serialization Overview

1.1. What is serialization

Serialization is the conversion of an object in memory into a sequence of bytes (or other data transfer protocol) for storage to disk (persistence) and network transfer.

Deserialization is the conversion of persistent data received from byte sequences (or other data transfer protocols) or disk into objects in memory.

1.2. Why serialization

In general, “live” objects exist only in memory and are lost when the power goes off, and a “live” object can only be used by the local process and cannot be sent to another computer over the network. Serialization, however, makes it possible to store “live” objects and to send them to a remote computer.

1.3. Why not use Java serialization

Java's built-in serialization (the Serializable framework) is heavyweight: when an object is serialized, it carries a lot of extra information (various checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transfer over a network. So Hadoop developed its own serialization mechanism (Writable).

1.4. Features of Hadoop serialization

  • Compact: Efficient use of storage space.
  • Fast: Low overhead for reading and writing data.
  • Interoperability: Supports multi-language interaction.
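
As a rough illustration of “compact” (a sketch added here, not from the original text): serializing a Hadoop IntWritable writes only the 4 bytes of the underlying int, with none of the class metadata that Java's built-in serialization would add.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;

    public class CompactDemo {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            new IntWritable(42).write(dos); // Writable serialization: just the raw int
            dos.flush();
            System.out.println(baos.size()); // prints 4
        }
    }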

2. Implement serialization of custom Bean objects

The basic serialization types commonly used in enterprise development cannot meet every requirement. For example, to pass a bean object around inside the Hadoop framework, the bean needs to implement a serialization interface.

The concrete steps to implement serialization for a bean object are as follows.

  1. The Writable interface must be implemented
  2. During deserialization, the no-argument constructor is called via reflection, so a no-argument constructor is required
  3. Override the serialization method write()
  4. Override the deserialization method readFields()
  5. Note that the field order in deserialization must be exactly the same as the order in serialization
  6. To display the results in a file, override toString(), with fields separated by "\t" for later use.
  7. If the custom bean needs to be transferred as a key, it must also implement the Comparable interface (in practice, WritableComparable), because the Shuffle process in the MapReduce framework requires that keys be sortable; see the sketch after this list.
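
If the bean also needs to be used as a key (step 7), a minimal sketch might look like the following. It assumes, purely for illustration, that FlowBean should be sorted by total traffic in descending order:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Sketch only: a key-capable variant of the FlowBean used later in this article
    public class FlowBean implements WritableComparable<FlowBean> {
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        // No-argument constructor, required so the framework can create the bean via reflection
        public FlowBean() {}

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields must be read in exactly the same order they were written
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        @Override
        public int compareTo(FlowBean o) {
            // Descending order by total traffic (an assumed ordering for illustration)
            return Long.compare(o.sumFlow, this.sumFlow);
        }
    }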

3. Serialization case practice

  1. Requirement

    Collect statistics on the total upstream traffic, total downstream traffic, and total traffic consumed by each mobile number

    The input data

    Input data format

    Expected output data format
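
    Since the original input and output illustrations are images that are not reproduced here, a hypothetical sample is sketched below. It is built only from the phone number and traffic values referenced in the Mapper code later in this article; the remaining columns (id, IP, domain, status code) are assumptions. Columns are tab-separated in the real file and shown here with spaces.

      Input (the last three columns are upstream traffic, downstream traffic and status code):
      1   13736230513   192.196.100.1   www.example.com   2481   24681   200

      Expected output (phone number, total upstream traffic, total downstream traffic, total traffic, separated by "\t"):
      13736230513   2481   24681   27162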

  2. Requirement analysis

  3. Write MapReduce programs

    • Write a Bean object for traffic statistics

      public class FlowBean implements Writable {
          private long upFlow; // Upstream traffic
          private long downFlow; // Downstream traffic
          private long sumFlow; // Total traffic
      
          // No-argument constructor
          public FlowBean() {}

          public long getUpFlow() {
              return upFlow;
          }
      
          public void setUpFlow(long upFlow) {
              this.upFlow = upFlow;
          }
      
          public long getDownFlow() {
              return downFlow;
          }
      
          public void setDownFlow(long downFlow) {
              this.downFlow = downFlow;
          }
      
          public long getSumFlow() {
              return sumFlow;
          }
      
          public void setSumFlow(long sumFlow) {
              this.sumFlow = sumFlow;
          }
      
          public void setSumFlow() {
              this.sumFlow = this.upFlow + this.downFlow;
          }
      
          @Override
          public void write(DataOutput out) throws IOException {
              out.writeLong(upFlow);
              out.writeLong(downFlow);
              out.writeLong(sumFlow);
          }
      
          @Override
          public void readFields(DataInput in) throws IOException {
              this.upFlow = in.readLong();
              this.downFlow = in.readLong();
              this.sumFlow = in.readLong();
          }
      
          @Override
          public String toString() {
              return upFlow + "\t" + downFlow + "\t" + sumFlow;
          }
      }
    • Write the Mapper class

      public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
      
          private Text outK = new Text();
          private FlowBean outV = new FlowBean();
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // 1 gets a row
              String line = value.toString();
              // 2 Split the line by "\t"
              String[] split = line.split("\t");
              // 3 Fetch the desired fields (indexed from the end of the line, presumably because the number of columns can vary)
              // Mobile phone number: 13736230513
              // upstream and downstream traffic: 2481,24681
              String phone = split[1];
              String up = split[split.length - 3];
              String down = split[split.length - 2];
              // 4 Encapsulate the output key and value
              outK.set(phone);
              outV.setUpFlow(Long.parseLong(up));
              outV.setDownFlow(Long.parseLong(down));
              outV.setSumFlow();
              // 5 Write out
              context.write(outK, outV);
          }
      }
    • Write the Reducer class

      public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
          private FlowBean outV = new FlowBean();
      
          @Override
          protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
              // 1 Iterate over the values and accumulate the upstream and downstream traffic
              long totalUp = 0;
              long totalDown = 0;
              for (FlowBean value : values) {
                  totalUp += value.getUpFlow();
                  totalDown += value.getDownFlow();
              }
              // 2 Encapsulate the output value outV
              outV.setUpFlow(totalUp);
              outV.setDownFlow(totalDown);
              outV.setSumFlow();
              // 3 Write out
              context.write(key, outV);
          }
      }
    • Write the Driver class

      public class FlowDriver {
      
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // 1 Obtain the job
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);
              // 2 Set the jar
              job.setJarByClass(FlowDriver.class);
              // 3 Associate mapper and Reducer
              job.setMapperClass(FlowMapper.class);
              job.setReducerClass(FlowReducer.class);
              // 4 Set the key and value types output by mapper
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(FlowBean.class);
              // 5 Set the key and value types for the final data output
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(FlowBean.class);
              // 6 Set the data input path and output path
              FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputflow"));
              FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output4"));
              // 7 Submit the job
              boolean result = job.waitForCompletion(true);
              System.exit(result ? 0 : 1);
          }
      }

2. MapReduce framework principles

1. InputFormat Data input

1.1. Slicing and the MapTask parallelism determination mechanism

  • Problem introduction

    The parallelism of a MapTask determines the parallelism of tasks in the Map phase, which affects the processing speed of the entire Job.

    Consider: for 1 GB of data, starting 8 MapTasks can improve the cluster's concurrent processing capacity. But will starting 8 MapTasks for 1 KB of data also improve cluster performance? Is more MapTask parallelism always better? What factors affect MapTask parallelism?

  • MapTask parallelism determination mechanism

    Data blocks: HDFS physically divides data into blocks. A block is the unit in which HDFS stores data.

    Data slicing: slicing only divides the input logically; it does not physically split the data into separate chunks on disk. A data slice is the unit on which a MapReduce program computes its input, and each slice starts one MapTask.
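
    For reference (a sketch, not part of the original text), the default split size computed by FileInputFormat mirrors the following rule, which is why a slice normally equals one HDFS block:

      // Simplified sketch of FileInputFormat.computeSplitSize in Hadoop's source
      long minSize = 1L;                    // mapreduce.input.fileinputformat.split.minsize (default 1)
      long maxSize = Long.MAX_VALUE;        // mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE)
      long blockSize = 128L * 1024 * 1024;  // HDFS block size, e.g. 128 MB
      long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // equals blockSize by default

    With the defaults, a 300 MB file stored in 128 MB blocks is planned as 3 slices and therefore starts 3 MapTasks.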

1.2. Job submission process and slicing source code in detail

  • Job submission process source code in detail

    waitForCompletion()
    
    submit();
    
    // 1 Establish a connection
    
    connect();
    
    // 1) Create an agent that submits the Job
    
    new Cluster(getConfiguration());
    
    // (1) Determine the local running environment or yarn cluster running environment
    
    initialize(jobTrackAddr, conf);
    
    // 2 Submit the job
    
    submitter.submitJobInternal(Job.this, cluster)
    
    // 1) Create a Stag path for submitting data to the cluster
    
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    
    // 2) Get jobid and create Job path
    
    JobID jobId = submitClient.getNewJobID();
    
    // 3) Copy jar packages to cluster
    
    copyAndConfigureFiles(job, submitJobDir);
    
    rUploader.uploadFiles(job, jobSubmitDir);
    
    // 4) Calculate slices and generate slice planning files
    
    writeSplits(job, submitJobDir);
    
    maps = writeNewSplits(job, jobSubmitDir);
    
    input.getSplits(job);
    
    // 5) Write the XML configuration file to Stag path
    
    writeConf(conf, submitJobFile);
    
    conf.writeXml(out);
    
    // 6) Submit the Job
    
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

  • FileInputFormat slicing source code analysis (input.getSplits(job))

1.3. FileInputFormat slicing mechanism

1.4. TextInputFormat

  • FileInputFormat implementation class

    Consider: When running MapReduce, input file formats include row-based log files, binary format files, and database tables. So how does MapReduce read this data for different data types?

    Common FileInputFormat implementation classes include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, custom InputFormat, and so on.

  • TextInputFormat

    TextInputFormat is the default FileInputFormat implementation class. It reads each record as one line. The key is the byte offset, within the whole file, of the start of the line, of type LongWritable. The value is the content of the line, excluding any line terminators (newline, carriage return), of type Text.

    For example, suppose a split contains the following four text records.
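
    The original records are shown as an image; a hypothetical equivalent (the text of the four lines is made up for illustration) could be:

      hello hadoop
      hello mapreduce
      hdfs yarn
      flume kafka

    TextInputFormat would present them to the Mapper as the following key/value pairs, where each key is the byte offset of the line's first character within the file:

      (0,  "hello hadoop")
      (13, "hello mapreduce")
      (29, "hdfs yarn")
      (39, "flume kafka")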

1.5. CombineTextInputFormat Slicing mechanism

The default TextInputFormat slicing mechanism of the framework is to plan slicing tasks according to files. No matter how small the file is, it will be a separate slice and will be handed to a MapTask. In this way, if there are a large number of small files, a large number of MapTasks will be generated and the processing efficiency is extremely low.

  • Application scenarios

    CombineTextInputFormat is used for scenarios with too many small files, and it can logically plan multiple small files into a slice so that they can be handed over to a MapTask for processing.

  • Set the maximum virtual storage slice size

    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB

    Note: It is best to set the maximum value of virtual storage slice based on the actual small file size.

  • Slice mechanism

    The slicing process includes two parts: virtual stored procedure and slicing procedure.
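
    A commonly used walkthrough (sketched here because the original figure is not reproduced): assume setMaxInputSplitSize is 4 MB and the input directory holds four small files of 1.7 MB, 5.1 MB, 3.4 MB and 6.8 MB.

      Virtual storage: a file no larger than 4 MB forms one virtual block;
      a file larger than 4 MB but no larger than 8 MB is split into two equal halves.
        1.7 MB -> 1.7 MB
        5.1 MB -> 2.55 MB + 2.55 MB
        3.4 MB -> 3.4 MB
        6.8 MB -> 3.4 MB + 3.4 MB

      Slicing: virtual blocks are merged in order until a group reaches at least 4 MB.
        (1.7 + 2.55) MB, (2.55 + 3.4) MB, (3.4 + 3.4) MB  ->  3 slices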

1.6. Case practice of CombineTextInputFormat

  • Requirement

    Combine a large number of small input files into one slice for unified processing.

  • The implementation process

    • Without any processing, run the WordCount case program and observe that the number of slices is 4.

      number of splits:4

    • Add the following code to WordcountDriver, run the program, and observe that the number of slices running is 3.

      • Add the following code to the driver class:
      // If InputFormat is not set, it defaults to TextInputFormat.class
      job.setInputFormatClass(CombineTextInputFormat.class);
      // Set the maximum number of virtual storage slices to 4m
      CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
      • If you run it, the number of slices is 3.

        number of splits:3

    • Add the following code to WordcountDriver, run the program, and observe that the number of slices running is 1.

      • Add the following code to the driver:

        // If InputFormat is not set, it defaults to TextInputFormat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        // The maximum value of virtual storage slices is 20 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
      • If you run it, the number of slices is 1.

        number of splits:1

2. MapReduce workflow

2.1. Detailed MapReduce workflow (1)

2.2. Detailed MapReduce workflow (2)

The preceding is the complete MapReduce process, in which the Shuffle process spans steps 7 through 16. The Shuffle process works as follows:

(1) MapTask collects the KV pairs output by our map() method and puts them into the memory buffer

(2) Data in the memory buffer is continually overflowed (spilled) to local disk files; multiple overflow files may be produced

(3) Multiple overflow files will be merged into a large overflow file

(4) During overflow and merge, the Partitioner is called for partitioning and sorting on keys

(5) According to its own partition number, each ReduceTask fetches the corresponding partition of result data from every MapTask machine

(6) A ReduceTask gathers the result files belonging to the same partition from the different MapTasks and merges them again (merge sort)

(7) After merging into large files, the Shuffle process ends and then enters the logical operation process of ReduceTask (extract key value pairs one by one from the file and invoke user-defined reduce() method)

Note:

(1) The size of the buffer in Shuffle affects the execution efficiency of the MapReduce program. In principle, the larger the buffer is, the fewer I/O operations are performed on the disk, and the higher the execution speed is.

(2) The size of the buffer can be adjusted with a parameter: mapreduce.task.io.sort.mb, which defaults to 100 MB.
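
A hedged sketch of adjusting this buffer for a job (the value 200 is an arbitrary example):

    // Raise the map-side sort buffer from the default 100 MB to 200 MB for this job
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.mb", 200);
    Job job = Job.getInstance(conf);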

3. Related links

MapReduce overview of Big Data Hadoop-MapReduce