Introduction to optimize

Write optimization related article for the first time, first briefly talk about the optimization view. The first is that many optimizations, whether caching, indexing, sorting, etc., have the idea of reducing IO at their core. In the distributed scenario, the efficiency also follows the barrel effect. Tasks are executed in parallel, and the final task determines the time of the whole task. Therefore, balancing tasks of each task node is an important optimization idea in distributed scenarios.

Physical effects

If the physical configurations of the same role (datanode and Datanode, datanode and Namenode physical configurations are different) are different, distributed tasks will be negatively affected. However, the same roles are generally configured in distributed cluster construction. The main reason is that when new cluster nodes are added, inconsistent physical hardware information may affect the performance or not improve the performance. For example, the former nodes use mechanical hard disks, and the task efficiency of the newly added nodes using solid state will not be significantly improved in most cases. Switching to a lower CPU for new nodes also has a negative impact on the task.

data

In distributed computing, data parallelism is generally adopted, that is, data is divided into multiple parts, each task receives a piece of data for the same processing, and the processed result is the result of the whole task after aggregation. There are three main ways in which data can affect the Hadoop ecosystem:

  1. The raw data is unevenly distributed. As shown in the figure, if each node supports two tasks at most at the same time, due to data imbalance, Node1 must pull data from Node3 to start the second task (or node3 performs the first task and then starts the third task). As a result, the execution of Node1 and Node3 will be slower than that of Node2, dragging down the efficiency of the whole task. This situation occurs on the Map side of MapReduce, TeZ, and Spark. Periodically and after a new node is added, the data of each node can be rebalanced through balancer of HDFS.

  2. Data skew or data hotspot caused by uneven data transmission is also the main problem to be solved when we talk about optimization. Whether mapReduce, Tez, Spark, hbase or Kafka, data distribution is determined by hashing mode by default. The key to solve this problem is to design keys according to hash rules, set the number of partitions properly or redesign the rules for distributing data (partitioner).

  3. There is a special case where even if data is transmitted evenly to each node, the task time of each node can vary greatly. For example, when the computing logic filters some data and computes some data, the precisely filtered data is allocated to one node, and the calculated data is allocated to another node. As a result, the node containing only the filtered data ends the task immediately, wasting resources, while the node containing the calculated data takes a long time to finish the task. This situation needs to be analyzed on a case-by-case basis based on data and task logic.

Configuration optimization

Back to Hive optimization, let’s first list some hive optimization points from the configuration.

map

  1. Control the number of Map tasks properly. The number of Map tasks is calculated by minSplitSize, maxSplitSize, and blockSize. The default value is the number of blocks. The values of minSplitSize and maxSplitSize can be adjusted if necessary. However, according to the principle of local data lines, the splitSize should be an integer multiple of blockSize to reduce data transmission over the network.

    -- Adjust the number of mapTasks to be greater than the default number
    set mapred.map.tasks=10;
    
    Set this parameter to a value greater than blockSize to reduce the number of tasks
    set maperd.min.split.size=9999999999;Copy the code
  2. Map Join. Map Join distributes the smaller tables in the join to the memory of each Map task, and joins directly in the memory, thus eliminating the reduce step and improving efficiency:

    -- Enable Map Join. This function is enabled by default
    set hive.auto.convert.join=true;
    
    -- Set threshold for large table and small table (default 25M is considered as small table) :
    set hive.mapjoin.smalltable.filesize=25000000;
    
    -- The maximum number of rows processed by Map Join. If the number of rows exceeds, the Map Join process will exit unexpectedly
    set hive.mapjoin.maxsize=1000000; 
    
    This parameter can be used to adjust the memory threshold of a Map Join
    /* There is an explanation for this parameter: Whether Hive enables the optimization about converting common join into mapjoin based on the input file size. If this parameter is on, and the sum of size for n-1 of the tables/partitions for an n-way join is smaller than the size specified by hive.auto.convert.join.noconditionaltask.size, the join is directly converted to a mapjoin (there is no conditional task). */
    set hive.auto.convert.join.noconditionaltask.size=25000000;
    
    /*+ mapJoin (smallTable)*/ specifies the table to which mapJoin is to be performed.
    -- But to turn off ignoring the mapJoin flag, it is enabled by default
    set hive.ignore.mapjoin.hint=true;
    Copy the code
  3. Map side aggregation. Not all aggregation operations need to be performed on the Reduce side. Many aggregation operations can be performed on the Map side first, and the final result can be obtained on the Reduce side:

    - Enable aggregation Settings on the Map side. This function is enabled by default
    set hive.map.aggr=true;
    The number of items to be aggregated on the Map side.
    set hive.groupby.mapaggr.checkinterval=100000;
    -- Minimum ratio of aggregation on the map side, 0.5 by default. 100,000 records are aggregated in advance. If the amount of aggregated data /100000 is greater than the configured value, no aggregation is performed.
    set hive.map.aggr.hash.min.reduction=0.5;
    Copy the code
  4. Load balancing is performed when data skew is present. By default, data of the same Key is distributed to one Reduce in the Map phase. If the data of one Key is too large, the data skew is performed:

    Load balancing when there is data skew (gropu by) (default: false)
    set hive.groupby.skewindata=true;
    Copy the code

