Parameter Setting Method

When submitting a Spark task, you can set parameters in the following ways:

Environment variables

Using the spark-env.sh configuration file, for example:

export JAVA_HOME=/usr/local/jdk
export SCALA_HOME=/usr/local/scala
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_WEBUI_PORT=8088
export SPARK_WORKER_WEBUI_PORT=8099
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g

This setting is global: it applies to all tasks rather than to any particular one, so it is best used as a default, with specific tasks configured separately.

Application code

Set in SparkConf, for example:

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("Myapplication")
  .set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

Or via Java system properties, for example:

System.setProperty("spark.executor.memory", "14g")
System.setProperty("spark.worker.memory", "16g")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

Submission script

Set the parameters when the task is submitted, for example:

spark-submit \
--conf spark.executor.memory=14g \
--conf spark.worker.memory=16g \
--name SimpleTest \
--jars ./lib/*.jar \
--class com.example.test \
sparkTest.jar

If there is no special requirement, it is recommended to set parameters in the task submission script, which is the most flexible approach.

The spark-submit parameters, their formats, and descriptions:

--master MASTER_URL: for example spark://host:port, mesos://host:port, yarn, yarn-cluster, yarn-client, local
--deploy-mode DEPLOY_MODE: client or cluster; the default is client
--class CLASS_NAME: the main class of the application
--name NAME: the name of the application
--jars JARS: comma-separated local JARs to include on the driver and executor classpaths
--packages: comma-separated list of "groupId:artifactId:version" Maven coordinates of JARs to include on the driver and executor classpaths
--exclude-packages: comma-separated list of "groupId:artifactId" coordinates to exclude
--repositories: comma-separated list of additional remote repositories
--py-files PY_FILES: comma-separated ".zip", ".egg", or ".py" files to place on the PYTHONPATH of the Python application
--files FILES: comma-separated files to place in each executor's working directory
--conf PROP=VALUE: an arbitrary Spark configuration property
--properties-file FILE: a file to load extra properties from; defaults to conf/spark-defaults.conf
--driver-memory MEM: driver memory; the default is 1G
--driver-java-options: additional Java options passed to the driver
--driver-library-path: additional library paths passed to the driver
--driver-class-path: additional classpath entries passed to the driver
--executor-memory MEM: memory per executor; the default is 1G
--proxy-user NAME: the user to impersonate when submitting the application
--driver-cores NUM: number of driver cores; the default is 1. Used only in standalone cluster deploy mode
--supervise: restart the driver if it fails. Used with standalone or Mesos
--verbose: print additional debug output
--total-executor-cores NUM: total number of cores for all executors. Used with standalone or Mesos
--executor-cores NUM: number of cores per executor. Used with YARN or standalone
--driver-cores NUM: number of driver cores; the default is 1. Used in YARN cluster mode
--queue QUEUE_NAME: the YARN queue to submit to. YARN only
--num-executors NUM: number of executors to launch; the default is 2. YARN only

Performance Viewing Methods

You can view the performance of Spark tasks in the following ways:

1) The Web UI

2) Logs in the driver console

3) Logs in the logs folder

4) Logs in the work folder

5) Profiler tools, for example JVM profilers such as YourKit and JConsole, or the jmap and jstack commands. For a more comprehensive, cluster-wide view, use cluster monitoring tools such as Ganglia or Ambari.

Optimization

Spark tasks should be optimized in the following areas:

Merging small partitions

Applicable scenario

This optimization applies when a large amount of data is filtered out by a filter operation.

When using Spark, users often filter data with the filter operator. Frequent filtering, or filtering out a large amount of data, leaves a large number of small partitions (each holding little data). Spark assigns one task per partition, so too many partitions means too many tasks, each processing only a small amount of data; this causes high thread-switching overhead, leaves many tasks waiting to execute, and results in low effective parallelism.

Solution

Use repartition/coalesce to re-partition the data:

val rdd = sc.textFile("1.log")
  .filter(line => line.contains("error"))
  .filter(line => line.contains("info"))
  .repartition(10)
  .collect()
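
If the goal is only to merge the small partitions left after filtering, coalesce can avoid the full shuffle that repartition performs. A minimal sketch, reusing sc and the 1.log file from the example above:

// coalesce(10) merges the existing partitions down to 10 without a full shuffle
// (shuffle = false by default), which is cheaper than repartition when only
// reducing the partition count
val merged = sc.textFile("1.log")
  .filter(line => line.contains("error"))
  .coalesce(10)
merged.count()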

Increase parallelism

Applicable scenario

Reading files in parallel and performing map, filter, and reduce operations.

Solution

  1. Pass a parallelism argument when calling the operator, for example reduceByKey(func, numPartitions)

  2. Configure the spark.default.parallelism parameter

The official recommendation is to allocate 2 to 3 tasks per CPU core, but in real production the degree of parallelism has to be traded off:

If the degree of parallelism is too high, the number of tasks is too large and a great deal of task start-up and switching overhead is incurred.

If the degree of parallelism is too low, the number of tasks is too small, the parallel computing capability of the cluster is not used, and task execution is too slow. In addition, each task may have to combine too much data in memory, which can exhaust memory and cause an out-of-memory exception.
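
A minimal sketch of both approaches, assuming a hypothetical cluster with 16 cores (so roughly 48 tasks at 3 tasks per core):

import org.apache.spark.{SparkConf, SparkContext}

// Default parallelism used by shuffle operators that do not specify one
val conf = new SparkConf()
  .setAppName("ParallelismExample")
  .set("spark.default.parallelism", "48")   // assumed: 16 cores * 3 tasks per core
val sc = new SparkContext(conf)

// Or pass the number of partitions directly to the operator
val counts = sc.textFile("1.log")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 48)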

DAG optimization

Solution

  1. Put as many operators as possible in the same stage to reduce shuffle operations and the cost of starting and switching tasks

  2. Reuse cached data: cache or persist RDDs that are used repeatedly (see the sketch below)
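
A minimal sketch of reusing cached data, building on the earlier 1.log examples:

// Cache an RDD that several downstream jobs reuse, so the file is read
// and filtered only once instead of once per action
val errors = sc.textFile("1.log").filter(line => line.contains("error"))
errors.cache()                                  // same as persist(StorageLevel.MEMORY_ONLY)

val total         = errors.count()              // the first action materializes the cache
val distinctCount = errors.distinct().count()   // later actions reuse the cached data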

Memory optimization

Solution

JVM tuning

Minimize the use of small data objects.

Every Java object has an Object header. This object header is about 16 bytes long and contains information such as a pointer to the object’s class. This is uneconomical for objects with very little data. For example, if an object has only one Int property, the header information will take up more space than the object’s data space.

A Java String carries roughly 40 bytes of overhead beyond the raw character data: the actual characters are stored in a char array, and the object also keeps other information such as the string length. In addition, each character occupies 2 bytes in UTF-16 encoding, so a 10-character string can consume more than 60 bytes of memory.

Evaluate the memory footprint

The best way to estimate how much memory a dataset needs in the cluster is to create an RDD, read the data, load it into the cache, and then look at the SparkContext log in the driver console. The log shows how much space each partition occupies (data is spilled to disk when memory is insufficient), and from the total number of partitions the user can estimate the space consumed by the whole RDD. For example, consider the following log line:

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)

This means that partition 1 of RDD 0 occupies 717.5 KB of memory.
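
A minimal sketch of this procedure, assuming the same 1.log file as before:

// Cache the RDD and run an action; the driver log then reports the
// in-memory size of each cached partition, as in the line shown above
val rdd = sc.textFile("1.log").cache()
rdd.count()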

Adjust the data structure

Reduce the consumption of Java-specific information other than the raw data, such as pointers in linked structures and the metadata generated by wrapper objects.

When designing and selecting data structures, prefer arrays and primitive types, and minimize the use of linked Java or Scala collection types.

Reduce object nesting. For example, using large numbers of small objects and pointer-heavy collection structures creates a large amount of pointer and object-header metadata overhead.

Consider using numeric IDs or enumerations rather than strings as keys; a string's own metadata and character encoding take up considerable space.

Serialized RDD storage

In the program, you can control how an RDD is stored by choosing a StorageLevel. For users who want to cache data in memory, the official recommendation is to use the Kryo serialization library, because Kryo-serialized objects take less space and perform better than those produced by Java's standard serialization.
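
A sketch of caching an RDD in serialized form with Kryo configured; the file path is reused from the earlier examples:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("SerializedCache")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// MEMORY_ONLY_SER stores each partition as a serialized byte array,
// trading some CPU for a much smaller memory footprint
val rdd = sc.textFile("1.log")
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count()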

GC tuning

When the JVM needs to reclaim the space occupied by old objects to make room for new ones, the garbage collector traces through the Java objects, finds those that are no longer used, and reclaims them.

Design data structures that create fewer objects, for example arrays rather than LinkedLists, to reduce garbage collection overhead.

An important GC-related configuration parameter is how much memory is given to the RDD cache.

By default, Spark uses 60% of the configured Executor memory (spark.executor.memory) to cache the RDD. This means that 40% of the remaining memory space is available for tasks to cache newly created objects during execution.

In cases where tasks run slowly and the JVM is garbage collecting frequently, or an out-of-memory exception occurs, you can lower this percentage to 50%. The percentage can be configured in spark-env.sh by setting spark.storage.memoryFraction=0.5.
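
A sketch of lowering this fraction in application code instead (spark.storage.memoryFraction belongs to the legacy, pre-unified memory manager; the values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("GCTuning")
  .set("spark.executor.memory", "8g")            // assumed executor size
  .set("spark.storage.memoryFraction", "0.5")    // default is 0.6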

At the same time, combining this with serialized cache storage to reduce the memory footprint alleviates garbage collection problems more effectively.

OOM optimization

To avoid memory overflows, you can optimize in the following ways:

  1. Check the program for infinite loops or unnecessary creation of large numbers of objects;

  2. Increase the JVM's -Xms (initial heap size) and -Xmx (maximum heap size) parameters, for example set JAVA_OPTS=-Xms256m -Xmx1024m;

  3. Optimize the data structures and the related configuration parameters

Temporary directory space optimization

The configuration parameter spark.local.dir sets Spark's temporary directory on disk; the default is /tmp. During a shuffle, Spark writes intermediate results to this temporary directory, and when an RDD cannot fit entirely in memory, the data that does not fit is also written there.

If this temporary directory is too small, a "No space left on device" exception occurs. You can also configure multiple disks, for example spark.local.dir=/mnt1/spark,/mnt2/spark,/mnt3/spark, to extend Spark's temporary disk space so that more data can be written to disk across several devices and I/O is faster.
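
A sketch of setting multiple local directories in application code; the mount points are the same hypothetical ones as above:

import org.apache.spark.SparkConf

// Spread shuffle files and spilled cache data across several disks
val conf = new SparkConf()
  .set("spark.local.dir", "/mnt1/spark,/mnt2/spark,/mnt3/spark")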

Network transmission optimization

Large task distribution

During task distribution, the task's metadata is serialized, along with the JARs and files that the task requires.

Tasks are distributed through messages between actors in the Akka library. Because Spark uses Scala's functional style, the variables referenced by a function are captured in its closure and shipped with it; when large amounts of data end up being distributed inside tasks this way, overall execution slows down.

Increasing the spark.akka.frameSize parameter (the default frame size is 10 MB) can alleviate the Akka buffer overflows caused by oversized tasks, but it does not solve the underlying problem.

Using broadcast variables

Broadcast variables are used to share read-only data that every task needs during a Spark computation. A broadcast variable is stored once on each machine rather than being shipped with every task, which greatly saves space and reduces transmission time.
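
A minimal sketch, using a hypothetical lookup table that every task needs read-only access to:

// The lookup table is shipped to each executor once as a broadcast variable,
// instead of being serialized into the closure of every task
val lookup   = Map("ERROR" -> 3, "WARN" -> 2, "INFO" -> 1)
val bcLookup = sc.broadcast(lookup)

val levels = sc.textFile("1.log").map { line =>
  val firstWord = line.split(" ")(0)
  bcLookup.value.getOrElse(firstWord, 0)
}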

Overly large collect results

The collect operation computes each partition into an array and returns it to the driver, where the arrays from all partitions are merged into a single array. If the amount of data collected is too large, problems arise: a large number of worker nodes write data back to the same node, which slows down the overall running time and can cause a memory overflow.

Solution

When the collected final result is too large, store the data in HDFS or another distributed persistence layer instead. Storing the data in a distributed way reduces the I/O cost and the memory pressure on any single node.
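
A sketch of this approach, with a hypothetical HDFS path:

val results = sc.textFile("1.log").filter(line => line.contains("error"))

// Instead of results.collect(), which funnels every partition to the driver,
// write the output directly to distributed storage (the path is hypothetical)
results.saveAsTextFile("hdfs://namenode:8020/output/errors")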

If the data is not that large but still exceeds the size of the buffer used by Akka for transfers, increase the Akka actor buffer by setting spark.akka.frameSize (the default is 10 MB).

Serialization optimization

Spark provides two serialization options: the Java standard serialization library and the Kryo library.

The Java standard serialization library has better compatibility, but its serialized output is large and it is slow. The Kryo library is slightly less compatible, but its output is small and it is fast, so Kryo is recommended whenever it can be used.

Set spark.serializer=org.apache.spark.serializer.KryoSerializer to enable Kryo serialization. This parameter determines the serializer used by shuffle for network transfers and for writing RDD partitions to disk when they do not fit in memory.

If individual objects are very large, you need to increase the Kryo buffer by raising the configuration item spark.kryoserializer.buffer.mb (the default is 2 MB); the value should be large enough to hold the largest object that will be serialized and transmitted.
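
A sketch combining these settings; MyRecord is a hypothetical application class registered with Kryo to further shrink its serialized form:

import org.apache.spark.SparkConf

case class MyRecord(id: Int, name: String)       // hypothetical application class

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "8")    // enlarge the buffer for large objects
conf.registerKryoClasses(Array(classOf[MyRecord]))   // registering classes reduces serialized size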

Compression optimization

Compressing RDD or broadcast data in Spark is one way to improve data throughput and performance. Compression greatly reduces disk storage space and also lowers the I/O and network transmission cost of moving the compressed data around. Of course, compression and decompression add CPU overhead, but they save far more in I/O and memory.

Compressing data minimizes the disk space and network I/O cost of files, but always increases CPU overhead, so data compression is best used for I/O-intensive jobs with spare CPU capacity, or on systems that are short on disk space.

Spark currently supports the LZF and Snappy compression codecs. Snappy offers a higher compression speed and LZF a higher compression ratio; users can choose according to their specific requirements.
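
A sketch of selecting the codec and enabling compression in application code; the values are illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.io.compression.codec", "snappy")   // or "lzf" for a higher compression ratio
  .set("spark.rdd.compress", "true")             // compress serialized cached RDD partitions
  .set("spark.broadcast.compress", "true")       // compress broadcast variables before sending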


Batch optimization

Applicable scenario

When calling an external resource such as a database connection, turn per-record writes into per-partition batch writes, so that each partition opens a connection and writes once. This takes advantage of the database's batch-write optimizations, reduces overhead, and lowers the pressure on the database.

// Inefficient: opens and closes a connection for every record
rdd.foreach { line =>
  val conn = getConnection()
  conn.write(line)
  conn.close()
}

Optimized version:

// Efficient: one connection per partition, records written in a batch
rdd.foreachPartition { items =>
  val conn = getConnection()
  for (item <- items)
    conn.write(item)
  conn.close()
}

Use reduceByKey instead of reduce

reduce is an action that collects the partial results of every task onto a single node, whereas reduceByKey performs the aggregation in a distributed way across the cluster, so it avoids the single-node bottleneck of reduce.
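
A sketch of the difference, counting words in the 1.log file used earlier:

val words = sc.textFile("1.log").flatMap(_.split(" "))

// reduce is an action: the partial result of every task is sent back to the driver
val totalChars = words.map(_.length).reduce(_ + _)

// reduceByKey is a transformation: aggregation happens on the executors,
// and only the much smaller per-key result needs to be fetched if required
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)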

Data skew optimization

Data skew is also a key focus of optimization; it is summarized in a separate chapter.