
First, compression and storage

1. Hadoop compression configuration

1.1 Compression codecs supported by MR

To support multiple compression/decompression algorithms, Hadoop introduces codecs, as shown in the following table:
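
(A minimal reconstruction of that table: the standard Hadoop codec classes are listed below; the LZO class comes from the external hadoop-lzo library, so its availability is an assumption about the cluster.)

    DEFLATE   org.apache.hadoop.io.compress.DefaultCodec
    Gzip      org.apache.hadoop.io.compress.GzipCodec
    Bzip2     org.apache.hadoop.io.compress.BZip2Codec
    LZO       com.hadoop.compression.lzo.LzopCodec
    Snappy    org.apache.hadoop.io.compress.SnappyCodec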

Comparison of compression performance:

1.2. Configure compression parameters

To enable compression in Hadoop, configure the following parameters (in the mapred-site.xml file):
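
A minimal sketch of those parameters, inspected or set per session from the Hive CLI; the values shown are illustrative, not recommendations:

    -- codecs available to the cluster (defined in core-site.xml); "set" with no value prints the current setting
    set io.compression.codecs;
    -- compress map output and choose its codec
    set mapreduce.map.output.compress=true;
    set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
    -- compress the final job output, choose its codec, and use BLOCK compression for SequenceFile output
    set mapreduce.output.fileoutputformat.compress=true;
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
    set mapreduce.output.fileoutputformat.compress.type=BLOCK;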

2. Enable Map output phase compression (MR engine)

Enabling map output phase compression reduces the amount of data transferred between the map and reduce tasks of a job. The configuration is shown in the case practice below:

Case practice:

  1. Enable Hive intermediate (transmission) data compression

    hive (default)> set hive.exec.compress.intermediate=true;
  2. Enable the map output compression function in MapReduce

    hive (default)> set mapreduce.map.output.compress=true;
  3. Set the compression mode for map output data in MapReduce

    hive (default)> set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
  4. Execute query statement

    hive (default)> select count(ename) name from emp;

    Check the map task logs to confirm that Snappy was used.

3. Enable Reduce output compression

When Hive writes output to a table, that output can also be compressed; this is controlled by the hive.exec.compress.output property. You may want to keep the default value of false in the default settings file, so that the default output is an uncompressed plain text file, and enable output compression by setting the property to true in a query or execution script when needed.

Case practice:

  1. Enable the compression function for hive final output data

    hive (default)> set hive.exec.compress.output=true;
  2. Enable mapReduce final output data compression

    hive (default)> set mapreduce.output.fileoutputformat.compress=true;
  3. Set the compression mode for mapReduce output data

    hive (default)> set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
  4. Set MapReduce final output compression to block compression

    hive (default)> set mapreduce.output.fileoutputformat.compress.type = BLOCK;
  5. Test if the output is a compressed file

    hive (default)> insert overwrite local directory '/opt/module/data/distribute-result' select * from emp distribute by deptno sort by empno desc;

4. File storage format

Hive stores data in TEXTFILE, SEQUENCEFILE, ORC, and PARQUET formats.

4.1. Column and Row storage

As shown in the figure, the logical table is on the left; of the two diagrams on the right, the first shows row storage and the second shows column storage.

  1. Features of row storage

    When querying a whole row that meets a condition, column storage has to fetch the value of each column from its own clustered block, whereas row storage finds one value and the remaining values are stored adjacent to it, so row storage is faster for whole-row queries.

  2. Column storage features

    Because the data for each field is stored together, the amount of data read can be greatly reduced when a query needs only a few fields; and since all values of a field have the same data type, compression algorithms can be designed to suit each column better.

    TEXTFILE and SEQUENCEFILE are row-based storage formats;

    ORC and PARQUET are based on column storage.

4.2 TextFile Format

This is the default format: data is not compressed, so disk overhead and data parsing overhead are high. It can be combined with Gzip or Bzip2 compression, but Hive cannot split Gzip-compressed data and therefore cannot process it in parallel.

4.3 Orc format

Optimized Row Columnar (Orc) is a new storage format introduced in Hive 0.11.

As shown in the following figure, each Orc file consists of one or more stripes. Each stripe is generally the block size of HDFS, and each stripe contains multiple records. These records are stored independently in columns, corresponding to the concept of Row Group in Parquet. Each Stripe consists of three parts: Index Data, Row Data, and Stripe Footer:

  1. Index Data: a lightweight index, created every 10,000 rows by default. The index records only the offsets of each field within the Row Data.

  2. Row Data: stores the actual data. A batch of rows is taken and stored column by column, and each column is encoded into multiple streams for storage.

  3. Stripe Footer: Stores the type and length of each Stream.

    Each file has a File Footer, which stores the number of rows in each stripe, the data type of each column, and so on. At the very end of the file is a PostScript, which records the compression type of the whole file and the length of the File Footer. To read a file, the reader first seeks to the end and reads the PostScript, parses the File Footer length from it, then reads the File Footer and parses the per-stripe information from it, and finally reads each stripe from the back of the file to the front.
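
As a hedged illustration of how these structures can be tuned, ORC exposes table properties such as orc.stripe.size and orc.row.index.stride; the table name, columns, and values below are only examples:

    -- orc.stripe.size: target stripe size in bytes (example value)
    -- orc.row.index.stride: rows between index entries, matching the 10,000-row default above
    create table log_orc_tuned(
     id bigint,
     msg string)
    stored as orc
    tblproperties("orc.compress"="SNAPPY", "orc.stripe.size"="67108864", "orc.row.index.stride"="10000");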

4.4 Parquet format

The Parquet file is stored in binary and therefore cannot be read directly. The file contains the data and metadata of the file, so the Parquet format file is self-parsed.

  1. Row Group: Each Row Group contains a certain number of rows. At least one Row Group can be stored in an HDFS file, similar to the concept of ORC’s stripe.

  2. Column Chunk: each column in a row group is stored in a column chunk, and all column chunks of a row group are stored consecutively in the file. The values in a column chunk are all of the same type, and different column chunks may be compressed with different algorithms.

  3. Page: Each column block is divided into multiple pages. A Page is the smallest unit of encoding. Different pages in the same column block may use different encoding methods.

    Generally, the row group size is set to the HDFS block size when storing Parquet data. Since the minimum unit of data each mapper task processes is one block, each row group can then be handled by one mapper task, which increases the parallelism of task execution.
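
As a rough sketch of that alignment (the table name is made up, and it is assumed here that the Parquet writer honors parquet.block.size set in the session):

    -- Row groups of 128 MB, matching a 128 MB HDFS block (assumed setting)
    set parquet.block.size=134217728;

    create table log_parquet_tuned(
     id bigint,
     msg string)
    stored as parquet
    tblproperties("parquet.compression"="SNAPPY");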

4.5. Comparative experiment of mainstream file storage formats

Compare the compression ratio and query speed of stored files.

Compression ratio test of stored files:

The test data

  1. TextFile

    • Create a table to store data in a TEXTFILE format

      TEXTFILE is the default storage format for a table, so specifying it is optional.

      create table log_text (
      track_time string,
      url string,
      session_id string,
      referer string,
      ip string,
      end_user_id string,
      city_id string
      )
      row format delimited fields terminated by '\t'
      stored as textfile;
    • Load data into a table

      hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/log.data' into table log_text;
    • View the data size in the table

      hive (default)> dfs -du -h /user/hive/warehouse/log_text;
      18.1 M  54.4 M  /user/hive/warehouse/log_text/log.data

  2. ORC

    • Create tables and store data in ORC format

      create table log_orc(
      track_time string,
      url string,
      session_id string,
      referer string,
      ip string,
      end_user_id string,
      city_id string
      )
      row format delimited fields terminated by '\t'
      stored as orc
      tblproperties("orc.compress"="NONE"); -- do not compress the ORC data
    • Load data into a table

      hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/log.data' into table log_orc;

      Note: data cannot be imported into a table stored as ORC via load; import it with insert ... select instead:

      hive (default)> insert into table log_orc select * from log_text;
    • View the data size in the table

      hive (default)> dfs -du -h /user/hive/warehouse/log_orc/;
      7.7 M  23.1 M  /user/hive/warehouse/log_orc/000000_0

  3. Parquet

    • Create a table and store the data in the parquet format

      create table log_parquet(
      track_time string,
      url string,
      session_id string,
      referer string,
      ip string,
      end_user_id string,
      city_id string
      )
      row format delimited fields terminated by '\t'
      stored as parquet;
    • Load data into a table

      hive (default)> insert into table log_parquet select * from log_text;
    • View the data size in the table

      hive (default)> dfs -du -h /user/hive/warehouse/log_parquet/;
      13.1 M  39.3 M  /user/hive/warehouse/log_parquet/000000_0

5. Combination of storage and compression

Official documentation: cwiki.apache.org/confluence/…

ORC storage mode compression:

Note: all ORCFile parameters are set in the TBLPROPERTIES clause of the HQL statement.

  1. Create an ORC table with ZLIB compression

    • Table creation statement

      create table log_orc_zlib(
      track_time string,
      url string,
      session_id string,
      referer string,
      ip string,
      end_user_id string,
      city_id string
      )
      row format delimited fields terminated by '\t'
      stored as orc
      tblproperties("orc.compress"="ZLIB");
    • Insert data

      insert into log_orc_zlib select * from log_text;
    • View the inserted data

      hive (default)> dfs -du -h /user/hive/warehouse/log_orc_zlib/ ;

  2. Create an ORC table with SNAPPY compression

    • Table creation statement

      create table log_orc_snappy(
      track_time string,
      url string,
      session_id string,
      referer string,
      ip string,
      end_user_id string,
      city_id string
      )
      row format delimited fields terminated by '\t'
      stored as orc
      tblproperties("orc.compress"="SNAPPY");
    • Insert data

      insert into log_orc_snappy select * from log_text;
    • View the inserted data

      hive (default)> dfs -du -h /user/hive/warehouse/log_orc_snappy/;

      The ZLIB-compressed file is smaller than the Snappy-compressed one, because ZLIB uses the Deflate algorithm, which achieves a higher compression ratio than Snappy.

  3. Create a Parquet table with SNAPPY compression

    • Table creation statement

      create table log_parquet_snappy(
      track_time string,
      url string,
      session_id string,
      referer string,
      ip string,
      end_user_id string,
      city_id string
      )
      row format delimited fields terminated by '\t'
      stored as parquet
      tblproperties("parquet.compression"="SNAPPY");
    • Insert data

      hive (default)> insert into log_parquet_snappy select * from log_text;
    • View the inserted data

      hive (default)> dfs -du -h /user/hive/warehouse/log_parquet_snappy/;

  4. Storage mode and compression summary

    In actual project development, the storage format for Hive tables is usually ORC or Parquet, and the compression codec is usually Snappy or LZO.

Second, enterprise-level tuning

1. Execution plan (Explain)

  1. Basic syntax

    EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
  2. Case practice

    • View the execution plan for the following statement

    No MR task is generated:

    hive (default)> explain select * from emp;
    OK
    Explain
    STAGE DEPENDENCIES:
      Stage-0 is a root stage
    
    STAGE PLANS:
      Stage: Stage-0
        Fetch Operator
          limit: -1
          Processor Tree:
            TableScan
              alias: emp
              Statistics: Num rows: 1 Data size: 6690 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
                Statistics: Num rows: 1 Data size: 6690 Basic stats: COMPLETE Column stats: NONE
                ListSink
    

    An MR task is generated:

    hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;

    • View the detailed execution plan

      hive (default)> explain extended select * from emp;
      hive (default)> explain extended select deptno, avg(sal) avg_sal from emp group by deptno;

2. Fetch

Fetch means that certain Hive queries can be executed without any MapReduce computation. For example, for SELECT * FROM employees; Hive can simply read the files in the table's storage directory and output the results to the console.

In the hive-default.xml.template file, hive.fetch.task.conversion defaults to more (older Hive versions defaulted to minimal). With this property set to more, MapReduce is not used for full-table queries, single-column queries, or limit queries.

Case practice:

  1. Set hive.fetch.task.conversion to none and then run the following queries; each of them launches a MapReduce job.

    hive (default)> set hive.fetch.task.conversion=none;
    
    hive (default)> select * from emp;
    
    hive (default)> select ename from emp;
    
    hive (default)> select ename from emp limit 3;
  2. Set hive.fetch.task.conversion to more and then run the same queries; none of the following query types launches a MapReduce job.

    hive (default)> set hive.fetch.task.conversion=more;
    
    hive (default)> select * from emp;
    
    hive (default)> select ename from emp;
    
    hive (default)> select ename from emp limit 3;

3. Local mode

Most Hadoop jobs require the full scalability provided by Hadoop to handle large data sets. However, sometimes Hive inputs are very small. In this case, the execution of the task triggered by the query may take much longer than the actual job execution time. For most of these cases, Hive can handle all tasks on a single machine in local mode. For small data sets, the execution time can be significantly reduced.

Users can set hive.exec.mode.local.auto to true to let Hive start this optimization automatically when appropriate.

-- Enable local MR
set hive.exec.mode.local.auto=true;
-- Maximum total input size for local MR; when the input is smaller than this value, local MR is used. Default 134217728 (128 MB)
set hive.exec.mode.local.auto.inputbytes.max=50000000;
-- Maximum number of input files for local MR; when there are fewer input files than this value, local MR is used. Default 4
set hive.exec.mode.local.auto.input.files.max=10;

Case practice:

  1. With local mode disabled (the default), execute the query statement

    hive (default)> select count(*) from emp group by deptno;
  2. Enable the local mode and execute the query statement

    hive (default)> set hive.exec.mode.local.auto;
    hive (default)> set hive.exec.mode.local.auto=true;
    hive (default)> select count(*) from emp group by deptno;

4. Table optimization

4.1 Small table Join large table (MapJoin)

Place the table with relatively scattered keys and a small amount of data on the left side of the join, and use map join to load the small dimension table into memory so the join is completed on the map side.

Tests show that newer Hive versions have optimized both small-table-join-large-table and large-table-join-small-table, so there is no longer any difference between putting the small table on the left or on the right.
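
For reference, a map join can also be requested explicitly with a query hint; this is only a sketch (newer Hive versions rely on the automatic conversion configured in the case practice below and may ignore the hint), using the bigtable and smalltable tables created there:

    select /*+ MAPJOIN(s) */ b.id, b.keyword
    from bigtable b
    join smalltable s
    on b.id = s.id;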

Case practice:

  1. Requirement

    Test the efficiency of large tables joining small tables and small tables joining large tables

  2. Enable the MapJoin parameter setting

    • Set automatic Mapjoin selection

      -- the default is true
      set hive.auto.convert.join = true;
    • Threshold between large and small tables (by default, tables smaller than 25 MB are treated as small):

      set hive.mapjoin.smalltable.filesize = 25000000;
  3. Working mechanism of MapJoin

  4. Statements that create large tables, small tables, and JOIN tables

    create table bigtable(id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
    create table smalltable(id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
    create table jointable(id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
  5. Import data into large tables and small tables separately

    hive (default)> load data local inpath '/opt/module/data/bigtable' into table bigtable;
    
    hive (default)>load data local inpath '/opt/module/data/smalltable' into table smalltable;
  6. Small tables JOIN large tables

    insert overwrite table jointable
    select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from smalltable s
    join bigtable b
    on b.id = s.id;
  7. Large table JOIN small table statement

    insert overwrite table jointable
    select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from bigtable b
    join smalltable s
    on s.id = b.id;

4.2 Large table Join large table

Note: Hive automatically filters out rows with empty (null) keys when executing an inner join, as can be seen from the explain execution plan, so empty-key filtering only matters for non-inner joins!

  1. Empty key filtering

    Sometimes a join times out because certain keys carry too much data; since rows with the same key are all sent to the same reducer, that reducer runs out of memory. In such cases we should analyze the offending keys carefully. Often they correspond to abnormal data, which should be filtered out in the SQL statement, for example rows whose key field is null.

    Case practice:

    • Create a table for the original data with null ids

      create table nullidtable(id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
    • Load raw data and null ID data into corresponding tables respectively

      hive (default)> load data local inpath '/opt/module/data/nullid' into table nullidtable;
    • Test without filtering null ids

      hive (default)> insert overwrite table jointable select n.* from nullidtable n left join bigtable o on n.id = o.id;
    • Test with null ids filtered out

      hive (default)> insert overwrite table jointable select n.* from (select * from nullidtable where id is not null) n left join bigtable o on n.id = o.id;
  2. Empty key transformation

    Sometimes there are many rows with an empty key, but they are not abnormal data and must be included in the join result. In that case we can assign a random value to the empty key field in table a, so that the data is distributed randomly and evenly across the reducers.

    Case practice:

    • Non-randomly distributed null values:

      • Set the number of reduce tasks to five

        set mapreduce.job.reduces = 5;
      • JOIN two tables

        insert overwrite table jointable select n.* from nullidtable n left join bigtable b on n.id = b.id;

        Results: As shown in the figure below, it can be seen that there was a data skew, and the resource consumption of some reducer was much higher than that of others.

    • Randomly distributed null values:

      • Set the number of reduce tasks to five

        set mapreduce.job.reduces = 5;
      • JOIN two tables

        insert overwrite table jointable select n.* from nullidtable n full join bigtable o on nvl(n.id,rand()) = o.id;

        Results: As shown in the figure below, it can be seen that data skew is eliminated and resource consumption of reducer is balanced

  3. SMB(Sort Merge Bucket join)

    • Create the second large table

      create table bigtable2(
       id bigint,
       t bigint,
       uid string,
       keyword string,
       url_rank int,
       click_num int,
       click_url string)
      row format delimited fields terminated by '\t';
      
      load data local inpath '/opt/module/data/bigtable' into table bigtable2;

      Test large table direct JOIN

      insert overwrite table jointable
      select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
      from bigtable s
      join bigtable2 b
      on b.id = s.id;
    • Create bucketed table 1; the number of buckets should not exceed the number of available CPU cores

      create table bigtable_buck1(
       id bigint,
       t bigint,
       uid string,
       keyword string,
       url_rank int,
       click_num int,
       click_url string)
      clustered by(id) 
      sorted by(id)
      into 6 buckets
      row format delimited fields terminated by '\t';
      
      load data local inpath '/opt/module/data/bigtable' into table bigtable_buck1;
    • Create bucketed table 2; the number of buckets should not exceed the number of available CPU cores

      create table bigtable_buck2(
       id bigint,
       t bigint,
       uid string,
       keyword string,
       url_rank int,
       click_num int,
       click_url string)
      clustered by(id) 
      sorted by(id)
      into 6 buckets
      row format delimited fields terminated by '\t';
      
      load data local inpath '/opt/module/data/bigtable' into table bigtable_buck2;
    • Set the parameters

      set hive.optimize.bucketmapjoin = true;
      set hive.optimize.bucketmapjoin.sortedmerge = true;
      set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
    • test

      insert overwrite table jointable
      select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
      from bigtable_buck1 s
      join bigtable_buck2 b
      on b.id = s.id;

4.3 Group By

By default, the map phase distributes rows with the same key to the same reducer; if a single key has too much data, the job is skewed.

Not all aggregation operations need to be performed on the Reduce end. Some aggregation operations can be performed on the Map end and the final result can be obtained on the Reduce end.

Parameters for enabling map-side aggregation:

  1. Whether to perform aggregation on the Map side. The default value is True

    set hive.map.aggr = true;
  2. Number of items aggregated on the Map side

    set hive.groupby.mapaggr.checkinterval = 100000;
  3. Load balancing when data skew is present (default: False)

    set hive.groupby.skewindata = true;

    If the option is set to true, the generated query plan will have two MR Jobs. In the first MR Job, Map output results are randomly distributed to Reduce tasks. Each Reduce task performs partial aggregation operations and outputs the results. In this way, the same Group By Key may be distributed to different Reduce tasks to achieve load balancing. The second MR Job is then distributed to Reduce according to the Group By Key according to the preprocessed data results (this process ensures that the same Group By Key is distributed to the same Reduce). Finally, the final aggregation operation is completed.

    hive (default)> select deptno from emp group by deptno;

4.4 Count(Distinct) deduplication

When the amount of data is small this does not matter, but with a large amount of data, a COUNT DISTINCT is performed by a single reduce task; if that reduce task has to process too much data, the whole job cannot finish. COUNT DISTINCT is therefore generally replaced by GROUP BY followed by COUNT, but watch out for the data skew that GROUP BY itself can cause.

Case practice:

  1. Create a large table

    create table bigtable
    (id bigint,
    time bigint,
    uid string,
    keyword string,
    url_rank int,
    click_num int,
    click_url string) 
    row format delimited fields terminated by '\t';
  2. Load the data

    hive (default)> load data local inpath '/opt/module/data/bigtable' into table bigtable;
  3. Set the number of reduce tasks to five

    set mapreduce.job.reduces = 5;
  4. Perform deduplication id query

    hive (default)> select count(distinct id) from bigtable;
  5. Deduplicate the ids with group by

    select count(id) from (select id from bigtable group by id) a;

    It takes an extra Job to complete, but it’s definitely worth it if there’s a lot of data.

4.5. Cartesian product

Avoid Cartesian products as much as possible: do not join without an ON condition or with an invalid ON condition. Hive can only use one reducer to compute a Cartesian product.
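
A minimal sketch of the difference, reusing the bigtable and smalltable tables from the join examples above:

    -- Cartesian product: no ON condition, so every row of b is paired with every row of s
    -- and the whole result has to be produced by a single reducer
    select b.id as b_id, s.id as s_id from bigtable b join smalltable s;

    -- Proper join: an effective ON condition keeps the result and the reducer load bounded
    select b.id as b_id, s.id as s_id from bigtable b join smalltable s on b.id = s.id;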

4.6 Row and column filtering

Column pruning: in SELECT, take only the columns you need; if the table is partitioned, use partition filtering, and avoid SELECT *.

Row filtering: with partition pruning, if the filter condition on the secondary table is written in the WHERE clause, the full tables are joined first and only filtered afterwards; push the filter into a subquery instead.

Case practice:

  1. Test: join the two tables first and then filter with the WHERE condition

    hive (default)> select o.id from bigtable b
                  > join bigtable o on o.id = b.id
                  > where o.id <= 10;
  2. Test: filter in a subquery first and then join

    hive (default)> select b.id from bigtable b
                  > join (select id from bigtable where id <= 10) o on b.id = o.id;

4.7 Partitioning
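
A brief, hedged sketch of partitioning (the table, columns, and partition value below are made up for illustration); filtering on the partition column lets Hive read only the matching partition directories:

    create table log_part(
     id bigint,
     url string)
    partitioned by (dt string)
    row format delimited fields terminated by '\t';

    -- Only the dt='2021-11-27' partition directory is scanned
    select count(*) from log_part where dt = '2021-11-27';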

4.8 Bucketing
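
A brief, hedged sketch of bucketing (the table name is made up); bucketed and sorted tables are also what the SMB join in section 4.2 relies on:

    create table log_buck(
     id bigint,
     url string)
    clustered by (id)
    sorted by (id)
    into 6 buckets
    row format delimited fields terminated by '\t';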

5. Set the Map and Reduce numbers properly

  1. Typically, a job will generate one or more Map tasks from the input directory.

    The main determinants are: the total number of input files, the size of the input file, and the block size set by the cluster.

  2. Is the more maps the better?

    The answer is no. If a task has many small files (much smaller than the block size of 128M), each small file will be treated as a block and completed by a map task, which takes much longer to start and initialize than the logical processing time, resulting in a huge waste of resources. Furthermore, the number of maps that can be executed at the same time is limited.

  3. Is it safe to make sure that each map handles close to 128MB of blocks?

    The answer is: not necessarily. For example, a 127 MB file would normally be handled by one map, but if the file has only one or two small fields yet tens of millions of records, and the map processing logic is complex, completing it with a single map task will certainly be slow.

    To address problems 2 and 3 above, we adopt two approaches: reducing the number of maps and increasing the number of maps.

5.1 Increase the number of Maps for complex Files

If the input files are large, the task logic is complex, and the map execution is slow, you can increase the number of maps to reduce the amount of data processed by each map and improve the task execution efficiency.

The way to increase the number of maps: according to the split-size formula computeSplitSize(Math.max(minSize, Math.min(maxSize, blockSize))) = blockSize = 128 MB, lowering maxSize below blockSize increases the number of maps.

Case practice:

  1. Execute the query

    hive (default)> select count(*) from emp;
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
  2. Set the maximum split size to 100 bytes

    hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
    Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1

5.2. Merge small files

  1. Merge small files before the map phase to reduce the number of maps: CombineHiveInputFormat (the default input format) combines small files at the split level, while HiveInputFormat does not merge small files.

    set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  2. To merge small files at the end of a Map-Reduce task:

    • Merge small files at the end of the map-only task. Default is true

      set hive.merge.mapfiles = true;
    • Merge small files at the end of a Map-Reduce job. The default value is false

      set hive.merge.mapredfiles = true;
    • The size of the merged file, 256M by default

      set hive.merge.size.per.task = 268435456;
    • When the average size of output files is smaller than this value, an independent Map-reduce task is started to merge files

      set hive.merge.smallfiles.avgsize = 16777216;

5.3. Set the Reduce number appropriately

  1. Method 1 for adjusting the number of reduce tasks

    • The default amount of data processed by each Reduce is 256MB

      set hive.exec.reducers.bytes.per.reducer;
    • Maximum number of Reduce tasks for each job. The default value is 1009

      set hive.exec.reducers.max;
    • Formula for calculating the number of reducers

      N = min(parameter 2, total input data size / parameter 1)
      For example, with the defaults above (256 MB per reducer, max 1009), a job reading 1 GB of input gets N = min(1009, 1 GB / 256 MB) = 4 reducers.
  2. Method 2 for adjusting the number of reduce tasks

    Set the number of reduce tasks for each job (the default comes from Hadoop's mapred-default.xml):

    set mapreduce.job.reduces = 15;
  3. More reduce tasks are not always better

    • Starting and initializing too many reduce tasks also consumes time and resources.

    • In addition, there will be as many output files as there are reduce tasks; if many small files are produced and then used as input to the next job, the too-many-small-files problem appears again.

    • When setting the number of Reduces, consider the following two principles: Use an appropriate number of Reduces to handle large amounts of data; Ensure that the amount of data processed by a single Reduce task is appropriate.

6. Parallel execution

Hive converts a query into one or more phases, which can be the MapReduce phase, the sampling phase, the merge phase, the limit phase, or other phases Hive may need during execution. By default, Hive executes only one phase at a time. A particular job, however, may contain many phases that are not completely interdependent, which means some of them can run in parallel and shorten the execution time of the whole job; the more phases that can run in parallel, the faster the job is likely to finish.

Parallel can be enabled by setting the hive.exec.parallel parameter to true. However, in a shared cluster, if the number of parallel phases in a job increases, the cluster utilization increases.

-- Enable concurrent execution of phases
set hive.exec.parallel=true;
-- Maximum parallelism allowed for the same SQL; default 8
set hive.exec.parallel.thread.number=16;

Of course, this only pays off when system resources are relatively idle; without spare resources, the parallel phases cannot actually run.

7. Strict mode

Hive can be configured to prevent dangerous operations:

  1. Partitioned tables queried without a partition filter

    With hive.strict.checks.no.partition.filter set to true, queries against a partitioned table are not allowed to execute unless the WHERE clause contains a partition-column filter that limits the scope; in other words, users may not scan all partitions. The reason for this restriction is that partitioned tables usually have very large, fast-growing data sets, and a query without a partition filter could consume an unacceptably large amount of resources.

  2. ORDER BY used without LIMIT

    With hive.strict.checks.orderby.no.limit set to true, any query that uses ORDER BY must also use LIMIT. Because ORDER BY sends all result rows to a single reducer to perform the sort, forcing users to add a LIMIT prevents that reducer from running for a very long time.

  3. Cartesian products

    With hive.strict.checks.cartesian.product set to true, Cartesian-product queries are restricted. Users familiar with relational databases may expect to use a WHERE clause instead of an ON clause in a JOIN, relying on the execution optimizer to efficiently convert the WHERE into an ON condition; unfortunately Hive does not perform this optimization, so if the tables are large enough such a query can get out of hand.

These checks are disabled by default; enable them as needed.
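
A minimal sketch of enabling the three checks in a session, using the property names described above:

    set hive.strict.checks.no.partition.filter=true;
    set hive.strict.checks.orderby.no.limit=true;
    set hive.strict.checks.cartesian.product=true;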

8. JVM reuse
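
A hedged sketch for this heading: with the MapReduce execution engine, JVM reuse is controlled by the Hadoop parameter mapreduce.job.jvm.numtasks (formerly mapred.job.reuse.jvm.num.tasks); whether it takes effect depends on the Hadoop version and scheduler, so treat the value below as illustrative only:

    -- Reuse each JVM for up to 10 tasks instead of starting a new JVM per task (illustrative value)
    set mapreduce.job.jvm.numtasks=10;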

9. Compression

Third, friendship links

Big data Hive learning journey 5

Big data Hive learning journey 4

Big data Hive learning journey 3

Big data Hive learning journey 2

Big data Hive learning journey 1