reduce

  1. Adjust the number of Reduce tasks:

    -- Each Reduce processes the amount of data. Reducing this value improves parallelism and may improve performance. However, excessive reduction may also generate excessive reducer, which has a potential negative impact on performance.
    set hive.exec.reducers.bytes.per.reducer=256000000;
    
    -- Maximum number of reduce tasks for each job. The default value is 1009
    set hive.exec.reducers.max=1009;
    
    - Hive calculates the number of Reduce operations based on the preceding two parameters and total data
    -- reduceTaskNum = min(reducers.max, total_bytes / bytes.per.reduce)
    
    -- You can also manually set the number of reduceTask (0.95 * Number of Datanodes)
    set mapreduce.job.reduces=5;
    Copy the code

Enable compression

The types of compression available include:

format tool algorithm File extension Is it detachable?
Gzip Gzip DEFLATE .gz no
Bzip2 Bzip2 Bzip2 .bz2 is
LZO Lzop LZO .lzo Yes (if indexed)
Snappy A null value Snappy Snappy no
  1. Map output compression:

    set mapreduce.map.output.compress=true;
    set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
    Copy the code
  2. The general rule for intermediate data compression is to use a detachable compression method as much as possible, otherwise very few mappers will be created. If the input data is text, bzip2 is the best option. Snappy is the fastest compression option for THE ORC format:

    set hive.exec.compress.intermediate=true;
    set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
    set hive.intermediate.compression.type=BLOCK;
    Copy the code
  3. Result data compression:

    set hive.exec.compress.output=true;
    set mapreduce.output.fileoutputformat.compress=true;
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
    set mapreduce.output.fileoutputformat.compress.type=BLOCK;
    Copy the code

