preface

The last article when blowing water, from this began to enter the topic.

2. Spark’s Memory Computing Framework (key 😶)

Resilient Distributed Dataset (RDD) Resilient Distributed Dataset (RDD) is the most basic data abstraction in Spark. It represents an immutable and partitioned collection whose elements can be computed in parallel.

Dataset: a Dataset that stores a lot of data.

Distributed: Internal elements are Distributed for storage, facilitating Distributed computing in the later stage.

Resilient: Resilient. RDD data can be saved in the memory or disk.

Copy the code

In code, the result of each method is an RDD, such as the Scala code above, and the result of the next RDD depends on the previous RDD

I know to present everybody did not whole understand, have no matter, continue to look to be able to understand 😏

2.1 Five features of RDD


The following is an explanation of RDD in the source code, which I used in the points
I think(Accept refutation 👌) more reasonable statement to explain

2.1.1 A list of partitions

A list of partitions that make up the data for the RDD.

This indicates that an RDD has many partitions, and each partition contains some data of the RDD. In Spark, tasks run in the form of task threads, and each partition corresponds to a task thread.

The user can specify the number of partitions in the RDD when creating the RDD; if not, the default value is used. (For example, the number of RDD partitions generated by reading HDFS data files is equal to the number of blocks.)

For example, if I want to create a wordCount, the size of this text is 300M, then according to our HDFS routine, every 128M is a block, then the 300M file is 3 blocks. Then our RDD will determine the number of partitions in the RDD based on the number of blocks in the file. In this case, the number of partitions in the RDD is 3, but if the file itself is less than 128MB, the RDD will default to 2 partitions

2.1.2 A function for computing each split

The calculation function of each partition is an RDD – The CALCULATION of the RDD in Spark is based on partition. Each RDD implements the compute function to achieve this purpose.

A list of dependencies on other RDDs

The spark task’s fault tolerance mechanism is based on the fact that one RDD depends on multiple other RDD’s.

“Optionally, a Partitioner for key-value RDDs” (e.g., to say that the RDD is hash-partitioned)

Partitioning functions are available only for key-value RDD. Partitioning functions actually throw the computed results into different partitions.

Two types of partitioning functions are currently implemented in Spark, a HashPartitioner based on hash and a RangePartitioner based on range. A Partitioner will only have a shuffle if a key-value RDD is generated. The value of the Parititioner for a non-key-value RDD is None.

HashPartitioner is a HashPartitioner that defines how much to throw one partition, and how much to throw another partition.

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

The location of the calculation task is the location of each Partition (customizable).

This involves the locality of the data, and the location of the data block is optimal. To put it simply, it means that we do calculations where there is data.

During spark task scheduling, nodes with data on them are considered to start computing tasks, reducing data transmission over networks and improving computing efficiency.

2.2 Analyzing the five attributes of RDD based on wordCount

The requirement is to have a file of 300M on HDFS, use Spark to count the words of the file, and finally save the result data to HDFS. The code is as follows. Note that we use Scala instead of Java code

