preface

Some supplements based on the knowledge points not mentioned in the last two articles

Spark memory computing framework

3.1 Spark Task Scheduling

  1. The Driver runs the main method of the client to construct SparkContext. DAGScheduler and TaskScheduler are successively constructed within SparkContext
  2. Generate DAG directed acyclic graph according to a series of RDD operation sequence
  3. After DAGScheduler gets DAG directed acyclic graph, stages are divided according to wide dependence. Within each stage there are many tasks that can be run in parallel, which are encapsulated in a taskSet and sent to the taskSet scheduler
  4. After the TaskScheduler gets the taskSet, it iterates through and submits each task to the executor process on the worker node.
  5. All tasks are complete. No further action is required

3.2 spark partition

HashPartitioner is used by default when partitioning RDD data. This function hashes the key and modulates the total number of partitions. If the number of partitions is the same, they will be assigned to the same partition.

If a HashPartitioner has only one function, you can define a custom partitioner, which can be divided into three steps

  1. Inheritance org. Apache. Spark. Partitioner
  2. Override the numPartitions method
  3. Override the getPartition method

3.3 Small Cases (Using Scala)

We want to partition according to the length of the KEY in the RDD, and the length of the same key will enter the same partition.

3.3.1 the main method

Object TestPartitionerMain {def main(args: Array[String]): Unit = {SparkConf val SparkConf: SparkConf = new SparkConf().setAppName("TestPartitionerMain").setMaster("local[2]" SparkContext(sparkConf) sc.setLogLevel("warn") // RDD[String] = sc.parallelize(List("hadoop"," HDFS ","hive","spark","flume","kafka","flink","azkaban")) Encapsulate a tuple val wordLengthRDD: RDD[(String, Int)] = data.map(x=>(x,x.length)) RDD[(String, Int)] = wordLengthRDD. PartitionBy (new MyPartitioner (3)) / / 6, save the result data to document the result. The saveAsTextFile (". / data ") sc. Stop ()}}Copy the code

3.3.2 Customize the MyPartitioner

Class MyPartitioner(num:Int) extends Partitioner{override def numPartitions: Override def getPartition(key: Any): Int ={override def getPartition(key: Any): Int ={ Int = key.toString.length length match { case 4 =>0 case 5 =>1 case 6 =>2 case _ =>0 }Copy the code

}}

3.4 Shared Variables of Spark

3.4.1 Broadcast Variable of Spark

The code for distributed execution in Spark needs to be passed to each Executor’s Task for execution. For some read-only, fixed data (such as data read from DB), the Driver needs to broadcast to various tasks each time, which is inefficient.

Broadcast variables allow variables to be broadcast to individual executors. Each Task on the Executor then obtains variables from the BlockManager of the node where it resides instead of from the Driver to reduce communication costs and memory usage, thus improving efficiency.

3.4.2 Schematic diagram of broadcast variables

3.4.3 Use of broadcast variables

  1. Broadcast creates a broadcast [T] object by calling sparkContext. broadcast on an object of type T. This can be done for any serializable type
  2. Access the value of the object through the value attribute
  3. Variables are sent to each node only once and should be treated as read-only (changing this value does not affect other nodes)

3,4.4 simple code examples

Code example without using broadcast variables

Val conf = new SparkConf().setmaster ("local[2]").setAppName("brocast") val rdd1=sc.textFile("/words.txt") val word="spark" val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(word)) rdd2.foreach(x=>println(x))Copy the code

Code examples using broadcast variables

val conf = new SparkConf().setMaster("local[2]").setAppName("brocast") val sc=new SparkContext(conf) val Rdd1 =sc.textFile("/words.txt") val word="spark" // Call the broadcast method of sparkContext to broadcast data. Val broadcast = Sc.broadcast (word) // Get the broadcast variable value in executor by calling the broadcast variable value property val rdd2= rdd1.flatmap (_.split(" ")).filter(x=>x.equals(broadCast.value)) rdd2.foreach(x=>println(x))Copy the code

3.4.5 Precautions for using broadcast variables

  1. You cannot broadcast an RDD using the broadcast variable

  2. Broadcast variables can only be defined on the Driver side, not the Executor side

  3. The value of broadcast variables can be changed on the Driver side, but cannot be changed on the Executor side

  4. If the executor uses Driver variables, there are as many copies of driver-side variables as the executor has tasks without using broadcast variables

  5. If the Executor uses Driver variables, there is only one copy of the Driver variables per Executor if the broadcast variables are used

3.4.6 Spark an accumulator

An Accumulator is a distributed variable mechanism provided by Spark. Similar to MapReduce, an Accumulator is a distributed variable mechanism that aggregates changes. A common use is to count events during job execution during debugging. You can use an accumulator for global counting.

use

  1. An accumulator with the initialValue is created by calling the == sparkcontext.accumulator (initialValue) == method in the driver. The return value is org. Apache. Spark. The Accumulator [T] objects, in which T is the initial value of the initialValue type.
  2. Excutor code in the Spark closure (function serialization) can increase the value of the accumulator using the add method of the accumulator.
  3. The driver program can access the value of the accumulator by calling its value property.

3.5 Spark program serialization Problem

3.5.1 Why does the Transformation Operation Need to be serialized

Spark is a distributed execution engine. Its core abstraction is elastic distributed data set (RDD), which represents data distributed on different nodes. Spark calculations are distributed on executors. Therefore, user-developed TRANSFORMATION operations (closures), such as RDD Map, flatMap, and reduceByKey, have the following operations:

  1. Objects in the code are serialized locally to the driver
  2. The object is serialized and transferred to the remote Executor node
  3. The remote Executor node deserializes objects
  4. Finally the remote node executes

Therefore, the object must be serialized during execution and transmitted over the network.

3.5.2 Spark Task serialization Is Abnormal

During spark programming, externally defined variables and functions are used in operators such as Map and foreachPartition, causing Task serialization problems. However, it is unavoidable for Spark operator to use external variables during calculation in many cases. For example, the Filter operator filters according to externally specified conditions and the Map transforms according to the corresponding configuration.

Such as “org. Apache. Spark. SparkException: Task not serializable” this mistake: the reason is that the operator using the external variables, but the variable cannot be serialized. The current class uses the “extends Serializable” declaration to support serialization, but because some fields do not support serialization, it still causes problems when serializing the entire class, and ultimately causes Task unserialization.

3.5.3 Solution to serialization in Spark

  1. If the class object is used in a function, the class is serialized
  2. If a function uses member variables of the class, all member variables of the class must be serialized except for serialization
  3. Use the “@TRANSIENT” annotation for member variables that cannot be serialized to tell the compiler that serialization is not required
  4. You can also separate dependent variables into a small class that supports serialization, which can reduce network traffic and improve efficiency
  5. You can build the creation of the object directly in this function to avoid the need for serialization

3.6 the Spark on Yarn

You can submit the Spark program to YARN to run. In this case, the largest ResourceManager in YARN allocates the computing resources required by the Spark task

Website address: spark.apache.org/docs/2.3.3/… spark-env.sh

Spark on YARN has two modes: yarn-client mode and yarn-cluster mode

3.6.1 yarn – cluster mode

Example for submitting a task in yarn-cluster mode

spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ - the executor - memory 1 g \ - executor - cores 1 \ / opt/spark/examples/jars/spark - examples_2. 11-2.3.3. Jar \ 10Copy the code

Note If an error occurs during the operation, the virtual memory may be insufficient. You can add parameters

<! Pmem-check-enabled </name> <value>false</value> </property> <! Vmem-check-enabled </name> <value>false</value> </property>Copy the code

3.6.2 yarn – client mode

spark-submit –class org.apache.spark.examples.SparkPi –master yarn –deploy-mode client –driver-memory 1g – executor – memory 1 g – executor – cores 1 / opt/spark/examples/jars/spark – examples_2. 11-2.3.3. Jar 10

3.6.3 schematic diagram

Yarn – cluster:

Yarn – the client:

Their differences:

Yarn-cluster mode: The Driver of spark is running in Yarn. The running result cannot be displayed on the client, and the client can delete the application after the application is started. It is better to run the terminal that saves the result to external storage media (such as HDFS, Redis, Mysql) and displays the simple health of the job as YARN.

Yarn-client mode: If the ==Driver of the Spark program runs on the client ==, the running result of the application program is displayed on the client. All applications that are suitable for running and have output results (such as spark-shell)

The biggest difference is the location of the Driver.

Yarn-cluster: The Driver runs in the YARN cluster together with ApplicationMaster.

Yarn-client: The Driver runs on the client that submits tasks. It is independent of the ApplicationMaster process and is often used for testing

3.7 Problems with the collect operator operation

The collect operator is used as an action operation to trigger the running of a task. It collects RDD data and returns it to the Driver in the form of an array

Note:

The default memory size of the Driver is 1 GB, which is set by spark.driver.memory

If the amount of data in an RDD exceeds the default 1 GB memory on the Driver side, the Driver side will overflow the memory when you invoke the collect operation on the RDD. Therefore, the collect operation has certain risks and will not be used in actual code development.

In actual enterprises, this parameter is usually increased, for example, 5G/10G. Run the new SparkConf().set(“spark.driver.memory”,”5G”) command to change this parameter

3.8 Resource Parameter Analysis in the Spark Task

Executor – Memory: The amount of memory required by each executor process, which determines the speed at which data can be processed later

Total-executor-cores: Indicates the total number of CPU cores required to run a task, which determines the granularity of parallel execution of tasks

Later optimization of Spark program can start from these two parameters. No matter which parameter is increased, the efficiency of program operation will be improved to a certain extent. Increasing computing resources is the most direct and effective optimization method. When computing resources are limited, consider other aspects, such as the code level, JVM level, and so on

finally