Other configuration

  1. Parallel execution, parallel execution is a SQL parsing of many stages, the parser will determine the relationship between each stage, stage without dependencies can be executed in parallel:

    -- Enable task parallel execution Default is false
    set hive.exec.parallel=true; 
    The maximum parallelism allowed for the same SQL is 8 by default.
    set hive.exec.parallel.thread.number=16;
    Copy the code
  2. Strict mode:

    -- # hive2.x defaults to stric and nonstrict before 2.x
    set hive.mapred.mode=stric;
    
    In strict mode, three types of queries are prohibited: 1. For partitioned tables, partition fields must be qualified, in other words, not all partitions are allowed to scan. This is because the data volume of all partitions is usually large, so as to avoid consuming a lot of resources. 2. For the query using Order BY, the limit statement must be used, because order by will distribute all result data to the same Reducer for processing in order to perform the sorting process, so as to avoid the long time of reducer execution. SQL > alter table join on; alter table join on; * /
    Copy the code
  3. Small file merge. If small file merge is not performed, each small file will be identified as a split task when the task is started, and a map task will be allocated, and the namenode consumption will be increased:

    Merge small files at the end of the map Only task
    set hive.merge.mapfiles = true;
    -- Small file size
    set hive.merge.smallfiles.avgsize = 16000000;
    -- true merges small files at the end of MapReduce tasks
    set hive.merge.mapredfiles = false;
    -- Merge file size
    set hive.merge.size.per.task = 256*1000*1000;
    Copy the code
  4. Vectorized computing is enabled. Hive processes data row by row by default. Vectorization indicates that Hive processes data in blocks (a block contains 1,024 rows) rather than one row at a time. Vectorization only works with ORC file formats:

    -- The default value is true for Hive 0.13.0 or later
    set hive.vectorized.execution.enabled = true;
    Copy the code
  5. With CBO enabled, Hive by default follows a set of rules to find an optimal query execution plan. Cost-based optimization (CBO) evaluates multiple query execution plans, assigns costs to each plan, and then determines the plan with the lowest cost to execute the query:

    - start the CBO
    set Enable Cost Based Optimizer = true;
    -- Enables simple queries like count(*) to be answered using statistics stored in its meta-store.
    set hive.compute.query.using.stats = true;
    -- Column statistics are created when CBO is enabled. Hive uses column statistics stored in the meta-store to optimize queries. If you have a large number of columns, it can take a long time to extract column statistics for each column. If set to false, extracting column statistics from the meta-store is disabled.
    set hive.stats.fetch.column.stats = true;
    Basic partition statistics such as row count, data size, and file size are stored in the meta-store. If set to true, partition statistics are extracted from the meta-store. If set to false, the file size is extracted from the file system. The number of rows is extracted from the row schema.
    set hive.stats.fetch.partition.stats = true;
    Copy the code
  6. For JVM reuse, Hive statements are converted to a series of MapReduce tasks. Each MapReduce Task consists of a series of Map tasks and Reduce tasks. By default, A Map Task or Reduce Task in MapReduce starts a JVM process. After a Task is executed, the JVM process exits. If the task takes a short time and the JVM has to be started multiple times, the JVM startup time can become a significant expense, which can be solved by reusing the JVM. :

    -- Set the number of JVMS to be reused
    set mapred.job.reuse.jvm.num.tasks=10;
    Copy the code
  7. Dynamic partitioning:

    - Dynamic partitioning is enabled. This function is enabled by default
    set hive.exec.dynamic.partition=true;
    
    Adjust the total number of dynamic partitions created
    set hive.exec.max.dynamic.partitions=5000;
    Total number of dynamic partitions per node
    set hive.exec.max.dynamic.partitions.pernode=2000;
    Copy the code
  8. Inference execution, which starts a number of repetitive tasks to detect slow task trackers and put them on the reject list. At the same time, the overall execution of the job was improved by optimizing the results of each task:

    -- Enable inference execution, default is false
    set hive.mapred.reduce.tasks.speculative.execution=true;
    Copy the code

Table design optimization

  1. Select the appropriate data storage format:

    1. TextFile
      • Is the default format, if the default format is not specified during table construction.
      • Each line is a record, and each line is preceded by a newline character\nAt the end. If data is not compressed, disk overhead is high and data parsing overhead is high.
      • It can be used together with compression methods such as Gzip and Bzip2 (the system automatically checks and decompresses the data for query). However, hive does not slice data for certain compression algorithms and therefore cannot perform parallel operations on data.
    2. SequenceFile
      • A binary file provided by the Hadoop API, which is easy to use, separable, and compressed;
      • Three compression options are supported: NONE, RECORD, and BLOCK. The compression rate of RECORD is low. BLOCK compression is recommended.
    3. RCFile
      • Data is divided into blocks in rows, and each block is stored in columns.
      • First of all, the data is divided into blocks according to the line, to ensure that the same record in a block, to avoid reading a record to read multiple blocks;
      • Secondly, block data column storage is beneficial to data compression and fast column access.
    4. ORC
      • Data is divided into blocks in rows, and each block is stored in columns.
      • Hive provides a new RCFile format, which greatly improves performance and enables fast data compression and column access.
    5. Parquet
      • Column storage;
      • Efficient for large query types, Parquet is particularly useful for scanning specific columns in a particular table;
      • Generally, Snappy and Gzip are used for compression. The default value is Snappy.
    6. The most commonly used format is ORC, but there are many other formats to choose from on the website, like FileFormats
  2. Select an appropriate data compression format:

    format tool algorithm File extension Is it detachable?
    Gzip Gzip DEFLATE .gz no
    Bzip2 Bzip2 Bzip2 .bz2 is
    LZO Lzop LZO .lzo Yes (if indexed)
    Snappy A null value Snappy Snappy no
  3. Reasonably designed table partition, partition table in the partition dimension of data storage classification, a partition corresponds to a directory. Note that dynamic partitioning specifies a partition field as a partition, not a partition field as a partition, for example, age=20. During filtering, filtering according to partition can filter a large amount of irrelevant data to improve the query efficiency, such as setting partition according to time:

  4. The principle of bucket partitioning is the same as that of partition. It speeds up query efficiency by reducing data processing. However, partition stores data in different directories according to field values, while bucket partitioning stores data in different files according to field hashing mode. The other is that if Hive is to support transactions, the table must be an ORC bucket table.