sc.textFile("/words.txt"// Read the data file

    .flatMap(_.split("")) // Split each line to get all the words

        .map((_,1)) // Each word counts as 1

            .reduceByKey(_+_) // add 1 to the same key

                .saveAsTextFile("/out"// Save output to /out

Copy the code

Since I don’t have the environment deployed locally, I will not take screenshots of the following process, but will explain the steps and results

2.2.1 RDD1: sc. TextFile (“/words. TXT “)

Let’s run spark-shell, and then we can run the code step by step, so we can see


TextFile (“/words.txt”) is now ready to execute scala code

Since RDD is an abstract class, sc.textFile(“/words.txt”) is received by its subclass MapPartitionsRDD

sc.textFile("/words.txt").partitions

Copy the code

If you look at the partition, you will get an array with a length of 3. (300M files will have 3 blocks, and the number of partitions in the RDD is determined by the blocks. Note that the number of partitions in the RDD may not all be on different servers. However, if the number of partitions in the RDD is 1, the number of partitions will default to 2.)

sc.textFile("/words.txt").partitions.length

Copy the code

The length of the viewing array must be 3

In this case, the results of RDD2,RDD3, and RDD1 are received by MapPartitionsRDD. The difference is that the RDD obtained by map is of key-value type

2.2.2 RDD4: sc textFile (“/words. TXT “). FlatMap (.split(” “)).map((,1)).reduceByKey(+)

The receiving type of RDD4 is ShuffleRDD, because the results need to be grouped by key, shuffle will be generated. Shuffle For example, in wordCount, there are only 3 keys in words. TXT, respectively “zookeeper”,”kafka”, and “Spark “, so I will shuffle this rule

At this point, saveAsTextFile will get 3 files. It is not difficult to find that RDD will have several files if there are several partitions. The five features of each RDD have been written on the right side of the figure.

2.3 RDD Creation Method

2.3.1 Build from an existing Scala collection

val rdd1=sc.parallelize(List(1.2.3.4.5))

val rdd2=sc.parallelize(Array("zookeeper"."kafka"."spark"))

val rdd3=sc.makeRDD(List(1.2.3.4))

Copy the code

2.3.2 Load external data sources to build

val rdd1=sc.textFile("/words.txt")

Copy the code

2.3.3 Converting an existing RDD to generate a new RDD

val rdd2=rdd1.flatMap(_.split(""))

val rdd3=rdd2.map((_,1))

Copy the code

2.4 Operator classification of RDD

2.4.1 Transformation

A new RDD is generated based on an existing RDD transformation, it is lazily loaded, and it does not execute immediately

For example, the map, flatMap, and reduceByKey used in wordCount

2.4.2 Action

It actually triggers the task to run. Returns the RDD calculation result data to the Driver or saves the result data to an external storage medium

For example, collect, saveAsTextFile, etc. used in wordCount

2.5 Common OPERATORS of RDD (with code description below)

2.5.1 transformation operator

conversion meaning
map(func) Returns a new RDD consisting of each input element transformed by the func function
filter(func) Returns a new RDD consisting of input elements evaluated by the func function that return true
flatMap(func) Similar to a map, but each input element can be mapped to zero or multiple output elements (so func should return a sequence, not a single element)
mapPartitions(func) Iterator[T] => Iterator[U]; Iterator[U] => Iterator[U]
mapPartitionsWithIndex(func) Similar to mapPartitions, but func takes an integer argument to represent the index value of partitions, so when running on RDD of type T, func must be of type (Int, Interator[T]) => Iterator[U].
union(otherDataset) The union of the source RDD and the parameter RDD returns a new RDD
intersection(otherDataset) The intersection of the source RDD and the parameter RDD returns a new RDD
distinct([numTasks])) A new RDD is returned after the source RDD is de-duplicated
groupByKey([numTasks]) Called on an RDD of (K,V), returns an RDD of (K, Iterator[V])
reduceByKey(func, [numTasks]) Returns an RDD of (K,V). Aggregates the values of the same key using the specified Reduce function. Similar to groupByKey, the number of Reduce jobs can be set using the second optional parameter
sortByKey([ascending], [numTasks]) Called on an RDD of (K,V), K must implement the Ordered interface to return a RDD of (K,V) Ordered by key
sortBy(func,[ascending], [numTasks]) Similar to sortByKey, but more flexible
join(otherDataset, [numTasks]) Called on RDD of type (K,V) and (K,W), returns a RDD of (K,(V,W) with all elements of the same key together
cogroup(otherDataset, [numTasks]) Called on RDD of type (K,V) and (K,W), returns an RDD of type (K,(Iterable,Iterable)
coalesce(numPartitions) Reduces the number of partitions in the RDD to the specified value.
repartition(numPartitions) Repartition the RDD
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD and sort it by record key within each partition

2.5.2 the action of operator

action meaning
reduce(func) Reduce passes the first two elements in the RDD to the input function to generate a new return value, which forms two elements with the next element (the third element) in the RDD and is passed to the input function until there is only one value.
collect() In a driver, all elements of a data set are returned as an array
count() Returns the number of elements of RDD
first() Return the first element of the RDD (similar to take(1))
take(n) Returns an array consisting of the first n elements of the dataset
takeOrdered(n, [ordering]) Returns the first n elements of natural or custom order
saveAsTextFile(path) Save the data set elements as textFiles to the HDFS file system or other supported file systems. For each element, Spark calls the toString method to replace it with text in the file
saveAsSequenceFile(path) You can save elements in a data set in Hadoop Sequencefile format to a specified directory to enable the HDFS or other file systems supported by Hadoop.
saveAsObjectFile(path) Saves the elements of a data set to a specified directory as Java serialization
countByKey() For RDD of type (K,V), return a map of (K,Int), representing the number of elements corresponding to each key.
foreach(func) On each element of the dataset, run the function func
foreachPartition(func) On each partition of the dataset, run the function func

2.6 Code description of common operators

Well, I forgot I don’t have a local environment

Just to remind you, the transformation operator does not obtain the result in the real sense. You need to run collect() to start the calculation

2.6.1 Notice: Repartition and coalesce

val rdd1 = sc.parallelize(1 to 10.3)

// Prints the number of partitions for RDD1

rdd1.partitions.size



// Use repartition to change the number of rDD1 partitions

// Reduce partitions

rdd1.repartition(2).partitions.size



// Add a partition

rdd1.repartition(4).partitions.size



// Use coalesce to change the number of RDD1 partitions

// Reduce partitions

rdd1.coalesce(2).partitions.size

Copy the code

Repartition: Repartition. Shuffle is used to process small files

Coalesce: coalesce coalesce or coalesce reduction. Shuffle is not used by default

By default, the number of coalesce partitions cannot be expanded. Unless true is added, or repartition is used.

Applicable scenarios:

  1. If shuffle is required, repartition is used
  2. Shuffle is not required. Only partition coalesce is used
  3. Repartition is used to expand a partition.

2.6.2 Note: Map, mapPartitions and mapPartitionsWithIndex

Map: Used to traverse the RDD, apply function F to each element, and return a new RDD(transformation operator).

MapPartitions: used to iterate over each partition in the RDD and return to generate a new RDD (transformation operator).

Conclusion: If additional objects need to be created frequently during mapping, mapPartitions are more efficient than maps. For example, all data in the RDD is written to the database via a JDBC connection. If you use the map function, you may need to create a connection for each element, which is expensive. If mapPartitions are used, only one connection needs to be established for each partition.

2.6.3 Notice: Foreach and foreachPartition

Foreach: Used to traverse the RDD, applying f to each element with no return value (action operator).

ForeachPartition: used to traverse each partition in the RDD. No return value (action operator).

Summary: mapPartitions or foreachPartition operators are generally more efficient than Map and foreach, and are recommended.

So we can use foreachPartition operator to implement

2.7 Dependence of RDD

There are two different types of relationships between RDD and its dependent parent RDD: narrow dependency and wide Dependency

2.7.1 narrow rely on

Narrow dependencies mean that the Partition of each parent RDD can be used by at most one Partition of the RDD, such as map, flatMap, filter, and Union. All narrow dependencies do not generate shuffle

2.7.2 wide rely on

Dependence refers to the wide more child RDD Partition will rely on the same parent RDD Partition, such as reduceByKey/sortByKey/groupBy/groupByKey/join, and so on. All wide dependencies produce shuffle

As can be seen from the figure above, join operations are divided into wide dependence and narrow dependence. If RDD has the same partitioner, shuffle will not be caused. This join is narrow dependence, and on the contrary, it is wide dependence

2.8 lineage

Lineage translates to bloodline. It’s the same picture I had before.


Lineage of the RDD records the metadata information and conversion behavior of the RDD, and Lineage stores the dependency relationship of the RDD. When data of some RDD partitions is lost, Lineage can recalculate and recover the lost data partitions based on the information.

However, it should be noted that RDD only supports coarse-grained transformation (that is, only a single operation performed on a single block is recorded). For example, if I am no. 0 of RDD4, I need to retrieve all the data of RDD3 and re-reduceByKey once, so as to restore the result. This wide dependence requires shuffle operation. The cost of recovery will be much higher

It is worth mentioning again that we do not need human intervention to recover partitioned data, and the program itself can help us recover according to the lineage of the RDD

2.9 Caching mechanism of RDD

You can cache the data of an RDD, and obtain the RDD result data directly from the cache for subsequent jobs, avoiding double calculation. Caching is to speed up subsequent access operations to that data.

For example, in the figure above, I only need to make a cache of RDD2 results. If it is used, it will be very convenient to recover

2.9.1 How Do I Set cache for the RDD

RDD can cache the results of previous calculations through the persist or cache methods. Note that the RDD is not cached immediately when the two methods are called. Instead, the RDD will be cached in the memory of the compute node when the action is triggered and reused later.

We can look at the source summary of RDD to find look look, these two methods are put together


A look at the source code shows that the cache also ends up calling persist, which is the default cache level
MEMORY_ONLYOnly one copy is stored in memory. Spark has multiple storage levels, which are defined in Object StorageLevel

StorageLevel is object in Scala (note that I use lowercase here, object is a singleton, not Object in Java), and there are a number of different storage levels that I won’t expand here, which are not difficult to understand in English

Cache differs from persist: cache: Persist: Data can be cached in memory or on disk with various cache levels defined in the StorageLevel object.

2.9.2 Cache Usage Timing

When RDD2 is used to perform operator operations to obtain RDD3 for the first time, it starts from RDD1, reads files in HDFS, performs operator operations on RDD1 to obtain RDD2, and calculates RDD3 from RDD2. Again, the previous logic is recalculated to calculate RDD4.

By default, the RDD recalculates both the RDD and its parent RDD when the operator operation is performed on an RDD for several times. This is often the case when you’re actually developing code, but it’s important to avoid reevaluating an RDD multiple times, which can lead to dramatic performance degradation.

In order to obtain the result data of an RDD, the cache can be set after a lot of operators or complicated calculation logic, that is, the data of an RDD is not easily obtained

Conclusion: The RDD that has been used for many times, that is, the public RDD, can be persisted to avoid subsequent need and recalculate again to improve efficiency.

2.9.3 Clearing Cache Data

  1. Automatic cleanup: After an application application ends, the corresponding cached data is automatically cleared

  2. Manual cleanup: Call the UNpersist method of the RDD

Although it is possible to cache RDD data, store it in memory or on disk, and then retrieve it directly from memory or disk, note that this approach is not particularly safe.

Cache Directly stores data in the memory. Subsequent operations are fast and can be obtained directly from the memory. However, this method is very insecure, because the server hangs or the process terminates, resulting in the loss of data.

Persist can save data to a local disk and retrieve it later, but it is not particularly secure and may result in data loss due to misoperations by a system administrator or disk corruption.

So is there a safer way?

2.10 Checkpoint mechanism of RDD

Checkpoint offers a relatively more reliable way to persist data. It stores data in a distributed file system, such as HDFS. HDFS uses high availability and fault tolerance (multiple copies) to ensure data security to the greatest extent.

2.10.1 How Do I Set checkpoint

1. Set a checkpoint directory on the HDFS

sc.setCheckpointDir("hdfs://node1:9000/checkpoint"

Copy the code

2. Invoke the checkpoint method on the RDD that needs to be checkpoint

val rdd1=sc.textFile("/words.txt")

rdd1.checkpoint

val rdd2=rdd1.flatMap(_.split("")) 

Copy the code

3. An action is required to trigger the task. (Checkpoint operation requires an action, and an action corresponds to a subsequent job. After the job is executed, it starts another job to perform rdd1.checkpoint.)

Before the checkpoint operation, you can perform a cache operation to cache the RDD data, and then directly obtain the RDD data from the cache and write it to the specified checkpoint directory

rdd2.collect

Copy the code

So let’s summarize the differences between cache, persist, and checkpoint

The cache and persist

cacheDefault data is cached in memory

Persist can store data in memory or on disk

And then you have to triggercacheAnd persist, which requires oneactionoperation

It will not open any other new missions, oneactionAn operation corresponds to a job

It does not change the RDD dependencies, and the corresponding cached data disappears automatically after the program is run

Copy the code

checkpoint

Data can be persistently written to HDFS

To trigger the checkpoint persistence operation, an action is required, and a new job is started to perform the checkpoint operation

It will change the dependency relationship of RDD, and the subsequent data loss cannot be restored through lineage.

(It removes dependencies because it determines that you have persisted to HDFS)

The checkpoint data does not disappear after the program runs

Copy the code

2.11 DAG directed acyclic graph generation

A DAG(Directed Acyclic Graph) is called a Directed Acyclic Graph (with direction, no closed loop, representing the flow of data). The original RDD is formed by a series of transformations.

This is the DAG generated in our wordCount example, This diagram can be seen in the Spark-shell –> Completed Jobs Description of the Running Application in the Web interface mentioned in the last lecture

2.11.1 DAG divides stages

What is stage: A Job is divided into multiple groups of tasks. Each group of tasks is called a stage. Stages indicate different scheduling stages

There are two types of stage:

  1. ShuffleMapStage: All transformations before the last shuffle are called ShuffleMapStage, and the corresponding task is shuffleMapTask
  2. ResultStage: The operation after the last shuffle is called ResultStage, which is the last Stage. Its corresponding task is ResultTask

2.11.2 Why do we divide stages

DAG can be divided into different stages according to the dependency relationship between RDD. For narrow dependency, partition conversion is performed in one Stage. For wide dependency, due to Shuffle, the following calculation can only be started after parent RDD processing is completed.

Since there are only narrow dependencies but no wide dependencies in the same stage after the stage is divided, pipelined computing can be realized. Each partition in the stage corresponds to a task, and there are many tasks that can be run in parallel in the same stage.

2.11.3 How to divide stages

We divide stages based on wide dependencies

  1. First, DAG directed acyclic graph is generated according to the operator operation sequence of RDD. Next, a new stage is created by moving forward from the last RDD and adding the RDD to the stage, which is the last stage.

  2. In the process of moving forward, the RDD will be added to this stage if the operation encounters narrow dependence. If the operation encounters wide dependence, the RDD will be cut from the position of wide dependence, and the last stage will be divided.

  3. Create a new stage again and continue to follow the second step until you reach the original RDD. The entire stage partition is completed

2.11.4 The relationship between stages

After stages are divided, there are many tasks in each stage that can be run in parallel. Later, tasks in each stage are encapsulated in a taskSet. Finally, tasksets are submitted to the executor process on the worker node for execution.

There is a dependency relationship between RDD and RDD, and there is also a dependency relationship between stage and before stage. Tasks in the earlier stage are run first, and tasks in the later stage are run after completion of running. That is, the input data of the task in the later stage is the output data of the task in the previous stage.

finally

Above we have briefly gone over some basic points of RDD, and some more in-depth and detailed aspects will be explained in Spark Core, Spark Streaming, interested friends can continue to pay attention to