For Hive users, mastering Hive DDL and DML is only the most basic skill, and it is not enough for real projects. In practical projects, questions often come up such as “Why has this Hive SQL been running for so long?” or “Why does my Hive SQL just hang there?”

Of course, this actually involves Hive SQL optimization, and to understand Hive SQL optimization you must understand the execution principles and mechanisms behind it.

In the era of big data, storage is cheaper and computing power is greater than ever, but big data is not free of cost.

A well-written Hive SQL and a poorly written one can differ a hundredfold, or even a thousandfold, in the underlying computation and resources they consume, and a computing cluster has only a limited number of slots (especially in real projects, where your department is allocated a limited share of them). One user's improper Hive SQL may monopolize the resources of the entire cluster and leave other users waiting; if the task runs in a production environment, it can even lead to production failures.

Beyond the waste of resources, an inappropriate Hive SQL may run for hours or even tens of hours without producing a result, while a proper one can return results in minutes, so understanding Hive SQL execution is a huge productivity boost.

This section focuses on the Hive SQL execution process and principles. Hive SQL written for actual business requirements varies widely, and its logic can range from a single line to hundreds or thousands of lines, but the basic patterns can be broadly grouped into three categories:

  • The select statement:
select id,name from stu where id = 5 and name = 'zhang3'

In practice, the where conditions can be more complex, with various and/or combinations.

  • Group by statement:
select id,count(1) from stu group by id
  • Join statement: that is, joining two tables, or in practice multiple tables

Hive SQL development is essentially a combination of the preceding three patterns. The following describes the underlying execution mechanism and principles of each.

1. Select statement execution diagram

Consider the following Hive SQL statement:

select order_id,buyer_id,cate_name from orders_table where day = '20170101' and cate_name = 'iphone7';

Its business background is analyzing the customers of Apple's iphone7: the first step is to pull the iphone7 order details. This Hive SQL fetches all iphone7 order details among the goods purchased on January 1, 2017, preparing the data that the analysis is based on. The actual number of records captured may be very large; since this is just an example, a limit clause (such as limit 1000) could be added.

How is the select statement executed on the underlying Hadoop cluster with MapReduce?

Its execution process is shown in the figure: Hive SQL is translated into a MapReduce job, so its execution stages are those of a MapReduce job, including input sharding, Map, Shuffle, and Reduce. The following describes each stage based on the diagram.

1.1 Input sharding

In actual project design, the order table is usually partitioned by natural day, so the condition day = '20170101' in the SQL above actually restricts the input to the partition file of day=20170101. (If the partition condition were not specified, Hadoop would read all of the order files; with, say, two years of orders, all 730 days of order files would be read. This is exactly the improperly used Hive SQL mentioned above, and it causes a great deal of unnecessary cost and overhead on the Hadoop cluster.) Hadoop therefore reads only the 20170101 order file, which here is 300MB. With a split size of 128MB (64MB by default in Hadoop 1.x, 128MB in Hadoop 2.x, configurable by parameter), the input is divided into 3 splits of 128MB, 128MB, and 44MB respectively.
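To make the split arithmetic concrete, here is a back-of-the-envelope sketch in Java (assuming the 128MB split size above; in a real cluster the split size is derived from the block size and the mapreduce.input.fileinputformat.split.* settings):

public class SplitMath {
    public static void main(String[] args) {
        long fileSize  = 300L * 1024 * 1024;  // the 300MB day=20170101 order file
        long splitSize = 128L * 1024 * 1024;  // Hadoop 2.x default split size
        long numSplits = (fileSize + splitSize - 1) / splitSize; // ceiling division: 3
        long lastSplit = fileSize - (numSplits - 1) * splitSize; // 44MB remainder
        System.out.println(numSplits + " splits, last split "
                + lastSplit / (1024 * 1024) + "MB");
    }
}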

1.2 Map phase

The number of Map tasks is determined by the number of splits from the sharding phase. The input file of the SQL above (i.e. the 20170101 order file) is divided into three input splits, so the Hadoop cluster starts three Map tasks (the diagram is simplified and draws only two). Each Map task receives its own split file and checks it line by line in the map function: is the item category iphone7? If not, the row is filtered out; if yes, the column values specified in the select statement are extracted and saved to a local file.
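As a rough illustration (this is a hand-written sketch, not the code Hive actually generates), the map function of such a task might look as follows; the tab-separated column layout and column positions are assumptions made for the example:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of the map side of the select: keep a row only if cate_name = 'iphone7',
// then emit just the columns named in the select list.
public class SelectMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] cols = line.toString().split("\t");  // assumed: tab-separated columns
        String orderId = cols[0], buyerId = cols[1], cateName = cols[2]; // assumed layout
        if ("iphone7".equals(cateName)) {             // the where filter
            // projected row; in a map-only job this goes straight to the output file
            context.write(new Text(orderId + "\t" + buyerId + "\t" + cateName),
                          NullWritable.get());
        }
    }
}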

1.3 Shuffle and Reduce phase

This SQL involves no data redistribution, so the Shuffle phase has nothing to do and no Reduce task needs to be started; it is a map-only job.
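In a hand-written MapReduce job, this corresponds to a map-only configuration: setting the number of Reduce tasks to zero makes Hadoop skip Shuffle and Reduce entirely. A sketch of such a driver, reusing the hypothetical SelectMapper above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch of a map-only job: with zero reduce tasks, each Map task's output
// is written directly to the job output directory, with no Shuffle or Reduce.
public class SelectJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "select-iphone7-orders");
        job.setJarByClass(SelectJob.class);
        job.setMapperClass(SelectMapper.class);  // the mapper sketched above
        job.setNumReduceTasks(0);                // map-only: skip Shuffle and Reduce
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. the day=20170101 partition
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // job output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}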

1.4 Output files

Hadoop directly merges the output files of Map tasks into the output directory. In this example, you only need to merge the local output of each Map task into the output directory.

Hive select statement execution is straightforward and simple, but it is worth understanding the concept of parallel execution. The file above is only 300MB, but what if the input file is 3GB, or even 3TB? For Hadoop, the answer is simply to start more Map tasks: a 3GB input needs 24 Map tasks (3 x 1024MB / 128MB), and a 3TB input needs 24576 Map tasks. Whether the input file is 300MB, 3GB, or 3TB, the select above will usually take the same order of magnitude of execution time; for example, if 300MB takes 1 minute, then even 3TB can finish in about a minute, provided the cluster has enough free slots to run all the Map tasks in parallel.

This is because all Map tasks are executed in parallel, which is an advantage of Hadoop, simply adding machines and nodes to process more data. For real users, Hive SQL logic is the same and no special processing is required, but as mentioned above, use only the data you need (by specifying partitioning conditions) so as not to impose unnecessary overhead on the computing and storage resources of the Hadoop cluster.

2. Group by statement execution diagram

The group by statement is the most frequently used statement in data analysis. The following describes how a Hive group by statement is executed on a Hadoop cluster with MapReduce. Consider the following Hive SQL statement:

select city,count(order_id) as iphone7_count from orders_table where day = '20170101' and cate_name = 'iphone7' group by city;

The SQL above groups orders by city and counts the number of iphone7 orders in each city. With the order quantity of each city computed, after sorting it is of course easy to determine in which city the iphone7 sells best and in which it sells worst.

Note that for convenience, cate_name in the figure has simply been written as iphone7. Unlike the select statement, the group by statement of Hive SQL involves data redistribution and distribution, so its execution includes the complete MapReduce job execution process.

2.1 Input sharding

The input of the group by statement is the day=20170101 partition file; the sharding process and the number of splits are the same as for the select statement, again three splits of 128MB, 128MB, and 44MB respectively.

2.2 Map phase

The Hadoop cluster likewise starts three Map tasks to process the corresponding three split files. Each Map task processes each line in its split file, checking whether the product category is iphone7; if so, it outputs the key-value pair <city, 1>, since the number of orders is to be counted per city (note the difference from the select statement, which outputs column values instead).
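A sketch of this map logic (again a hand-written illustration with an assumed tab-separated column layout, not Hive's generated code):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of the map side of the group by: unlike the select, the output is a
// <city, 1> key-value pair for every matching order row.
public class CityCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] cols = line.toString().split("\t"); // assumed: tab-separated columns
        String cateName = cols[2], city = cols[3];   // assumed column positions
        if ("iphone7".equals(cateName)) {
            context.write(new Text(city), ONE);      // counted downstream per city
        }
    }
}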

2.3 Combiner phase

The Combiner phase is optional. If a Combiner operation is specified, Hadoop performs the Combiner on the local output of each Map task, which removes redundant output and avoids unnecessary downstream processing and network transfer overhead. In this example, Map Task1 outputs <hz,1> twice, and the Combiner merges the two into <hz,2>. However, the Combiner operation is risky: the principle for using it is that the Combiner output must not change the final result of the Reduce calculation. If the calculation is only a total count, a maximum, or a minimum, a Combiner can be used; for a calculation such as an average, applying a Combiner would make the final Reduce results wrong.
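For a pure count like this one, the combiner can simply reuse the summing logic; a minimal sketch:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a combiner for the <city, 1> pairs: it pre-sums counts on the map
// side, so two <hz,1> pairs leave the Map task as a single <hz,2>. Safe here
// because partial sums can themselves be summed without changing the result.
public class CityCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text city, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) sum += c.get(); // local partial sum
        context.write(city, new IntWritable(sum));
    }
}

In a hand-written job it would be registered with job.setCombinerClass(CityCountCombiner.class); for an average, such map-side pre-aggregation would distort the final result, which is exactly the risk described above.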

2.4 Shuffle phase

The output of the Map task must go through a phase called Shuffle before it can be processed by the Reducer. The Shuffle process is the core of MapReduce. It refers to the entire processing process from Map task output to Reduce task input. The complete Shuffle process includes partition, sort, spill, copy, and merge. Shuffle is classified into Shuffle on the Map end and Shuffle on the Reduce end.

For understanding the group by statement of Hive SQL, the two key steps are partition and merge. Partition is how Hadoop decides to which Reduce task each key-value pair output by each Map task is allocated. Merge is how the values of the same key coming from multiple Map tasks are combined inside a Reduce task.

Taking the diagram above as an example, the output of Map Task1 contains two key-value pairs, <hz,2> and <bj,1>; should Map Task1 pass them to Reduce Task1 or to Reduce Task2? The most commonly used partitioning method in Hadoop is the Hash Partitioner: Hadoop takes the hash value of each key and computes it modulo the number of Reduce tasks to obtain the corresponding reducer. This guarantees that the same key is always assigned to the same Reduce task, and the hash function also spreads the Map output roughly evenly across all the Reduce tasks.
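Hadoop's default HashPartitioner does essentially the following; this sketch mirrors that logic for the <city, count> pairs of the example:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of hash partitioning: the same city key always hashes to the same
// reduce task, so all partial counts for one city meet in one place.
public class CityHashPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text city, IntWritable count, int numReduceTasks) {
        // mask the sign bit so the modulo result is never negative
        return (city.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}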

The partition process thus sends key-value pairs with the same key from multiple Map tasks to the same Reduce task, and the merge operation combines their values. For example, for the key hz, Map Task1 outputs <hz,2> and Map Task2 outputs <hz,1>; these are merged into <hz,{2,1}> as the input of Reduce Task1.

2.5 Reduce phase

For the group by statement, a Reduce task receives input of the form <hz,{2,1}>, and it only needs to apply the reduce function logic to sum the input: for <hz,{2,1}>, 2+1=3. Similarly, an input of <hz,{2,1,2,3}> yields 2+1+2+3=8. The output of each Reduce task is saved to a local file.
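The reduce function for this group by is a plain sum over the merged values; a sketch:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of the reduce side of the group by: for an input like <hz,{2,1}>,
// sum the values to obtain the final per-city order count (2 + 1 = 3).
public class CityCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text city, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable c : counts) total += c.get();
        context.write(city, new IntWritable(total)); // e.g. <hz, 3>
    }
}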

2.6 Output files

Hadoop merges the output files of Reduce tasks to the output directory. In this example, you only need to merge the local output files of two Reduce tasks into the output directory.

3. Join statement execution diagram

In addition to the group by statement, data analysis often requires association analysis, that is, joining two tables. For example, business analysts want to analyze the age distribution of the customers who bought the iphone7; the order table contains only the customer ID, while the customer's age is stored in another table, so the order table and the customer table must be joined to obtain the analysis result. The related SQL is as follows:

select t1.order_id,t1.buyer_id,t2.age
from
(
select order_id,buyer_id from orders_table where day = '20170101' and cate_name = 'iphone7'
) as t1
join
(
select buyer_id,age from buyer_table where buyer_status = 'valid'
) as t2
on t1.buyer_id = t2.buyer_id;

The above Join SQL is split into three MapReduce tasks in a Hadoop cluster.

  • The first MapReduce task computes the t1 part, which is just the select statement from section 1, except that the output file contains only the order_id and buyer_id columns.
  • The second MapReduce task computes the t2 part, again similar to the select statement from section 1, except that the table is buyer_table, the output columns are buyer_id and age, and the filter condition is buyer_status = 'valid'.
  • The third MapReduce task performs the join of t1 and t2: it associates and merges the output files of the first two jobs and then writes the result.

The following figure shows the overall execution of the Hive join statement. (For convenience, the execution details of the first and second MapReduce jobs are not shown; for those, see the select statement execution diagram above. Their output files are used directly here.)

The join statement of Hive SQL also redistributes and distributes data. The difference from the group by statement is that a group by statement redistributes data according to the group by column(s), whereas the join statement redistributes data according to the join column (here, again, buyer_id).

3.1 Input sharding

The input files of the join statement are the output files of the first and the second MapReduce job. For Hadoop they are still sharded by file size, so if the first output file is 150MB, there are two splits: 128MB and 22MB. The output file of the second MapReduce job is sharded in the same way. For simplicity, the figure below shows only one split per output file.

3.2 Map phase

The Hadoop cluster starts the corresponding number of Map tasks for the splits of the two output files. For example, if the first output file contains two splits, it gets two Map tasks; the second output file is handled in the same way.

3.3 Shuffle phase

For join statements, the Shuffle process redistributes and distributes data according to the join columns. The partition column of the join statement is the join column, in this case buyer_id, so the output of all the Map tasks for output files 1 and 2 is redistributed and distributed according to the value of buyer_id. The key distribution of the join process is similar to that of the group by statement: the most common partition method is again the Hash Partitioner, which takes the hash value of each join key modulo the number of Reduce tasks to obtain the corresponding reducer. This guarantees that rows with the same join key are assigned to the same reducer, which is what makes the column association possible.

In the figure, it is assumed that buyer_id 111 and 222 will be distributed to Reduce Task1, and buyer_id 333 and 444 will be distributed to Reduce Task2.

3.4 Reduce phase

In the Reduce phase, rows with the same buyer_id meet in the same Reduce task. For example, the row buyer_id=222, order_id=1003 from Map Task1-1 is distributed to Reduce Task1, and the row buyer_id=222, age=25 from Map Task2-1 is also distributed to Reduce Task1; the Reduce task then merges their values into one row based on buyer_id and writes the row to its local output file.
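A sketch of the reduce side of such a join (this is the classic hand-written pattern known as a reduce-side join; the "T1|"/"T2|" tags are assumptions used here to mark which table a row came from, and Hive's actual join operators are more sophisticated):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a reduce-side join: the map tasks tag each record with its source
// ("T1|" for order rows, "T2|" for buyer rows) and key it by buyer_id; the
// reducer then pairs every order row with the buyer row of the same key.
public class BuyerJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text buyerId, Iterable<Text> taggedRows, Context context)
            throws IOException, InterruptedException {
        List<String> orders = new ArrayList<>(); // rows from t1, e.g. "order_id"
        List<String> buyers = new ArrayList<>(); // rows from t2, e.g. "age"
        for (Text row : taggedRows) {
            String s = row.toString();
            if (s.startsWith("T1|")) orders.add(s.substring(3));
            else                     buyers.add(s.substring(3));
        }
        // inner join: emit one output row per (order, buyer) pair for this key
        for (String order : orders)
            for (String buyer : buyers)
                context.write(buyerId, new Text(order + "\t" + buyer));
    }
}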

3.5 Output files

Hadoop merges the output files of the Reduce tasks into the output directory. In this example, only the local output files of the two Reduce tasks need to be merged into the output directory.

The join statement above involves only two tables, but what about multi-table joins? Suppose, for example, there is also a buyer-granularity 30-day transaction summary table (containing each buyer's trading volume, transaction amount, and so on over the last 30 days); the SQL then becomes:

select t1.order_id,t1.buyer_id,t2.age,t3.buy_order_count_30d
from
(
select order_id,buyer_id from orders_table where day = '20170101' and cate_name = 'iphone7'
) as t1
join
(
select buyer_id,age from buyer_table where buyer_status = 'valid'
) as t2
on t1.buyer_id = t2.buyer_id
join
(
select buyer_id,buy_order_count_30d from buyer_stat_table
) as t3
on t1.buyer_id = t3.buyer_id

Since tables t1, t2, and t3 are all joined on the same column, buyer_id, Hive can still complete this SQL with a single join MapReduce job: the Shuffle phase distributes the rows of all three tables by buyer_id, and the rows of the three tables with the same buyer_id are all associated and output in the same Reduce task.

But what if table t1 and table t3 are not related by buyer_id? For example, suppose t3 is a seller table that must be joined to the order table by seller_id:

select t1.order_id,t1.buyer_id,t2.age,t3.seller_star_level
from
(
select order_id,buyer_id,seller_id from orders_table where day = '20170101' and cate_name = 'iphone7'
) as t1
join
(
select buyer_id,age from buyer_table where buyer_status = 'valid'
) as t2
on t1.buyer_id = t2.buyer_id
join
(
select seller_id,seller_star_level from seller_table
) as t3
on t1.seller_id = t3.seller_id

In other words, the t1 order table is joined with the t2 buyer table by buyer_id, and with the t3 seller table by seller_id.

In this case, Hive splits the SQL into two MapReduce jobs that run in sequence: table t1 and table t2 are first distributed and joined on buyer_id (the first Shuffle and Reduce), and the intermediate result is then distributed and joined with table t3 on seller_id (the second Shuffle and Reduce). The data is thus shuffled and reduced twice, and because the two Reduce processes must complete one after the other, a multi-table join on different keys necessarily takes longer.