Grammar to optimize

  1. Column clipping and partition clipping both reduce IO to speed up queries;

  2. The join optimization:

    1. Before join, filter as much data as possible to reduce the amount of data participating in join;
    2. The reason why small tables join large tables is that in the Reduce phase of join operation, the contents of the tables on the left side of the join will be loaded into the memory, and the tables with fewer entries will be placed on the left side to effectively reduce the probability of memory overflow. In join, jobs are generated from left to right. Ensure that the size of the table in a continuous query increases from left to right.
    3. Use the same connection key. In Hive, if three or more tables are joined and the on conditions use the same field, they are merged into a MapReduce Job. With this feature, you can add the same join on to one Job to save execution time.
    4. Enable map Join;
  3. Order by, sort by, distribute by, cluster by:

    1. Only one Reduce task is allowed (with caution when a large amount of data is used).
    2. Sort by: sorts current Reduce tasks. Sorting is completed before data is sent to reducer. Therefore, if sort by is used for sorting and mapred.reduce.tasks>1 is set, sort by only ensures the ordered output of each reducer, but not the global order.
    3. Distribute by: controls how data is split from the Map end to the Reduce end. Hive distributes distributed packets based on the number of Reduce packets in the column next to Distribute by. The default value is the hash algorithm. Sort by produces a sort file for each reduce. In some cases, you need to control which Reducer a particular row should go to, usually for subsequent aggregation operations. Distribute by just might do this. Therefore, distribute by is often used with sort by;
    4. Cluster by, equivalent to sort by + distribute by, but cannot specify sorting mode desc or ASC, default ASC;
    5. If it is to sort the first N data, it can be useddistribute byandsort byThe first N items are sorted on each Reduce, and then the result sets of each Reduce are combined and sorted globally in one Reduce, and then the first N items are sorted.
  4. Count DISTINCT optimization:

    -- Before optimization (there is only one reduce, and the burden is heavy to redo it before count) :
    select count(distinct id) from tablename;
    
    -- After optimization (start two jobs, one job is responsible for subquery (can have multiple reduce), the other job is responsible for count(1)) :
    select count(1) from (select distinct id from tablename) tmp;
    Copy the code

Other optimization

  1. Try to avoid a SQL contains too complex logic, especially some data calculation logic, fixed or similar logic can be added to simplify SQL logic;

  2. UDF can be used to implement complex computational logic or logic that is difficult to implement in SQL.

  3. Data skew:

    1. The result caused by different keys means that data keys are scattered. However, data distribution is based on the hash of key values by default. After hashing of different keys, most data is stored in a certain partition, resulting in data skew in the partition. You can adjust the number of partitions or customize partitions to distribute data evenly to different partitions to avoid data skewness.

    2. If the same key is used, it means that adjusting the number of partitions or customizing partitions cannot solve the data skew problem:

      1. Data collection loss (such as user name, user ID, etc.) and has no or little impact on calculation, such as statistical UV, filtering this part of the data during calculation;

      2. Data collection loss (access address) and impact on calculation, such as PV statistics, corresponding fields updated to random values among optional values before calculation;

      3. Data characteristics, for example, the amount of data in Guangzhou is more than that in Zhongshan, the amount of data in business circle A is more than that in business circle B, and the number of records in time period A is more than that in time break B, etc. According to the data situation, some special information can be added to the key to make it evenly dispersed to calculate the intermediate result and then calculate the final result:

conclusion

In general, Hive optimization focuses on two aspects: reducing I/OS and balancing tasks among nodes (data skew is a more common and specific problem). To reduce IO, skip unnecessary data as much as possible such as column clipping, partition clipping, etc.

As for data skew, the operations that cause data skew are:

keywords situation The consequences
join One of the tables is smaller, but key concentrated The data distributed to one or more Reduces is much higher than the average
join Large table and large table, but bucket judgment field 0 value or null value too many These null values are processed by a Reduce, which is very slow
group by The group by dimension is too small and the number of certain values is too large Processing a certain value of Reduce is time-consuming
count distinct Too many special values The reduce time spent processing this particular value

In fact, the root cause is that data is distributed during shuffle calculation. By default, data is distributed based on the hash of the key value. Data skew optimization can be performed based on the principle and data characteristics.

In actual production, you need to observe the USAGE of CPU, memory, disk, and network information to determine performance bottlenecks.

Reference Documents:

  1. Learn Big Data from 0 -Hive Performance Optimization
  2. Optimize Apache Hive with Apache Ambari in Azure HDInsight
  3. Apache Hive Performance Tuning