On October 26, the Bytedance Technology Salon | Big Data Architecture special session was held at Bytedance's office in Shanghai. We invited Guo Jun, head of Data Warehouse Architecture at Bytedance; Tao Jiatao, big data R&D engineer at Kyligence; Xu Mingmin, storage engineer at Bytedance; and Bai Chen, senior technical expert at Alibaba Cloud, to share their practices.

Bytedance’s Core Optimization Practices on Spark SQL by Guo Jun, head of Data Warehouse Architecture at Bytedance.

Team introduction

The Data Warehouse Architecture team is responsible for architecture design in the data warehouse domain and supports the data warehouse requirements of almost all Bytedance product lines (including but not limited to Douyin, Toutiao, Xigua Video, and Huoshan Video), for example through secondary development and optimization of Spark SQL and Druid.

Overview

Today's talk is divided into three parts: the first is an introduction to the Spark SQL architecture; the second covers Bytedance's optimization practices on the Spark SQL engine; the third covers Bytedance's practice and exploration in improving the stability and performance of Spark Shuffle.

Introduction to the Spark SQL architecture

Let's take a quick look at the Spark SQL architecture. The figure below illustrates the stages a SQL statement goes through after submission, which shows where optimizations can be applied.



In many cases, people doing data warehouse modeling prefer to write SQL directly rather than use Spark's DSL. After a SQL statement is submitted, the Parser parses it into an Unresolved Logical Plan. A Logical Plan describes what the query intends to compute; Unresolved means that some information about the query is still unknown, such as the schema and data location of the tables it references.

That information lives in the Catalog. In a production environment, the Catalog service is usually provided by the Hive Metastore. The Analyzer combines the Unresolved Logical Plan with the Catalog to produce a Resolved Logical Plan.

That is still not enough. Different people write the same query in different ways, producing different Resolved Logical Plans with different execution efficiencies. To ensure efficient execution regardless of how the SQL is written, Spark SQL passes the Resolved Logical Plan through the Optimizer. The Optimizer applies a series of rules that transform the Resolved Logical Plan into an equivalent Optimized Logical Plan. The Optimized Logical Plan is not guaranteed to be globally optimal, but it is at least close to optimal.

The steps so far concern only SQL and the query itself, not Spark, so the plan cannot yet be submitted to Spark for execution. The Query Planner is responsible for converting the Optimized Logical Plan into a Physical Plan that Spark can execute directly.

The same logical operator can have multiple physical implementations. Join, for example, can be implemented as ShuffledHashJoin, BroadcastHashJoin, BroadcastNestedLoopJoin, SortMergeJoin, and so on. An Optimized Logical Plan can therefore be converted into multiple Physical Plans by the Query Planner, and choosing the best one has a major impact on final execution performance. A better approach is to build a Cost Model, apply it to all candidate Physical Plans, and select the one with the smallest cost as the final Selected Physical Plan.

The Physical Plan can be converted directly into RDDs and executed by Spark. As the saying goes, "plans cannot keep up with changes": during execution it may turn out that the original plan is not optimal. If the remaining execution plan can be adjusted according to runtime statistics, overall execution efficiency may improve. This dynamic adjustment is handled by Adaptive Execution.
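To make these stages concrete, here is a minimal sketch using the standard open-source Spark API (nothing Bytedance-specific); the table name events is a hypothetical example. Calling explain(true) on a query prints the Parsed (Unresolved) Logical Plan, the Analyzed (Resolved) Logical Plan, the Optimized Logical Plan, and the selected Physical Plan described above.

```scala
import org.apache.spark.sql.SparkSession

// Build a session that uses the Hive Metastore as the Catalog, as in production.
val spark = SparkSession.builder()
  .appName("plan-inspection")
  .enableHiveSupport()
  .getOrCreate()

// explain(true) prints all plan stages for the query:
// Parsed (Unresolved) Logical Plan -> Analyzed Logical Plan ->
// Optimized Logical Plan -> Physical Plan.
spark.sql("SELECT user, count(*) AS cnt FROM events GROUP BY user").explain(true)
```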

The next section describes some of the optimizations Bytedance has made on Spark SQL, focusing mainly on the logical plan and physical plan optimization stages described above.

Optimizing the Spark SQL engine

Bucket Join improvements

Spark has no dedicated Bucket Join operator; a Bucket Join is effectively a SortMergeJoin that does not require a Shuffle.

The following figure shows the basic principle of SortMergeJoin. Table 1 and Table 2, represented by the dotted boxes, are two tables that need to be joined on a certain field. The partitions inside the dotted boxes are the partitions of the RDDs the tables are converted into, not the partitions of the tables themselves. Assume Table 1 and Table 2 contain M and K partitions respectively after conversion to RDDs. To perform the Join, a Shuffle is needed to guarantee that data with the same Join Key lands in the same partition, that each partition is sorted by the Key, and that the shuffled RDDs of Table 1 and Table 2 have the same number of partitions.

As shown in the following figure, only N tasks need to be started after the Shuffle. Each task joins the data of the corresponding partitions of Table 1 and Table 2. For example, Task 0 only needs to scan Partition 0 from both sides of the Shuffle to complete the Join.



The advantage of this method is that it applies to almost any data set. The disadvantage is that every Join requires a Shuffle of the full data, and Shuffle is the biggest factor affecting Spark SQL performance; avoiding it can greatly improve Spark SQL performance.

In big data scenarios, data is generally written once and queried many times. If two tables are often joined in the same or a similar way, the Shuffle cost is paid on every Join. Instead, the data can be distributed in a Join-friendly way at write time, so that no Shuffle is needed at Join time. As shown in the following figure, the data of Table 1 and Table 2 is divided into n buckets by the same Key, and each bucket is sorted by that Key. When joining the two tables, n tasks can be started directly, with no Shuffle.
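As a concrete illustration, the sketch below uses Spark's built-in bucketing API rather than Bytedance's Hive-compatible variant; the table and column names (table_1_bucketed, user_id, and so on) are hypothetical, and df1/df2 are assumed to be existing DataFrames with spark as the active SparkSession. Both tables are written with the same bucket count and sort key, so a later SortMergeJoin on that key can skip the Shuffle.

```scala
// Write both tables bucketed and sorted by the join key with the same bucket count.
df1.write
  .bucketBy(1024, "user_id")
  .sortBy("user_id")
  .mode("overwrite")
  .saveAsTable("table_1_bucketed")

df2.write
  .bucketBy(1024, "user_id")
  .sortBy("user_id")
  .mode("overwrite")
  .saveAsTable("table_2_bucketed")

// With matching bucket layouts, this join should not need an Exchange (Shuffle);
// this can be verified with .explain(true).
val joined = spark.table("table_1_bucketed")
  .join(spark.table("table_2_bucketed"), "user_id")
```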





Bytedance has made four major improvements to Spark SQL’s BucketJoin.

Improvement 1: Compatibility with Hive

Bytedance has migrated a large number of Hive jobs to Spark SQL. However, Hive's Bucket tables are not compatible with Spark SQL's. If the computing engine were simply switched, data written into Hive Bucket tables by Spark SQL could not be used by downstream Hive jobs as Bucket tables for Bucket Join. Job execution would take longer and SLAs might be affected.

To solve this problem, we added a Hive-compatibility mode to Spark SQL, ensuring that Bucket tables written by Spark SQL are identical to those written by Hive. Such a table can be treated as a Bucket table by both Hive and Spark SQL and used for Bucket Join without a Shuffle, which makes the migration from Hive to Spark SQL transparent.

The first problem to solve is that a Hive bucket usually contains exactly one file, while a Spark SQL bucket may contain multiple files. The solution is to dynamically add a Shuffle that is keyed by the bucket Key and whose parallelism equals the number of buckets.
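The sketch below is only a rough approximation of that idea using public Spark APIs, not the internal implementation: before writing, the data is shuffled by the bucket column with parallelism equal to the bucket count and sorted within each partition, so each bucket ends up as a single sorted file, matching Hive's one-file-per-bucket layout. Note that repartition() here uses Spark's default hashing, whereas the real Hive-compatibility mode also has to use Hive's hash function (see the next paragraph); names such as df and user_id are hypothetical.

```scala
import org.apache.spark.sql.functions.col

val numBuckets = 1024  // must equal the table's declared bucket count

// Dynamically added Shuffle keyed by the bucket column, with one partition per bucket,
// followed by an intra-partition sort so each bucket file stays sorted by the key.
val oneFilePerBucket = df
  .repartition(numBuckets, col("user_id"))
  .sortWithinPartitions("user_id")
```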





The second problem is that Hive 1.x uses a different hash function than Spark SQL 2.x (Murmur3Hash), so the same row gets different Bucket IDs in Hive and in Spark SQL and the data cannot be joined directly. In Hive-compatibility mode, we solve this by having the dynamically added Shuffle use the same hash function as Hive.

Improvement 2: Support for Bucket Join with bucket counts in a multiple relationship

Spark SQL requires that Bucket Join only be performed between tables with the same number of buckets (a necessary but not sufficient condition). For two tables of very different sizes, such as a dimension table of a few hundred GB and a fact table of tens of TB (in a single partition), the appropriate bucket counts usually differ, often by a lot, so Bucket Join cannot be used by default. We therefore added support, in two ways, for Bucket Join when the bucket counts of the two tables are in a multiple relationship.

In the first method, the number of tasks equals the bucket count of the smaller table. As shown in the following figure, Table A contains 3 buckets and Table B contains 6 buckets. Bucket 0 and bucket 3 of Table B together hold the data that must be joined with bucket 0 of Table A. So 3 tasks are started, and Task 0 joins bucket 0 of Table A with bucket 0 plus bucket 3 of Table B. To do so, it must merge-sort bucket 0 and bucket 3 of Table B so that the combined data remains sorted.


If the bucket counts of Table A and Table B are close, the method above works well. But if the bucket count of Table B is, say, 10 times that of Table A, the method avoids the Shuffle yet may actually be slower than a SortMergeJoin with Shuffle because of insufficient parallelism. In that case another method can be used: the number of tasks equals the bucket count of the larger table, as shown in the following figure.



In this scheme, the 3 buckets of Table A may be read multiple times. In the figure above, Table A is combined with itself through a Bucket Union (a new operator, similar to Union but preserving the bucket properties), and the result is equivalent to 6 buckets, matching the bucket count of Table B, so Bucket Join becomes possible.

Improvement 3: Support for Bucket Join downgrade

Few tables inside the company used buckets in the past. After our series of bucket improvements, many users wanted to convert their tables to Bucket tables. After conversion, the table's metadata says it is a Bucket table, but the data in historical partitions is not distributed according to the Bucket table's requirements, so historical partitions cannot be used as bucketed data at query time.

Meanwhile, as data volume grows quickly, the average bucket size also grows quickly. A single task may then have to process too much data, and using buckets may end up slower than simply using a Shuffle-based Join.

To solve these problems, we implemented Bucket tables that support downgrading. The basic idea is to record the modification date every time the bucket information changes, covering the two cases above: converting a non-bucket table to a Bucket table, and changing the number of buckets. When deciding which Join mode to use, we check whether the queried data of the Bucket table contains only partitions written after that date. If so, the table is treated as a Bucket table and Bucket Join is allowed; otherwise it is treated as an ordinary non-bucket table. A simplified sketch of this decision follows.
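This is only an illustrative sketch; the helper function and its inputs are hypothetical stand-ins for the metadata lookups in the real implementation.

```scala
// Treat the table as a Bucket table only if every queried partition was written
// after the most recent change to its bucket information.
def canUseBucketJoin(queriedPartitionDates: Seq[String],
                     bucketInfoChangedOn: String): Boolean =
  queriedPartitionDates.forall(_ > bucketInfoChangedOn)

// Example: bucket info changed on 2019-09-01; the query only touches newer partitions,
// so Bucket Join is allowed. Otherwise the table is treated as a normal non-bucket table.
val useBucketJoin = canUseBucketJoin(Seq("2019-10-01", "2019-10-02"), "2019-09-01")
```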

Improvement 4: Support for supersets

A widely used table may be joined with one table on the User field, with another on User and App, and with yet others on User and Item. Spark SQL's native Bucket Join requires the Join Key set to be identical to the table's Bucket Key set. Since these joins use different key sets, no single bucketing scheme can serve them all, which greatly limits the applicability of Bucket Join.
To solve this problem, we support Bucket Join in the superset case: Bucket Join can be performed as long as the Join Key set contains the Bucket Key set.

As shown in the figure below, both Table X and Table Y are bucketed by field A, while the query joins them on the key set {A, B}. Since rows with equal values of A fall into the same bucket in both tables, rows with equal values of both A and B necessarily do too, so the data distribution already satisfies the Join and no Shuffle is needed. Bucket Join also requires both sides to be sorted by the Join Key set {A, B}; for this, only an intra-partition sort of Table X and Table Y is required, and because both sides are already sorted by A, sorting by A and B is relatively cheap.


Materialized columns

When Spark SQL processes nested data, the following problems exist:

  • Reading a lot of unnecessary data: with columnar formats such as Parquet/ORC, only the required columns need to be read and the rest can be skipped, which saves a great deal of IO. But for nested data types, such as the Map-typed people field in the figure below, a query usually only needs a subfield such as people.age, yet the entire people Map must be read and people.age extracted from it. This introduces a lot of pointless IO. In our scenario there are many Map-typed fields, some containing tens to hundreds of keys, so the IO is amplified tens to hundreds of times.
  • Vectorized reads cannot be used: vectorized reading greatly improves performance, but as of this talk (October 26, 2019) Spark does not support vectorized reads of nested data types, which significantly hurts queries that involve them.
  • Filter pushdown is not supported: as of October 26, 2019, Spark does not support pushing Filters down onto nested-type fields.

  • Repeated computation: a JSON field is stored as a String in Spark SQL and is not strictly a nested data type, but in practice it is often used to hold a variable set of fields, with the target subfields extracted via JSON Path at query time. Extracting fields from large JSON strings is very CPU-intensive, and for a hot table, repeatedly extracting the same subfields wastes a lot of resources.


To work around these problems, data warehouse engineers came up with a solution. As shown in the figure below, in addition to the table base_table, a table named sub_table is created in which the frequently used subfield people.age is stored as a separate Integer column age. Downstream consumers then query age in sub_table instead of people.age in base_table. A query on a nested field thus becomes a query on a Primitive field, sidestepping the problems above.
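A sketch of this workaround in Spark SQL is shown below; the table and column names are hypothetical, and the Map subfield is accessed with the ['age'] syntax.

```scala
// Materialize the hot subfield into a flat Integer column in a separate table,
// so downstream queries read a Primitive column instead of the whole Map.
spark.sql("""
  CREATE TABLE sub_table AS
  SELECT t.*, CAST(t.people['age'] AS INT) AS age
  FROM base_table t
""")

// Downstream queries then use sub_table.age instead of base_table.people['age'].
spark.sql("SELECT count(*) FROM sub_table WHERE age > 18")
```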


This approach has obvious drawbacks:
• Maintaining an extra table introduces significant additional storage and compute overhead.
• Historical data cannot be queried through the new column on the new table (supporting historical queries would require re-running the history jobs, which is too expensive to be acceptable).
• The table maintainer has to modify the data-insertion jobs whenever the table structure changes.
• Downstream consumers have to modify their query statements, which makes adoption costly.
• High operational cost: if the set of frequently used subfields changes, obsolete standalone columns must be dropped and new ones added. Before dropping a column, you must confirm that no downstream job still uses it; after adding one, you must notify downstream users and persuade them to switch to it.

To solve all of the above problems, we designed and implemented materialized columns. They work as follows:


• Add a Primitive column, such as an Integer column named age, and declare it to be the materialized column of people.age.
• When data is inserted, values for the materialized column are generated automatically, and the materialization relationship is saved in the partition parameters. Insertion jobs therefore remain completely transparent, and the table maintainer does not need to modify existing jobs.
• At query time, all partitions involved in the query are checked. If every partition carries the materialization information (the mapping from people.age to age), the query on people.age is rewritten into a query on the plain age column; otherwise the query runs unchanged, so historical data remains fully compatible (a sketch of this rewrite follows the list).
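The effect of the rewrite can be illustrated as follows; the DDL for declaring a materialized column is part of Bytedance's internal extension and is not shown, and the table and column names are hypothetical.

```scala
// What the user writes: a query against the nested field.
val userQuery =
  "SELECT people['age'] FROM base_table WHERE date = '20191024'"

// What Spark SQL effectively executes when every queried partition carries the
// materialization info mapping people['age'] -> age: a query on a Primitive column,
// which benefits from column pruning, vectorized reads, and filter pushdown.
val rewrittenQuery =
  "SELECT age FROM base_table WHERE date = '20191024'"

// Partitions written before the materialized column existed carry no mapping,
// so their part of the query is left unrewritten and historical data stays queryable.
```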

    The following figure shows the benefits of using materialized columns on a core table:




    Materialized views

In the OLAP domain, queries often perform time-consuming operations such as Group By, Aggregate, and Join on the same fixed fields of the same table. This causes repeated computation and wasted resources, and hurts query performance and user experience.

We implemented query optimization based on materialized views:




As shown in the figure above, the query history shows that a large number of queries group by user and then compute sum or count over num. We therefore create a materialized view that groups by user and computes avg(num) (avg is automatically expanded into count and sum). When a user runs SELECT user, sum(num) against the original table, Spark SQL automatically rewrites the query into SELECT user, sum_num against the materialized view.
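A sketch of this rewrite is shown below; the table name events, the view name mv_user_num, and the exact DDL are hypothetical, since CREATE MATERIALIZED VIEW is not part of open-source Spark SQL.

```scala
// Hypothetical materialized view: pre-aggregate per user, expanding avg(num)
// into count(num) and sum(num) so that sum, count, and avg can all be answered.
//   CREATE MATERIALIZED VIEW mv_user_num AS
//   SELECT user, count(num) AS count_num, sum(num) AS sum_num
//   FROM events GROUP BY user

// A user query against the base table ...
val original  = "SELECT user, sum(num) FROM events GROUP BY user"

// ... is automatically rewritten to read the pre-aggregated view instead:
val rewritten = "SELECT user, sum_num FROM mv_user_num"

// avg(num) can likewise be answered as sum_num / count_num.
```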

    Other optimizations on the Spark SQL engine

The following figure shows the other optimizations we made on Spark SQL:




Spark Shuffle stability improvement and performance optimization

Problems with Spark Shuffle

Many of you are probably already familiar with how Shuffle works, so in the interest of time I will only briefly introduce the basic model.



As shown in the figure, the Stage on the upstream side of the Shuffle is called the Mapper Stage, and its tasks are called Mappers. The Stage on the downstream side is called the Reducer Stage, and its tasks are called Reducers.

Each Mapper splits its output into at most N parts, where N is the number of Reducers. Each Reducer needs to fetch its own data from at most M Mappers, where M is the number of Mappers.

    There are two problems with this architecture:

• Stability: a Mapper's Shuffle Write data is stored only on the Mapper's local disk, with a single copy. If that machine has a disk failure, or its I/O or CPU is saturated, the Reducer cannot fetch the data, which causes a FetchFailedException and triggers a Stage Retry. Stage Retries lengthen job execution time, which directly hurts the SLA. Moreover, the longer the job runs, the more likely Shuffle data becomes unreadable, triggering yet more Stage Retries; this vicious cycle can cause large jobs to never finish successfully.


• Performance: each Mapper's output is read by a large number of Reducers, and different parts of it are read in random order. If a Mapper's Shuffle output is 512 MB and there are 100,000 Reducers, each Reducer reads only about 5.24 KB on average (512 MB / 100,000). Different Reducers also read concurrently, so a Mapper's output file sees a huge number of random reads. Since the random-I/O performance of HDDs is far below their sequential-I/O performance, Reducers read Shuffle data very slowly. Metrics show that the Reducer's Shuffle Read Blocked Time is long, sometimes more than half of the Reducer's total execution time.




HDFS-based Shuffle stability improvement

We observed that the biggest cause of Shuffle failures is not hardware problems such as disk failures, but saturated CPU and saturated disk I/O.



As shown in the figure, CPU usage is close to 100%, so the Spark External Shuffle Service inside the Node Manager on the Mapper side cannot serve Shuffle requests in time.

As shown in the following figure, the Data Node occupies 84% of the machine's I/O, so reading Shuffle data becomes very slow and the Reducer cannot read the data within the timeout, resulting in a FetchFailedException.



    The main problem is that the Shuffle Write data on the Mapper is only stored locally. If the node is faulty, all the Shuffle Write data on the node cannot be read by the Reducer. A general approach to this problem is to ensure availability through multiple copies.

Our initial, simple solution was to have the Mapper write its final data files and index files to HDFS instead of the local disk, and have the Reducer fetch Shuffle data directly from HDFS instead of from the Mapper-side External Shuffle Service. See the following figure.




    After a quick implementation of this solution, we did a few simple tests. The results show that:

    • If there are not too many mappers and reducers, the read/write performance of Shuffle is the same as that of the original solution.
    • When there are too many mappers and reducers, Shuffle read becomes very slow.





During the experiments above, HDFS raised an alert. As shown in the following figure, the peak QPS on the HDFS Name Node Proxy reached 600,000. (Note: Bytedance developed its own Name Node Proxy and implemented caching in the Proxy layer, which is why read QPS can reach this magnitude.)




The reason is that each Reducer has to read the data file and the index file of every one of the 10,000 Mappers, so with 10,000 Reducers, HDFS is read 200 million times in total (10,000 × 10,000 × 2).

If the only issue were the single-point performance of the Name Node, there would be simple remedies. For example, the Spark Driver could keep the Block Locations of all Mappers and broadcast them to all Executors; each Reducer could then obtain the Block Locations from its Executor and read data directly from the Data Nodes without ever contacting the Name Node. However, given the Data Node's threading model, this scheme would put heavy pressure on the Data Nodes.

      Finally, we chose a relatively simple and feasible scheme, as shown in the figure below.




The Mapper writes its Shuffle output to the local disk as in the original scheme, and then uploads it to HDFS. The Reducer still fetches Shuffle data through the Mapper-side External Shuffle Service as before; only if that fails does it fall back to reading the data from HDFS. This greatly reduces the frequency of HDFS accesses.
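A simplified sketch of the Reducer-side fallback logic is shown below; the two helper functions are hypothetical stand-ins for the real fetch paths, not actual Spark APIs.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helpers standing in for the real fetch implementations.
def fetchFromExternalShuffleService(blockId: String): Array[Byte] = ???
def readBlockFromHdfs(blockId: String): Array[Byte] = ???

// Normal path: fetch the block from the Mapper-side External Shuffle Service.
// Fallback path: if the fetch fails, read the copy the Mapper uploaded to HDFS.
def fetchShuffleBlock(blockId: String): Array[Byte] =
  Try(fetchFromExternalShuffleService(blockId)) match {
    case Success(bytes) => bytes
    case Failure(_)     => readBlockFromHdfs(blockId)
  }
```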

This solution has been in production for nearly a year:

• It covers more than 57% of Spark Shuffle data.
• Overall Spark job performance improved by 14%.
• Performance of daily-scale jobs improved by 18%.
• Performance of hourly jobs improved by 12%.



The goal of this scheme is to improve Spark Shuffle stability and thereby job stability, but in the end we did not use metrics such as variance to quantify the stability improvement, because the cluster load differs from day to day and the overall variance is large. With more stable Shuffle, Stage Retries dropped noticeably, which shortened overall job execution time and, in effect, improved Shuffle performance. We therefore measured the scheme's effect by comparing total job execution time before and after adopting it.

      Shuffle performance optimization practice and exploration

As described above, the Shuffle performance problem stems from the fact that Shuffle Write is done by the Mapper and every Reducer has to read data from all Mappers. We call this model Mapper-centric Shuffle. It has the following problems:

• The Mapper side performs M sequential write I/Os.
• The Mapper side suffers M × N × 2 random read I/Os (the biggest performance bottleneck).
• The External Shuffle Service on the Mapper side must reside on the same machine as the Mapper, so storage and compute cannot be separated and the Shuffle Service cannot be scaled independently.

To solve these problems, we propose a Reducer-centric Shuffle scheme that separates storage from compute, as shown in the following figure.





The principle of this scheme is that each Mapper writes the data belonging to different Reducers directly to different Shuffle Services. In the figure above there are two Mappers, five Reducers, and five Shuffle Services. Every Mapper streams the data belonging to Reducer 0 remotely to Shuffle Service 0, which writes it to disk sequentially. Reducer 0 then only needs to read all of its data sequentially from Shuffle Service 0, instead of fetching it from M Mappers. The advantages of this scheme (a conceptual sketch follows the list below) are:
• It turns M × N × 2 random I/Os into N sequential I/Os.
• The Shuffle Service can be deployed independently of the Mappers and Reducers, enabling independent scaling and separation of storage and compute.
• The Shuffle Service can store data on highly available storage such as HDFS, which ensures Shuffle stability.
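The conceptual sketch below illustrates the kind of interface such a Reducer-centric Shuffle Service might expose; it is not an actual API, just a way to make the data flow concrete.

```scala
// Each Mapper pushes the records destined for Reducer r to shuffle service r,
// which appends them to sequential storage (local disk or HDFS). Each Reducer
// then reads its entire input as a single sequential stream from one service.
trait RemoteShuffleService {
  def append(reducerId: Int, mapperId: Int, records: Iterator[Array[Byte]]): Unit
  def openSequentialStream(reducerId: Int): java.io.InputStream
}
```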
      That’s all for my share. Thank you.

Q&A highlights

      – Question: If the materialized column is added, does the historical data need to be modified?

Answer: There is too much historical data; modifying it is not feasible.

      – Question: What if a user request contains both new and historical data?

Answer: In general, users modify data at partition granularity, so we store the materialized-column information in the partition parameters. If a query spans both new and historical partitions, we rewrite the SQL for the new partitions to use the materialized columns, leave the historical partitions unrewritten, and then Union the two. This fully exploits the benefits of materialized columns while guaranteeing data correctness.

– Question: You have made some valuable optimizations for your users' scenarios. Materialized columns and materialized views both need to be set up according to the users' query patterns. Do you currently analyze those queries manually, or do you have a mechanism that analyzes and optimizes them automatically?

Answer: At present we mainly rely on audit information to assist manual analysis. We are also building a recommendation service for materialized columns and materialized views, with the goal of eventually creating them automatically and intelligently.

– Question: In the HDFS-based Spark Shuffle stability improvement scheme, could the Shuffle data be uploaded to HDFS asynchronously?

Answer: That is a good idea, and we considered it, but decided against it for a few reasons. First, the Shuffle output of a single Mapper is generally small; uploading it to HDFS takes less than 2 seconds, a cost that can be ignored. Second, External Shuffle Service and Dynamic Allocation are widely used here, so the Executor may be reclaimed once the Mapper finishes. Asynchronous uploading would require relying on additional components, adding complexity for a low ROI.

More great shares

Shanghai Salon Review | How Bytedance optimizes its EB-scale HDFS platform

Shanghai Salon Review | An introduction to Apache Kylin and its new architecture (Kylin on Parquet)

Shanghai Salon Review | The application of Redis caching in big data scenarios

Bytedance Technology Salon

      Bytedance Technology Salon is a technical exchange event initiated by Bytedance Technology Institute and co-hosted by Bytedance Technology Institute and Nuggets Technology Community.

      Bytedance Technology Salon invites technical experts from Bytedance and Internet companies in the industry to share hot technology topics and front-line practical experience, covering architecture, big data, front end, test, operation and maintenance, algorithm, system and other technical fields.

      Bytedance Technology Salon aims to provide an open and free exchange and learning platform for talents in the field of technology to help them learn and grow and continue to advance.

Welcome to follow the "Bytedance Technical Team" account.