Background

The union of 10 tables [A, B, C, D, E, F, G, H, I, J] needs to be joined on field a with table ∂ in order to filter the data. Table A holds about 32 billion rows, each of the other 9 tables holds about 2 billion rows, and table ∂ holds about 9 million rows. The job runs with 100 executor-nums * 3 executor-core * 20G executor-memory, yet it still gets stuck on the join stage.
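For reference, that resource configuration corresponds to settings like the following when building the session (a minimal sketch; the app name is a placeholder, and the same values can equally be passed on the spark-submit command line):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("union_join_filter")              // placeholder name
  .config("spark.executor.instances", "100") // 100 executor-nums
  .config("spark.executor.cores", "3")       // 3 executor-core
  .config("spark.executor.memory", "20g")    // 20G executor-memory
  .getOrCreate()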

Implementation method

Method 1

val tableArr = Array("table_a_name", "table_b_name", "table_c_name", "table_d_name", "table_e_name",
  "table_f_name", "table_g_name", "table_h_name", "table_i_name", "table_j_name")

// union all ten tables into one DataFrame
val data_df = tableArr.map(table_name => spark.sql(
  s"""
     |SELECT a, b, c
     |FROM ${table_name}
     |""".stripMargin)).reduce(_ union _)

// table ∂: the 9-million-row filter table
val filter_table_tmp = spark.sql(
  s"""
     |SELECT a
     |FROM partial
     |""".stripMargin)

val result = data_df.join(filter_table_tmp, Seq("a"), "inner")

Looking at the job's DAG for the code above, the program unions the ten tables first and then joins the result with table ∂, which is equivalent to directly joining more than 40 billion rows against 9 million. The usual tuning did not help: 1. adjust spark.sql.shuffle.partitions to 2~3 times the program's total executor cores; 2. check that the join field has the same type in both tables. Execution still failed, repeatedly throwing org.apache.spark.shuffle.FetchFailedException: there is too much data, too many tasks, or a single task processes too much data during the shuffle. If you are not familiar with this error, check out my previous blog: Spark Trash-vlog — Shuffle when joining.
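For the record, the adjustment in step 1 comes down to a single config setting; a minimal sketch, assuming the 100 * 3 = 300 total cores from the background section:

// 100 executors * 3 cores = 300 parallel task slots;
// 2~3x that gives 600~900 shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "900")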

Method 2 [Failed]

The ultimate goal is to split the data and compute it separately. So the idea was to join each table with the filter table one by one and then union the results together; code:

// cache the small filter table since it is reused for every join
val filter_table_tmp = spark.sql(
  s"""
     |SELECT a
     |FROM partial
     |""".stripMargin).cache()

val table_a_df = spark.sql(
  s"""
     |SELECT a, b, c
     |FROM table_a_name
     |""".stripMargin)

val table_a_result_df = table_a_df.join(filter_table_tmp, Seq("a"), "inner")

val table_b_df = spark.sql(
  s"""
     |SELECT a, b, c
     |FROM table_b_name
     |""".stripMargin)

val table_b_result_df = table_b_df.join(filter_table_tmp, Seq("a"), "inner")

......

val result = table_a_result_df.union(table_b_result_df)......

The result: because Spark evaluates lazily, the final union collapses the per-table joins back into the same execution as Method 1, merging all the data together before joining filter_table_tmp. Method 2 failed because of my own ignorance…
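You can verify this without waiting for the job to fail; a minimal check, assuming the result DataFrame from the Method 2 code above:

// prints the parsed/analyzed/optimized/physical plans; for Method 2 the
// physical plan shows the union feeding one shuffle join, the same shape
// as Method 1
result.explain(true)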

Method 3

Therefore, to actually join each table separately, each join has to be triggered as its own job. To make later data verification easier, I chose to write the intermediate data directly to external storage. If you are not clear on how Spark divides jobs and tasks, check out my previous blog Spark's components [Application, Job, Stage, TaskSet, Task].

val filter_table_tmp = spark.sql(
  s"""
     |SELECT a
     |FROM partial
     |""".stripMargin).cache()

val table_a_df = spark.sql(
  s"""
     |SELECT a, b, c
     |FROM table_a_name
     |""".stripMargin)

table_a_df.join(filter_table_tmp, Seq("a"), "inner").createOrReplaceTempView("table_a_result_tmp")

// INSERT OVERWRITE is an action: it triggers a job for table A alone
spark.sql(
  s"""
     |INSERT OVERWRITE TABLE temporary_storage_table_name PARTITION(event = "a")
     |SELECT *
     |FROM table_a_result_tmp
     |""".stripMargin)

......

// read the combined intermediate results back from external storage
val result = spark.sql(
  s"""
     |SELECT *
     |FROM temporary_storage_table_name
     |""".stripMargin)

Finally, each join triggers its own job, and table A's data is written out before the next table starts. Since jobs execute in order, every table's join gets the program's full resources, cutting the data volume per job and realizing the idea of "small amounts, many times". The drawback is that there are many jobs and external storage is involved, which adds I/O time for writing and reading and adds job-scheduling time. But compared with a job that can never finish on the same resources, these delays are acceptable. This pit is filled; scatter flowers 🎉~
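Since the per-table code in Method 3 repeats verbatim except for the table name, it folds naturally into a loop; a sketch under the same table names as above (using the table name as the partition value is my assumption, not necessarily the original scheme):

val tableArr = Array("table_a_name", "table_b_name", "table_c_name",
  "table_d_name", "table_e_name", "table_f_name", "table_g_name",
  "table_h_name", "table_i_name", "table_j_name")

tableArr.foreach { table_name =>
  spark.sql(
    s"""
       |SELECT a, b, c
       |FROM ${table_name}
       |""".stripMargin)
    .join(filter_table_tmp, Seq("a"), "inner")
    .createOrReplaceTempView(s"${table_name}_result_tmp")

  // each INSERT OVERWRITE is an action, so it triggers one job per table,
  // which is exactly what forces the joins to run one at a time
  spark.sql(
    s"""
       |INSERT OVERWRITE TABLE temporary_storage_table_name PARTITION(event = "${table_name}")
       |SELECT *
       |FROM ${table_name}_result_tmp
       |""".stripMargin)
}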