Preface

With the rapid development of the big data era, enterprises need to store, compute, and analyze trillions of records every day, while also guaranteeing that the analyzed data is timely, accurate, and complete. Faced with such a huge volume of data, computing it efficiently and accurately for business use becomes a difficult problem for ETL engineers (data analysts).

As a data intelligence company, GETU has accumulated rich experience in the field of big data computing. This article proposes solutions to two major pain points of offline big data computing, slow tasks and interrupted tasks, in the hope that readers will find something useful in them.

I. Slow tasks

“Slow task execution” usually refers to a task that takes more than 10 hours to run and therefore fails to meet data users’ need for timely data. For example, the business side expects to view T-1 data (the previous day’s data) in the morning, but because of task delays it can only query and browse that data in the afternoon or evening, and thus cannot discover operational problems and make efficient decisions in time. Optimizing slow tasks is therefore an essential job for ETL engineers.

In long-term big data practice, we have found that slow tasks tend to share certain characteristics. If we can identify the problem and apply the right remedy, task runtime can be reduced dramatically. The common causes of slow task execution can be summarized in four points: logic redundancy, data skew, large table reuse, and a slow executor. Each pain point is explained in detail below.

1. Logic redundancy

“Logic redundancy” usually arises because, when processing and computing data, ETL engineers pay more attention to whether the results meet expectations than to whether there is a more efficient way to obtain them. As a result, tasks that could be handled with simple logic end up being implemented with complex logic.

Reducing logic redundancy depends largely on accumulated experience and on improving one’s logical thinking and coding skills. Below are some advanced functions that can help developers further improve their data processing efficiency.

Grouping sets

A grouping aggregation extension. GROUPING SETS lets a single SQL statement output statistics for several different dimension combinations, which would otherwise require several GROUP BY queries combined with UNION ALL.
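For example (table and column names are hypothetical), one statement can produce per-app, per-OS, and combined counts in a single pass over the data:

    hive -e "
    -- One traversal produces three aggregation dimensions at once, instead of
    -- three separate GROUP BY queries stitched together with UNION ALL.
    SELECT app_id, os, count(*) AS cnt
    FROM   dws_device_active_log
    WHERE  day = '20230101'
    GROUP  BY app_id, os
    GROUPING SETS ((app_id), (os), (app_id, os));
    "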

Lateral view explode()

A one-to-many (row-expanding) function: it turns each element of an array into a separate row. Because it only accepts data in array form, it is usually combined with the split() function, which first turns a delimited string into an array.
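For example (hypothetical table and columns), a comma-separated tag field can be expanded into one row per tag:

    hive -e "
    -- split() turns the comma-separated string into an array, and
    -- LATERAL VIEW explode() expands that array into one row per element.
    SELECT device_id, tag
    FROM   dws_device_tags
    LATERAL VIEW explode(split(tag_list, ',')) t AS tag;
    "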

A few other functions that are worth keeping at hand, together with their purposes, are listed below (a brief usage sketch follows the list):

  • find_in_set(): finds the position of a particular string within a comma-separated string list
  • get_json_object(): extracts a specified field from a JSON string
  • regexp_extract(): extracts the substring that matches a regular expression
  • regexp_replace(): replaces the characters that match a regular expression
  • reverse(): reverses a string
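A small sketch of how a few of these can be used together (the log table and JSON layout are assumptions for illustration; the \$ in the JSON path is escaped so the shell does not expand it):

    hive -e "
    SELECT get_json_object(body, '\$.device.brand')  AS brand,    -- pull one field out of a JSON string
           regexp_extract(url, 'appid=([0-9]+)', 1)  AS app_id,   -- capture group 1 of the pattern
           find_in_set('push', channel_list)         AS push_pos  -- position of 'push' in a comma-separated list
    FROM   ods_request_log
    WHERE  day = '20230101';
    "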

2. Data skew

“Data skew” refers to the phenomenon in MR computation where some Map jobs have to process far more data than others and take far too long, so the whole job cannot finish for a long time and the task progress appears stuck at 99%.

For data skew, developers can make changes at the code level, for example (two of these rewrites are sketched after the list):

  • Replace COUNT(DISTINCT id) with a GROUP BY subquery
  • For joins between a large table and a small table, use a MAPJOIN or a subquery instead of a plain JOIN
  • If a GROUP BY is skewed, salt the grouping key, i.e. split its values into random value + original value
  • Avoid Cartesian products in JOIN operations, i.e. make sure the join keys do not contain large numbers of duplicate values
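Two of these rewrites, sketched on a hypothetical event log (all names are illustrative):

    hive -e "
    -- 1) Replace count(DISTINCT ...) with a GROUP BY subquery, so the
    --    de-duplication work is spread across many reducers instead of one.
    SELECT count(*) AS uv
    FROM (
        SELECT device_id
        FROM   ods_event_log
        WHERE  day = '20230101'
        GROUP  BY device_id
    ) t;

    -- 2) Salt a skewed GROUP BY key: aggregate by (key, random salt) first,
    --    then aggregate the partial results by the key alone.
    SELECT app_id, sum(cnt) AS pv
    FROM (
        SELECT app_id, salt, count(*) AS cnt
        FROM (
            SELECT app_id, cast(floor(rand() * 10) AS int) AS salt
            FROM   ods_event_log
            WHERE  day = '20230101'
        ) a
        GROUP  BY app_id, salt
    ) b
    GROUP  BY app_id;
    "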

Hive data skew has been explained in detail in a previous article, which interested readers can refer to for more information.

3. Large table reuse

“Large table reuse” refers to repeatedly traversing a large table with hundreds of millions or even billions of rows to obtain similar results. Avoiding it requires the ETL engineer to think holistically: with a single low-frequency traversal, the billions of rows can be condensed into a small, reusable intermediate table that still supports all of the subsequent calculations.

Therefore, engineers need to take the overall engineering structure into account at the very beginning of project development and stick to the principle of “traverse a large table only once” in order to improve the efficiency of the entire project.

Here is a practical example for reference:

Table GEQI_WIN_TMP: about 50 million rows
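As a sketch of the idea only (the table and column names below are assumed and are not the tables of the original example): the large log is condensed into a small intermediate table in one pass, and the downstream statistics then read only from that intermediate table.

    hive -e "
    -- Single pass over the large table: condense it into a small, reusable
    -- intermediate table keyed by device and app.
    INSERT OVERWRITE TABLE tmp_device_daily PARTITION (day = '20230101')
    SELECT device_id, app_id, count(*) AS pv, max(event_time) AS last_time
    FROM   ods_event_log            -- the billion-row source table
    WHERE  day = '20230101'
    GROUP  BY device_id, app_id;

    -- Subsequent statistics are derived from the intermediate table only.
    -- One row per (device_id, app_id), so count(*) per app gives active devices.
    SELECT app_id, count(*) AS active_devices
    FROM   tmp_device_daily
    WHERE  day = '20230101'
    GROUP  BY app_id;

    SELECT app_id, sum(pv) AS total_pv
    FROM   tmp_device_daily
    WHERE  day = '20230101'
    GROUP  BY app_id;
    "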

4. Slow executor

A “slow executor” means that the data volume has grown so large that Hive’s underlying computation can no longer traverse all the data in a single partition quickly.

With the same resources, Spark traverses data far more efficiently than MapReduce. Moreover, Spark tasks acquire resources more aggressively than MapReduce tasks: they can occupy a large amount of resources for a short time to finish the job quickly and then release them promptly, which improves the execution efficiency of the entire cluster.

Therefore, in this case developers should consider switching to a more efficient computing engine, such as Spark via PySpark, for fast data traversal. At the same time, developers also need to consciously train their thinking, build good development habits, and keep exploring faster, more accurate, and more systematic ways of computing and processing massive data.
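As one lightweight illustration of swapping the engine for a single query (this uses Hive on Spark settings and assumes the cluster has Hive on Spark configured; it is shown as an option rather than the PySpark setup described in the next section, and the resource values are placeholders):

    hive -e "
    SET hive.execution.engine=spark;     -- run this query on Spark instead of MapReduce
    SET spark.executor.instances=20;     -- placeholder resource settings
    SET spark.executor.memory=4g;
    SELECT day, count(*) AS pv
    FROM   ods_event_log
    WHERE  day >= '20230101'
    GROUP  BY day;
    "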

II. Task interruption

For a variety of reasons, online tasks are often killed and re-executed. Re-executing a task from scratch seriously wastes cluster resources and delays the data results, which in turn affects the business side’s data applications. How can this be avoided? The breakpoint-execution scheme below is our answer.

GETU’s scheduled tasks are developed on top of the Azkaban scheduling system. Our data analysts mainly use four kinds of code, shell, HSQL, MySQL, and PySpark, to process data: the raw logs are cleaned and computed to generate the common-layer and report-layer data used by the business side.

Therefore, GETU sets up four code executors to handle the different types of code in a script. The three core elements of the scheme are described below: code block input, execution functions, and the circulator.

1. Code block input

Normally, the shell, HSQL, MySQL, and PySpark code in a script is executed directly in order rather than selectively. In our practice, each code block is assigned to a shell variable as a string, with a marker at the beginning of the string indicating the type of code. When the script runs through these steps, only the assignments are performed; nothing is parsed or executed yet. For example:

Example: assigning an HSQL code block

Example: assigning a shell code block

Example: assigning a MySQL code block

Example: assigning a PySpark code block

In this way, each kind of code is placed into its corresponding step_n variable. These variables can then be executed directly by the subsequent executors, and the developer only needs to care about the processing logic.
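A minimal sketch of this pattern (the step_n names, the “hsql#” / “shell#” / “mysql#” / “pyspark#” type markers, and all table and path names are assumptions for illustration, not the original code):

    #!/bin/bash
    biz_date=$1                          # business date, passed as the first script argument

    # Each step only ASSIGNS a string; nothing is parsed or executed here.
    step_1="shell#hdfs dfs -test -e /warehouse/ods/event_log/day=${biz_date} || exit 1"

    step_2="hsql#
    INSERT OVERWRITE TABLE dws_event_daily PARTITION (day = '${biz_date}')
    SELECT app_id, count(*) AS pv
    FROM   ods_event_log
    WHERE  day = '${biz_date}'
    GROUP  BY app_id;"

    step_3="mysql#
    DELETE FROM rpt_event_daily WHERE day = '${biz_date}';"

    step_4="pyspark#/home/etl/project/spark/device_tags.py"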

2. Execution functions

An execution function parses and executes the string held in a step_n shell variable. Different types of code blocks are parsed differently, so a separate execution function is defined for each type. These functions are generally placed in the project-wide configuration file and loaded with source.

The Hive, MySQL, and shell execution functions are relatively simple and can run their code directly by means of hive -e or eval. The PySpark function needs the corresponding queue, paths, parameters, and so on to be configured, and a spark.py entry file has to be added to the project; the details are not expanded here.
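A minimal sketch of what such functions might look like (connection settings, queue name, and resource numbers are placeholders that would normally come from the sourced configuration file):

    # Run an HSQL code block with the Hive CLI; abort the whole script on failure.
    execute_hive() {
        hive -e "$1" || exit 1
    }

    # Run a MySQL code block; the connection settings are assumed to be defined
    # in the sourced configuration file.
    execute_mysql() {
        mysql -h"${MYSQL_HOST}" -u"${MYSQL_USER}" -p"${MYSQL_PASS}" -e "$1" || exit 1
    }

    # Run a plain shell code block.
    execute_shell() {
        eval "$1" || exit 1
    }

    # Submit a PySpark script; the code block is expected to hold the script path.
    execute_pyspark() {
        spark-submit --master yarn --queue etl \
            --num-executors 20 --executor-memory 4g \
            "$1" || exit 1
    }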

3. Circulator

The circulator is the core of the breakpoint-execution scheme and the controller of the steps. It decides which step to run by examining the name (number) of the shell variable, and decides which execution function should parse and run the code by examining the type marker at the beginning of the variable’s string content.

A reference case follows.
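The sketch below is a minimal illustration (not the original code) that assumes the step_n variables, type markers, and execution functions introduced above:

    # Resume point: the step to start from, passed as the second script argument
    # (the first argument being the business date in the earlier sketch).
    start_step=${2:-1}
    end_step=4                               # last step of this script, defined up front

    for ((i = start_step; i <= end_step; i++)); do
        var_name="step_${i}"
        code_block="${!var_name}"            # indirect expansion: the value of step_i
        [ -z "${code_block}" ] && continue   # skip steps that are not defined

        code_type="${code_block%%#*}"        # text before the first '#': the type marker
        code_body="${code_block#*#}"         # text after the first '#': the code itself

        echo "running step ${i} (${code_type})"
        case "${code_type}" in
            hsql)    execute_hive    "${code_body}" ;;
            shell)   execute_shell   "${code_body}" ;;
            mysql)   execute_mysql   "${code_body}" ;;
            pyspark) execute_pyspark "${code_body}" ;;
            *)       echo "unknown code type in step ${i}"; exit 1 ;;
        esac
    done

With such a loop, rerunning the script as, say, sh etl_job.sh 20230101 3 (a hypothetical name) resumes from step 3 instead of starting over.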

The developer needs to define the end step of the whole flow at the beginning of the script so that the circulator knows where to stop; the start step, in turn, can be passed in as a script parameter, which is a clean way to implement breakpoint execution: an interrupted task can be resumed from the failed step instead of being rerun from scratch.

Conclusion

Slow tasks and task interruptions in ETL engineering are problems that every big data engineer has to face and solve. Based on GETU’s big data practice, this article has put forward corresponding solutions to these two problems, in the hope of helping readers broaden their thinking about task optimization and ETL development, improve task execution efficiency, and reduce the labor and machine costs of task maintenance.