Author | Jiang Zao

Jiang Zao works in the business logic group of the WeChat Advertising Center, part of the Social and Performance Advertising Department of the CDG business group, where he is responsible for back-end development of the advertising system. He graduated from the College of Software, Tongji University in Shanghai in 2012, worked at Baidu Phoenix Nest for three years, and joined the WeChat Advertising Center in 2016.

Introduction:

Spark has become the system of choice for big data computing scenarios such as advertising, reporting, and recommendation systems, and its efficiency, ease of use, and versatility have made it increasingly popular. I have worked with Spark and Spark Streaming over the last six months, and I would like to share what I have learned from using them.

This article covers the Spark ecosystem, its operating principles and basic concepts, the principles and practice of Spark Streaming, Spark tuning, and environment setup. I hope you find it helpful.

Spark ecosystem and operating principles

Characteristics of Spark

  1. High speed => Spark has a DAG execution engine and can iterate over data in memory. Official figures claim that Spark is more than 10 times faster than Hadoop MapReduce when data is read from disk, and more than 100 times faster when data is read from memory.

  2. Applicable to a wide range of scenarios => Big data analysis and statistics, real-time data processing, graph computation, and machine learning

  3. Ease of use => Programs are simple to write; Spark supports more than 80 high-level operators, multiple languages, and a rich set of data sources, and can be deployed on several kinds of clusters

  4. High fault tolerance => Spark introduces the Resilient Distributed Dataset (RDD) abstraction, a read-only collection of objects distributed across a set of nodes. These collections are resilient: if part of a dataset is lost, it can be rebuilt from its “lineage”, the record of how it was derived from other data. In addition, fault tolerance can be achieved through checkpointing during RDD computation. There are two checkpointing methods, checkpointing the data and logging the updates, and users can choose which one to use.

Application scenarios of Spark

Currently, there are the following types of big data processing scenarios:

  1. Complex batch data processing, which focuses on the ability to process massive amounts of data. Processing speed is less critical; typical run times range from tens of minutes to several hours.

  2. Interactive queries over historical data, which typically take tens of seconds to tens of minutes.

  3. Stream processing of real-time data, which typically takes hundreds of milliseconds to a few seconds.

Spark success stories

At present, Internet companies apply big data mainly to businesses such as advertising, reporting, and recommendation systems. In advertising, big data is needed for application analysis, performance analysis, and targeting optimization. In recommendation systems, it is needed to optimize relevance ranking, personalized recommendation, and hot-click analysis. These scenarios are generally characterized by a large amount of computation and high efficiency requirements.

Tencent/Yahoo/Taobao/Youku Tudou

Spark Runtime Architecture

The basic operating architecture of Spark is as follows:

The workflow of Spark on a YARN cluster is as follows:

Spark running process:

Spark's architecture uses the master-slave model of distributed computing. The Master is the cluster node that runs the Master process, and a Slave is a cluster node that runs a Worker process.

  • The Master is the controller of the cluster and is responsible for keeping the cluster running properly.

  • A Worker is a compute node; it receives commands from the master node and reports its status.

  • An Executor is responsible for executing tasks.

  • The Client acts on behalf of the user and is responsible for submitting applications.

  • The Driver controls the execution of an application.

After a Spark cluster is deployed, the Master process must be started on the master node and the Worker processes on the slave nodes in order to control the cluster. During the execution of a Spark application, the Driver and the Workers play the two key roles. The Driver is the starting point of the application logic and is responsible for job scheduling, that is, for distributing tasks, while the Workers manage the compute nodes and create Executors that process tasks in parallel. In the execution phase, the Driver serializes each Task, together with the files and JARs it depends on, and ships them to the corresponding Worker machine, where an Executor processes the tasks for its data partitions.

  1. Each application has its own Executors and Tasks; different applications are isolated from one another, and tasks run in parallel as multiple threads

  2. The cluster manager is transparent to Spark, as long as Spark can obtain the nodes and processes it needs

  3. The Driver communicates with the Executors to coordinate processing

There are three cluster modes:

  1. Standalone cluster

  2. Mesos (Apache Mesos)

  3. YARN (Hadoop YARN)

Basic Concepts:

  • Application => A Spark application, which consists of a Driver Program and several Executors

  • SparkContext => Entry point of a Spark application; it schedules computing resources and coordinates the Executors on Worker nodes

  • Driver Program => Runs the Application’s main() function and creates the SparkContext

  • Executor => A process launched for the Application on a Worker node. It runs tasks and stores data in memory or on disk. Each Application has its own Executors to handle its tasks

  • Cluster Manager => An external service that acquires resources on the cluster (e.g. Standalone, Mesos, YARN)

  • Worker Node => Any node in the cluster that can run Application code; it runs one or more Executor processes

  • Task => A unit of work that runs on an Executor

  • Job => Submitted by the SparkContext when an Action is invoked; one Action usually corresponds to one Job

  • Stage => Each Job is divided into several groups of tasks; each group is called a Stage, also known as a TaskSet

  • RDD => Resilient Distributed Dataset, the core module and class of Spark

  • DAGScheduler => Builds a Stage-based DAG for each Job and submits the Stages to the TaskScheduler

  • TaskScheduler => Submits each TaskSet to the Worker nodes and returns the results

  • Transformation => A type of Spark API call whose return value is an RDD. All Transformations are lazy: submitting a Transformation alone does not trigger any computation

  • Action => A type of Spark API call whose return value is not an RDD but a Scala collection or value. Computation is triggered only when an Action is submitted.

Spark core concept: RDD

Spark core concept: Transformations/Actions

A Transformation returns an RDD. Transformations use a chained-call design: one RDD is computed and transformed into another RDD, which can in turn be transformed into yet another. The process is distributed. An Action does not return an RDD; it returns an ordinary Scala collection, a single value, or nothing, and its result is either returned to the Driver program or written to the file system.

In short, an Action converts an RDD into a result that is returned to the Driver or stored in a file, while a Transformation converts one RDD into another RDD.

An RDD is computed only when an Action is executed; this is the basis of lazy RDD evaluation.
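The lazy behaviour described above can be imitated with plain Scala collections. The following is only a sketch that uses a standard-library view as a stand-in for an RDD, not the Spark API: building the map/filter chain plays the role of Transformations, and forcing the view with toList plays the role of an Action. The object name LazyEvalSketch and the counter are made up for illustration.

```scala
object LazyEvalSketch {
  /** Returns (elements evaluated before the "action", elements evaluated after it, result). */
  def run(): (Int, Int, List[Int]) = {
    var evaluated = 0 // counts how many elements the map function has processed

    // "Transformations": building the chain on a view computes nothing yet.
    val transformed = (1 to 5).toList.view
      .map { x => evaluated += 1; x * 2 }
      .filter(_ > 4)
    val before = evaluated

    // "Action": forcing the view into a List runs the whole chain.
    val result = transformed.toList
    (before, evaluated, result)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, result) = run()
    println(s"evaluated before action: $before, after action: $after, result: $result")
  }
}
```

Nothing is evaluated until the final toList call, which mirrors how a chain of Transformations is only recorded until an Action asks for a result.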

Spark core concept: Jobs/Stages

Job => A parallel computation made up of multiple tasks. One Action triggers one Job

Stage => A Job is divided into multiple groups of tasks. Each group is called a Stage, and Stages are separated at shuffle boundaries
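To make the relationship between a Job and its Stages concrete, here is a small sketch in plain Scala. It assumes a simplified model in which a Job is a linear chain of operators and each operator is flagged as requiring a shuffle or not; the real DAGScheduler works on a graph of RDD dependencies, so the Op type and splitIntoStages function are hypothetical illustrations only.

```scala
object StageSplitSketch {
  // Hypothetical model of one operator in a job; `shuffle` marks an operator that needs a shuffle.
  final case class Op(name: String, shuffle: Boolean)

  /** Groups a linear chain of operators into stages, starting a new stage at each shuffle. */
  def splitIntoStages(ops: List[Op]): List[List[String]] =
    ops.foldLeft(List(List.empty[String])) { (stages, op) =>
      if (op.shuffle && stages.head.nonEmpty)
        List(op.name) :: stages                  // shuffle boundary: begin a new stage
      else
        (op.name :: stages.head) :: stages.tail  // no shuffle: stay in the current stage
    }.map(_.reverse).reverse

  def main(args: Array[String]): Unit = {
    val job = List(
      Op("textFile", shuffle = false),
      Op("map", shuffle = false),
      Op("reduceByKey", shuffle = true),
      Op("filter", shuffle = false)
    )
    splitIntoStages(job).zipWithIndex.foreach { case (stage, i) =>
      println(s"stage $i: ${stage.mkString(" -> ")}")
    }
  }
}
```

In this model, operators that do not need a shuffle are pipelined into the same Stage, and each shuffle starts a new one.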

Spark core concept: Shuffle

This section uses reduceByKey as an example to explain the shuffle process.

Without file consolidation (spark.shuffle.consolidateFiles = false), the shuffle process is as follows:

Where is the fetched data stored?

The FileSegments that have just been fetched are stored in the softBuffer, and the processed data is stored in memory + disk. Here we focus on the processed data, whose storage can be set flexibly to “memory only” or “memory + disk”: if spark.shuffle.spill = false, only memory is used. Because the data does not need to be sorted, the shuffle write task is simple: partition the data and persist it. Persisting the data relieves pressure on memory and also provides fault tolerance.

Shuffle writes intermediate results to disk files for two reasons. First, although the previous batch of tasks has finished, the next batch still needs memory, and keeping all intermediate results in memory would exhaust it. Second, persisting the results provides fault tolerance in case a task fails.
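The partition-then-aggregate flow of reduceByKey can be sketched in plain Scala without Spark. This is a toy, in-memory model under several assumptions: keys are assigned to reducers by a non-negative hash modulo the number of reducers, the reduce function is fixed to summation, and nothing is written to disk. The names shuffleWrite, shuffleRead, and reduceByKey below are illustrative helpers, not Spark's internal API.

```scala
object ShuffleSketch {
  /** Map side: one map task partitions its records into buckets, one per reducer, by key hash. */
  def shuffleWrite(records: Seq[(String, Int)], numReducers: Int): Map[Int, Seq[(String, Int)]] =
    records.groupBy { case (key, _) => Math.floorMod(key.hashCode, numReducers) }

  /** Reduce side: reducer `r` fetches bucket `r` from every map task and sums the values per key. */
  def shuffleRead(mapOutputs: Seq[Map[Int, Seq[(String, Int)]]], r: Int): Map[String, Int] =
    mapOutputs
      .flatMap(_.getOrElse(r, Seq.empty))
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).sum }

  /** Runs the whole toy shuffle: one inner Seq per map task. */
  def reduceByKey(mapInputs: Seq[Seq[(String, Int)]], numReducers: Int): Map[String, Int] = {
    val mapOutputs = mapInputs.map(shuffleWrite(_, numReducers))
    (0 until numReducers).flatMap(r => shuffleRead(mapOutputs, r)).toMap
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3, "c" -> 4))
    println(reduceByKey(inputs, numReducers = 3))
  }
}
```

Because every record with the same key lands in the same bucket index, each reducer sees all values for its keys, which is what makes the per-key aggregation correct.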

This approach has the following problems:

  1. Too many FileSegments are generated. Each ShuffleMapTask generates R FileSegments (R is the number of reducers), so M ShuffleMapTasks generate M × R files. The M and R values of Spark jobs are generally large, so a large number of data files end up on disk.

  2. The buffers occupy a lot of memory. Each ShuffleMapTask needs to open R buckets, so M ShuffleMapTasks produce M × R buckets. Although the buffers of a ShuffleMapTask can be reclaimed once it finishes, cores × R buckets exist on a worker node at the same time (a worker can generally run cores ShuffleMapTasks concurrently), occupying cores × R × 32 KB of memory. With 8 cores and 1000 reducers, that is 256 MB.
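The two costs above are simple products, and a quick calculation makes their scale visible. The sketch below only restates the formulas from the text (M × R files and cores × R × 32 KB of buffer); the object and function names are made up, and the value of M in the example is an arbitrary assumption.

```scala
object ShuffleCostSketch {
  val BufferKB = 32L // per-bucket buffer size quoted in the text

  /** Number of FileSegments written by M map tasks for R reducers. */
  def fileSegments(m: Int, r: Int): Long = m.toLong * r

  /** Buffer memory in KB held on one worker that runs `cores` map tasks at once. */
  def bufferKB(cores: Int, r: Int): Long = cores.toLong * r * BufferKB

  def main(args: Array[String]): Unit = {
    println(s"files for M=1000, R=1000: ${fileSegments(1000, 1000)}")
    val kb = bufferKB(cores = 8, r = 1000)
    // 256,000 KB is 256 MB when counting 1000 KB per MB, or 250 MB with 1024 KB per MB.
    println(s"buffer for 8 cores, R=1000: $kb KB")
  }
}
```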

To solve these problems, we can use the file consolidation feature.

With file consolidation (spark.shuffle.consolidateFiles = true), the shuffle process is as follows:

ShuffleMapTasks that run one after another on the same core can share a single output file, the ShuffleFile. The ShuffleMapTask that runs first writes ShuffleBlock i, and a ShuffleMapTask that runs later appends its output directly after ShuffleBlock i to form ShuffleBlock i’. Each ShuffleBlock is called a FileSegment. The reducers of the next stage only need to fetch the entire ShuffleFile. As a result, the number of files held by each worker drops to cores × R. File consolidation is enabled by setting spark.shuffle.consolidateFiles = true.
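The effect of consolidation on the file count can be checked with a small simulation. This sketch assumes a single worker and a round-robin assignment of map tasks to cores, which is a simplification chosen for illustration; fileCount is a hypothetical helper, not a Spark function. Without consolidation each (task, reducer) pair owns a file, while with consolidation each (core, reducer) pair does.

```scala
object ConsolidationSketch {
  /** Distinct shuffle files on one worker when M map tasks run on `cores` cores for R reducers. */
  def fileCount(m: Int, r: Int, cores: Int, consolidate: Boolean): Int = {
    val files = for {
      task    <- 0 until m
      reducer <- 0 until r
    } yield {
      // Assumption: tasks are assigned to cores round-robin.
      val owner = if (consolidate) task % cores else task
      (owner, reducer)
    }
    files.toSet.size
  }

  def main(args: Array[String]): Unit = {
    println(s"without consolidation: ${fileCount(100, 10, 8, consolidate = false)} files")
    println(s"with consolidation:    ${fileCount(100, 10, 8, consolidate = true)} files")
  }
}
```

With 100 map tasks, 10 reducers, and 8 cores, the count falls from M × R to cores × R, independent of how many map tasks run.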

Spark core concept: Cache

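val rdd1 = ... // Read HDFS data and load it as an RDD
rdd1.cache()   // Only marks rdd1 for caching; nothing is computed yet

val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)

rdd2.take(10).foreach(println) // First action: computes rdd1 and caches the partitions it evaluates
rdd3.take(10).foreach(println) // Later action: can reuse the cached partitions of rdd1

rdd1.unpersist() // Removes the cache mark and frees the memory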

The cache and unpersist operations are special in that they are neither Actions nor Transformations. cache only marks the RDD as one that needs to be cached; the data is actually cached the first time a relevant Action runs. unpersist removes the mark and frees the memory immediately. Only when an Action is executed is rdd1 actually created and the subsequent RDD transformations computed.

Under the hood, cache simply calls persist with the persistence level MEMORY_ONLY.
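The mark-now, materialize-later behaviour of cache can be modelled with a toy class in plain Scala. This is not how Spark implements caching; the Dataset class below is a hypothetical stand-in that holds all of its data in one piece, with no partitions or storage levels, and exists only to show when computation happens.

```scala
object CacheSketch {
  /** Toy stand-in for an RDD: `compute` is re-run on every action unless the data is cached. */
  final class Dataset[A](compute: () => Seq[A]) {
    private var markedForCache = false
    private var cached: Option[Seq[A]] = None
    var computeCount = 0 // how many times `compute` has actually run

    /** Only sets a mark; no data is computed or stored here. */
    def cache(): this.type = { markedForCache = true; this }

    /** Clears the mark and drops any cached data immediately. */
    def unpersist(): this.type = { markedForCache = false; cached = None; this }

    def isMaterialized: Boolean = cached.isDefined

    /** An "action": returns the first n elements, computing the data or reusing the cache. */
    def take(n: Int): Seq[A] = {
      val data = cached.getOrElse {
        computeCount += 1
        val fresh = compute()
        if (markedForCache) cached = Some(fresh)
        fresh
      }
      data.take(n)
    }
  }

  def main(args: Array[String]): Unit = {
    val ds = new Dataset(() => Seq(1, 2, 3)).cache()
    println(s"materialized right after cache(): ${ds.isMaterialized}")
    ds.take(2); ds.take(2)
    println(s"computations after two actions: ${ds.computeCount}")
  }
}
```

In this model, calling cache() alone stores nothing, the first action computes and stores the data, later actions reuse it, and unpersist() forces the next action to recompute.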

Persist supports the following RDD persistence levels:

Points to note:

When data is serialized for cache or shuffle, Spark does not support Protobuf message serialization and requires objects that implement Java's Serializable interface. A serialization error is raised when an object that is not Java-serializable is used in these phases.

Spark serializes data whenever it writes to disk. Apart from the shuffle phase and persist, RDD processing happens in memory and does not involve serialization.
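The kind of failure described above can be reproduced with plain Java serialization, outside Spark. The sketch below assumes two made-up record classes, one that does not implement java.io.Serializable and one that does, and checks whether ObjectOutputStream accepts each of them. It is a way to test a class before using it in a cached or shuffled RDD, not a reproduction of Spark's own serializer setup.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationSketch {
  // Hypothetical record types, for illustration only.
  class PlainRecord(val id: Int)                              // does NOT implement Serializable
  class SerializableRecord(val id: Int) extends Serializable // implements java.io.Serializable

  /** Returns true if Java serialization accepts the object, false if it rejects it. */
  def javaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(s"PlainRecord serializable: ${javaSerializable(new PlainRecord(1))}")
    println(s"SerializableRecord serializable: ${javaSerializable(new SerializableRecord(1))}")
  }
}
```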

Operating principles of Spark Streaming

A Spark batch program uses one Spark application instance to process one batch of historical data at a time. Spark Streaming cuts the continuous input stream into a series of batch slices and processes them with a series of Spark application instances.

In principle, what does Spark need to build to turn a traditional Spark batch program into a streaming program?

Four things need to be built:

  1. A static RDD DAG template that represents the processing logic;

  2. A dynamic job controller that slices the continuous streaming data into data fragments, copies a new RDD DAG instance from the template, and uses that instance to process each fragment;

  3. A Receiver that generates and imports the raw data. The Receiver merges the received data into data blocks and stores them in memory or on disk for consumption by the RDDs of subsequent batches;

  4. Safeguards for long-running tasks, including rebuilding input data after a failure and re-running processing tasks after a failure.
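The template-plus-controller idea can be sketched in a few lines of plain Scala. This is a toy model, not the Spark Streaming API: the Event type, the word-count template, and the run function are all hypothetical, the input is a finite in-memory sequence rather than a live stream, and there is no Receiver or fault tolerance. Intervals with no events are simply skipped here.

```scala
object MicroBatchSketch {
  // Hypothetical input record: an arrival timestamp in milliseconds and a payload.
  final case class Event(timestampMs: Long, word: String)

  /** The static "template": the same processing logic is applied to every batch. */
  def template(batch: Seq[Event]): Map[String, Int] =
    batch.groupBy(_.word).map { case (word, events) => word -> events.size }

  /** The "controller": slices events by batch interval and runs the template on each slice. */
  def run(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Map[String, Int])] =
    events
      .groupBy(e => e.timestampMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (batchIndex, batch) => (batchIndex * batchIntervalMs, template(batch)) }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(100, "a"), Event(900, "b"), Event(1200, "a"), Event(1900, "a"))
    run(events, batchIntervalMs = 1000L).foreach { case (start, counts) =>
      println(s"batch starting at $start ms: $counts")
    }
  }
}
```

Each slice is processed by its own application of the template, which is the sense in which a streaming job is a sequence of small batch jobs.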

For the detailed principles of Spark Streaming, see the source code analysis articles produced by Guangditong:

Github.com/lw-lin/Cool…

Keep the following points in mind when using Spark Streaming:

  1. Try to ensure that the data on each Worker node is not spilled to disk, to improve execution efficiency.

  2. Ensure that each batch of data is processed within the batch interval, to avoid data piling up.

  3. Use the framework provided by Steven to preprocess data as it is received, reducing the storage and transmission of unnecessary data. Filter the data after it is received from TDBank and before it is stored, rather than inside the tasks that do the actual processing.
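The point about finishing each batch within the batch interval can be illustrated with a small backlog calculation. The sketch below assumes that batches are processed strictly one after another and that a new batch arrives every interval; schedulingDelays is a hypothetical helper and the processing times are invented sample values, not measurements.

```scala
object BacklogSketch {
  /**
   * Given the processing time of each batch, returns how far behind schedule (in ms)
   * the job is after finishing each batch. Time left over in an interval cannot be banked.
   */
  def schedulingDelays(processingMs: Seq[Long], batchIntervalMs: Long): Seq[Long] =
    processingMs.scanLeft(0L) { (delay, processing) =>
      math.max(0L, delay + processing - batchIntervalMs)
    }.tail

  def main(args: Array[String]): Unit = {
    val delays = schedulingDelays(Seq(800L, 1200L, 1200L, 500L), batchIntervalMs = 1000L)
    println(s"delay after each batch (ms): ${delays.mkString(", ")}")
  }
}
```

Whenever processing takes longer than the interval the delay grows, and it only shrinks again when later batches finish early, which is why sustained overruns lead to data accumulation.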

Spark resource tuning

Memory management:

Executor memory is divided into three main parts:

The first is used by tasks to run the code we write ourselves; by default it takes 20% of the Executor's total memory.

The second is used by tasks during shuffle, when they pull the output of the previous stage's tasks and aggregate it; by default it also takes 20% of the Executor's total memory.

The third is used for RDD persistence; by default it takes 60% of the Executor's total memory.

The memory footprint of each task and each Executor needs to be analyzed. Each task processes one partition of data, so too few partitions make each partition large and can leave tasks short of memory.
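A rough budget based on the 20% / 20% / 60% split above can be worked out as follows. This is a back-of-the-envelope sketch: it applies the default fractions quoted in the text directly to the Executor memory and ignores any safety margins or JVM overhead, and the MemoryPlan type, the function names, and the example sizes are all assumptions made for illustration.

```scala
object ExecutorMemorySketch {
  final case class MemoryPlan(userCodeMB: Long, shuffleMB: Long, storageMB: Long)

  /** Splits Executor memory using the default fractions quoted in the text. */
  def plan(executorMemoryMB: Long,
           shuffleFraction: Double = 0.2,
           storageFraction: Double = 0.6): MemoryPlan = {
    val shuffle = math.round(executorMemoryMB * shuffleFraction)
    val storage = math.round(executorMemoryMB * storageFraction)
    MemoryPlan(executorMemoryMB - shuffle - storage, shuffle, storage)
  }

  /** Shuffle memory available to each task if `concurrentTasks` share it evenly. */
  def perTaskShuffleMB(p: MemoryPlan, concurrentTasks: Int): Long =
    p.shuffleMB / concurrentTasks

  def main(args: Array[String]): Unit = {
    val p = plan(executorMemoryMB = 4096L)
    println(p)
    println(s"shuffle memory per task with 4 concurrent tasks: ${perTaskShuffleMB(p, 4)} MB")
  }
}
```

Comparing the per-task figure with the size of one partition gives a first indication of whether the number of partitions is too small.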

Other resource allocation:

For specific tuning advice, see the tuning articles published by Meituan-Dianping:

Tech.meituan.com/spark-tunin…

Tech.meituan.com/spark-tunin…

Spark environment setup

Spark TDW and TDBank API documentation:

Git.code.oa.com/tdw/tdw-spa…

Other learning materials:

Km.oa.com/group/2430/…