I use Hive a lot in my work and write a lot of HiveQL. This article summarizes common Hive performance optimizations from three angles: table design, syntax and parameters, and the Hive architecture.

Table design level optimization

Optimize with partitioned tables

A partitioned table classifies and stores data along one or more dimensions, with each partition corresponding to a directory. If the filter criteria include partition fields, Hive only needs to scan the files in the matching partition directories rather than the entire data set, which greatly reduces the amount of data processed and improves query efficiency.

If queries on a Hive table usually filter on a particular field, it is a good idea to make that field a partition column.
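
For example, a minimal sketch (the table and column names are made up for illustration) of a table partitioned by date, and a query that only reads one partition directory:

create table access_log (
  user_id bigint,
  url     string
)
partitioned by (dt string);

-- Only the files under the dt='2021-01-01' partition directory are scanned.
select count(*) from access_log where dt = '2021-01-01';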

Optimize with bucketed tables

Once the number of buckets is specified, data is hashed on a chosen field and stored in the corresponding bucket. The purpose is similar to partitioning: queries can be restricted to the relevant buckets instead of scanning all the data.
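
For example (a sketch with hypothetical names), a table bucketed on user_id into 64 buckets, so that sampling or a join on user_id only needs to read the relevant bucket files:

create table user_profile (
  user_id bigint,
  age     int
)
clustered by (user_id) into 64 buckets;

set hive.enforce.bucketing = true; -- needed on older Hive versions so that inserts respect the bucket definition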

Choose the appropriate file storage format

Apache Hive supports several familiar file formats used in Apache Hadoop.

TextFile

The default format if no storage format is specified when the table is created.

Storage mode: row storage.

Each line is a record, and each line ends with a newline character \n. If data is not compressed, disk overhead and data parsing overhead are high.

It can be used together with compression formats such as Gzip and Bzip2 (Hive automatically detects and decompresses them at query time). However, for some compression algorithms Hive cannot split the compressed file, so the data cannot be processed in parallel.

SequenceFile

A binary file format provided by the Hadoop API; it is easy to use, splittable, and compressible.

Three compression options are supported: NONE, RECORD, and BLOCK. RECORD-level compression has a low compression ratio, so BLOCK compression is recommended.

RCFile

Storage mode: data is divided into row groups, and each row group is stored column by column.

  • First, the data is split into row groups, which guarantees that a whole record stays within one block, so reading a record never requires reading more than one block.
  • Second, storing each block column by column facilitates data compression and fast column access.

ORC

Storage mode: data is divided into row groups, and each row group is stored column by column.

ORC is an optimized version of the RCFile format provided by Hive; it greatly improves performance and supports fast data compression and fast column access.

Parquet

Storage mode: column storage

Parquet is efficient for large-scale queries, and is particularly useful for queries that scan only specific columns of a table. Parquet supports Snappy and Gzip compression; Snappy is the default.

Parquet is also supported by the Impala query engine.

Use Parquet or ORC as the table storage format whenever possible: this not only reduces storage space but also improves query, compression, and join performance.
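
For instance, a sketch (illustrative table and column names) of creating a table stored as ORC with Snappy compression:

create table user_orders (
  order_id    bigint,
  customer_id bigint,
  total_price double
)
stored as orc
tblproperties ('orc.compress' = 'SNAPPY');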

Choose the right compression method

Hive statements are converted to MapReduce jobs, and the performance bottleneck of MapReduce lies in network I/O and disk I/O. The key to relieving this bottleneck is to reduce the amount of data. Compression does reduce the amount of data at the cost of extra CPU work, but in Hadoop the bottleneck is usually not the CPU, so compression puts the relatively idle CPU to good use.

Comparison of common compression algorithms

Commonly used codecs include Gzip, Bzip2, LZO, and Snappy.

How to choose the compression mode

  1. Compression ratio
  2. Compression and decompression speed
  3. Whether splitting is supported

Whether splitting is supported matters because large data files are processed in parallel by multiple Mapper tasks. Most compressed files cannot be split, because they can only be read from the beginning.

Syntax and parameter level optimization

Column pruning

When reading data, Hive can read only the columns required by the query and ignore the rest. This saves read overhead, intermediate-table storage overhead, and data integration overhead.

set hive.optimize.cp = true; -- Column pruning: select only the columns needed by the query. Default is true

Partition pruning

You can select only required partitions to reduce the number of partitions read and the amount of data read.

set hive.optimize.pruner=true; -- Default is true

Merge small files

Map input merge

When a MapReduce job runs, a file is usually processed by one Mapper. If the data source is a large number of small files, a large number of Mapper tasks will be started, wasting a lot of resources. Merging small files reduces the number of Mapper tasks.

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Merge small files at Map input, splitting by block size (the default)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Do not merge small files at Map input

Merge Map/Reduce output

A large number of small files puts pressure on HDFS and hurts processing efficiency. This can be mitigated by merging the Map and Reduce output files.

set hive.merge.mapfiles=true;  -- Whether to merge Map output files. Default is true
set hive.merge.mapredfiles=true; -- Whether to merge Reduce output files. The default value is false
set hive.merge.size.per.task=256000000; -- Target size of the merged files. The default value is 256000000 (256 * 1000 * 1000 bytes)

Control the number of Map and Reduce tasks

Control the number of Mappers reasonably

The number of Mappers can be reduced by merging small files, and it can be increased by controlling the number of Reducers of the previous job (each Reducer writes one output file, which becomes an input file of the next job).

Default calculation method of mapper number

total_size: total size of the input files
dfs_block_size: HDFS block size (dfs.block.size)
default_mapper_num = total_size / dfs_block_size

MapReduce provides the following parameters to control the number of Map tasks:

set mapred.map.tasks=10;

This value takes effect only when it is greater than default_mapper_num; setting it lower than default_mapper_num will not reduce the number of Mappers.

So what if we need to reduce the mapper count, but the file size is fixed?

mapred.min.split.size sets the minimum amount of data each Map task processes. It takes effect only when it is greater than dfs_block_size.

split_size = max(mapred.min.split.size, dfs_block_size)
split_num = total_size / split_size
compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))

Increasing mapred.min.split.size therefore increases split_size and reduces the number of Mappers.
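
For example (an illustrative value, not a recommendation), to make each split at least 512 MB and thus reduce the number of Mappers:

set mapred.min.split.size=536870912; -- 512 MB (512 * 1024 * 1024), illustrative value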

Summarize the methods to control the number of Mapper:

  • To increase the number of Mappers, set mapred.map.tasks to a larger value.
  • To reduce the number of Mappers, set mapred.min.split.size to a larger value.
  • If the input consists of a large number of small files and you want fewer Mappers, set hive.input.format to merge the small files.

Before adjusting the number of Mappers, determine the approximate size of the files to be processed and how they are organized (a large number of small files, or a few large files), and then set the parameters accordingly.

Control the number of Reducers reasonably

If there are too many Reducers, each Reducer produces one result file, so many small files are generated. If these files serve as the input of the next job, the small files need to be merged, and starting and initializing each Reducer also consumes time and resources.

If there are too few Reducers, each Reducer has to process a large amount of data and data skew may occur, making the whole query slow. By default, the number of Reducers is determined by the following parameters:

  • Parameter 1: hive.exec.reducers.bytes.per.reducer (default: 1 GB)
  • Parameter 2: hive.exec.reducers.max (default: 999)

The number of Reducers is calculated as follows:

N = min(parameter 2, total input size / parameter 1)
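
As a worked example with the defaults above, a job whose total input is 5 GB gets N = min(999, 5 GB / 1 GB) = 5 Reducers.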

You can control the number of Reducers by changing the values of these two parameters. You can also use

set mapred.reduce.tasks=10;

to control the number of Reducers directly. If this parameter is set, the two parameters above are ignored.

Join optimization

Filter data first

Reduce the amount of data at every stage: use partition fields in partitioned tables wherever possible, and select only the columns that are actually needed later, so as to minimize the amount of data taking part in the join.
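
As a rough sketch (table and column names other than sale_detail are hypothetical), filters and column selection are pushed into subqueries before the join:

select a.customer_id, a.total_price, b.city
from (
  select customer_id, total_price
  from sale_detail
  where sale_date = '2010'   -- partition filter applied before the join
) a
join (
  select customer_id, city
  from customer_info
  where city = 'beijing'
) b
on a.customer_id = b.customer_id;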

Small tables join large tables

When joining a small table with a large table, follow the principle of putting the small table first. In the Reduce phase of a join, the contents of the tables on the left side of the join are buffered in memory, so placing the table with fewer records on the left effectively reduces the chance of memory overflow. Joins generate jobs from left to right, so make sure table sizes increase from left to right across a multi-table join.

Use the same join key

In Hive, when three or more tables are joined and the on conditions all use the same field, the joins are merged into a single MapReduce job. Take advantage of this feature by using the same join key, so that the joins run in one job and execution time is saved.
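
For example (hypothetical tables), all of the on conditions below use the same key customer_id, so Hive can run both joins in a single MapReduce job:

select a.customer_id, b.total_price, c.city
from customer_info a
join sale_detail  b on a.customer_id = b.customer_id
join address_info c on a.customer_id = c.customer_id;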

Enable mapjoin

Map join distributes the small table on one side of a join directly into the memory of every Map task, so the join is completed in the Map phase and the Reduce phase is skipped, which speeds up execution. Map join is only applicable when the small side of the join fits in memory.

set hive.auto.convert.join = true; -- Automatically convert a common (Reduce-side) join into a map join based on the size of the small table, loading the small table into memory
set hive.mapjoin.smalltable.filesize = 2500000; -- File-size threshold in bytes below which a table is treated as the small table and loaded into memory
set hive.mapjoin.maxsize=1000000;  -- Maximum number of rows a map join will process; if exceeded, the map join exits abnormally

Atomic operation

Avoid packing complex logic into a single SQL statement; use intermediate tables to complete the complex logic in steps.
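
For instance, a sketch (hypothetical table names) that materializes an intermediate table instead of nesting all of the logic into one statement:

create table tmp_customer_spend as
select customer_id, sum(total_price) as total_spent
from sale_detail
group by customer_id;

select customer_id, total_spent
from tmp_customer_spend
where total_spent > 1000;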

Bucket table map join

When two bucketed tables are joined on the bucketing field and the number of buckets in one table is a multiple of the number of buckets in the other, a bucket map join can be enabled to improve efficiency.

set hive.optimize.bucketmapjoin = true; -- Enable the bucket table map Join

Group By optimization

By default, rows with the same key are sent from the Map phase to a single Reducer; if one key has too much data, data skew occurs. The group by operation can be optimized in two ways:

1. Partial aggregation on the Map side

In fact, not all aggregation operations need to be performed on the Reduce side. Many aggregation operations can be performed on the Map side first, and then the final result can be obtained on the Reduce side.

set hive.map.aggr=true; -- Enable partial aggregation on the Map side

set hive.groupby.mapaggr.checkinterval=100000; -- Number of rows aggregated on the Map side

2. Perform load balancing when data is skewed

set hive.groupby.skewindata = true; -- Enable load balancing when data is skewed (default: false)

When this option is set to true, the generated query plan contains two MapReduce jobs. In the first job, the Map output is distributed randomly to the Reducers; each Reducer performs a partial aggregation and emits its result, so rows with the same group by key may end up on different Reducers, which balances the load. The second job then distributes the pre-aggregated results to the Reducers by the group by key and completes the final aggregation.

Order By optimization

order by can only run in a single Reduce process, so performing order by on a large data set forces a large amount of data through one Reducer and makes the query slow.

  • Apply order by only to the final result set; do not sort large intermediate data sets. If the final result is small and can be sorted in a single Reducer, do the order by on that final result set.
  • To take the top N rows, use distribute by and sort by to take the top N on each Reducer, then merge the Reducer result sets, sort them globally in one Reducer, and take the top N again, as sketched below. At most (number of Reducers x N) rows take part in the global order by, so execution is very efficient.
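
A sketch of the top-N pattern above (hypothetical table and column names), assuming the inner limit is applied on each Reducer of the subquery so that at most (number of Reducers x 10) rows reach the final global sort:

select customer_id, total_price
from (
  select customer_id, total_price
  from sale_detail
  distribute by customer_id
  sort by total_price desc
  limit 10
) t
order by total_price desc
limit 10;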

Count distinct optimization

-- Before optimization (only one Reducer, which bears the heavy burden of deduplicating before counting):
select count(distinct id) from tablename;

-- After optimization (two jobs are started: one handles the subquery and can use multiple Reducers, the other does the count(1)):
select count(1) from (select distinct id from tablename) tmp;

Read once, insert multiple times

The multi Insert syntax can be used in scenarios where data is read from a table and reused multiple times:

from sale_detail
  insert overwrite table sale_detail_multi partition (sale_date='2010', region='china')
  select shop_name, customer_id, total_price where ...
  insert overwrite table sale_detail_multi partition (sale_date='2011', region='china')
  select shop_name, customer_id, total_price where ... ;

Description:

  • In general, a single SQL statement can contain at most 128 output (insert) branches; exceeding 128 causes a syntax error.
  • In a multi insert:
    • For partitioned tables, the same destination partition cannot appear more than once.
    • For an unpartitioned table, the table cannot appear more than once.
  • For different partitions of the same partitioned table, insert overwrite and insert into operations cannot be used in the same statement; otherwise an error is returned.

Enable compression

Map output compression

set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

Intermediate data compression

Intermediate data compression compresses the data passed between the multiple jobs of a Hive query. It is best to choose a compression method that saves CPU time, such as the Snappy algorithm, which compresses and decompresses efficiently.

set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;

Result data compression

The final result data (the Reducer output) can also be compressed with a suitable codec to reduce data size and disk read/write time. Note: the commonly used Gzip and Snappy algorithms do not support splitting, so if the result data is a large Gzip/Snappy compressed file and serves as the data source of another query, only one Mapper can process that file, which seriously hurts query efficiency. In that case, choose a splittable algorithm such as LZO, so that result files are both compressed and processable in parallel, which greatly improves job execution speed. See the further reading list for how to install the LZO library on a Hadoop cluster.

set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;  
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

Hadoop clusters support the following algorithms:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.GzipCodec
  • org.apache.hadoop.io.compress.BZip2Codec
  • org.apache.hadoop.io.compress.DeflateCodec
  • org.apache.hadoop.io.compress.SnappyCodec
  • org.apache.hadoop.io.compress.Lz4Codec
  • com.hadoop.compression.lzo.LzoCodec
  • com.hadoop.compression.lzo.LzopCodec

Hive architecture optimization

Enable direct fetching

Hive can read data from HDFS in two ways: by running MapReduce jobs, or by fetching the data directly (a fetch task).

Fetching data directly is much faster than reading data using MapReduce, but only a few operations can use direct fetching.

The hive.fetch.task.conversion parameter controls the circumstances in which direct fetch is used:

  • minimal: direct fetch is enabled only for select *, where filters on partition columns, and limit.
  • more: direct fetch is enabled for select (any columns), where filters, and limit.

set hive.fetch.task.conversion=more; -- Enable the "more" fetch mode

Local execution

Hive queries run on multiple servers in a cluster by default. This mode solves the problem of large data query. However, when the amount of data in Hive query processing is small, it is unnecessary to enable the distributed mode for query execution because distributed design, cross-network transmission, and multi-node coordination consume resources. For small data sets, all tasks can be processed on a single machine in local mode, with significantly reduced execution time.

set hive.exec.mode.local.auto=true; -- Automatically decide whether to run the query in local mode
set hive.exec.mode.local.auto.input.files.max=4; -- Maximum number of input files (Map tasks) allowed for local mode
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- Maximum total input size in bytes allowed for local mode (134217728 = 128 MB)

JVM reuse

Hive statements are converted into a series of MapReduce jobs, and each MapReduce job consists of a series of Map tasks and Reduce tasks. By default, each Map or Reduce task starts its own JVM process, and the JVM exits when the task completes. If tasks are short and the JVM has to be started many times, JVM startup time becomes a significant cost; this can be avoided by reusing JVMs.

set mapred.job.reuse.jvm.num.tasks=5;

The downside of JVM reuse is that it holds on to the task slots it uses until the entire job completes. If a few Reduce tasks in an unbalanced job take far longer than the others, the reserved slots sit idle and cannot be used by other jobs until every task of the job has finished.

Parallel execution

Hive converts a query into one or more stages, such as MapReduce stages, sampling stages, merge stages, and limit stages. By default, only one stage runs at a time, but stages that do not depend on each other can be executed in parallel. Note that multi-stage parallelism consumes more resources.

set hive.exec.parallel=true;  -- Enable parallel execution of stages that do not depend on each other
set hive.exec.parallel.thread.number=16;  -- Maximum degree of parallelism for a single SQL statement; the default is 8

Speculative execution

In a distributed cluster, program bugs (including bugs in Hadoop itself), load imbalance, or uneven resource allocation can cause tasks of the same job to run at very different speeds; some tasks may run noticeably slower than the rest (for example, one task of a job is only 50% complete while all the others have finished). These straggler tasks slow down the overall progress of the job. To prevent this, Hadoop uses a speculative execution mechanism: it identifies lagging tasks according to certain rules and launches a backup task that processes the same data in parallel with the original, and the result of whichever copy finishes first is used as the final result.

set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;

Advice:

If you are sensitive to the run-time deviations caused by speculative execution, you can turn these features off. Also note that if a Map or Reduce task runs for a long time simply because it processes a large amount of input data, the backup tasks launched by speculative execution can waste a great deal of resources.



Further reading

  • This article takes you through the HBase RowKey design quickly

  • A quick introduction to HBase | HBase column family optimization

  • A quick introduction to HBase | HBase read and write performance optimization

  • Bonus: the right way to get started with HBase

  • Learn Big Data from 0 -Hive Basics

  • Big Data -Hive configuration parameters

  • How to install snappy and LZO libraries in a Hadoop cluster