Resilient Distributed Datasets (RDDs)

Spark is built around the concept of a resilient distributed dataset (RDD), a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create an RDD: parallelize an existing collection in your driver program, or reference a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source that offers a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 through 5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce((a, b) => a + b) to add up the elements of the array. We describe operations on distributed datasets later on.

One important parameter for parallelized collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster, but you can also set it manually by passing it as a second parameter to parallelize (for example, sc.parallelize(data, 10)). Note: some places in the code use the term slice (a synonym for partition) to maintain backward compatibility.
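As a minimal sketch (assuming the SparkContext sc used throughout this guide), the partition count can be set explicitly and then inspected:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data, 10) // request 10 partitions explicitly
println(distData.partitions.length)     // print the actual number of partitions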

External Datasets

Spark can create distributed data sets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, and Amazon S3. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Here’s an example:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b).
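Spelled out as a small sketch (continuing from the distFile created above):

val totalLineLength = distFile
  .map(s => s.length)      // transform each line into its length
  .reduce((a, b) => a + b) // action: sum the lengths across the cluster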

Some comments about reading files using Spark:

  • If you use a path on the local file system, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers, or use a network-mounted shared file system.
  • All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile(“/my/directory”), textFile(“/my/directory/*.txt”), and textFile(“/my/directory/*.gz”). When reading multiple files, the order of the partitions depends on the order in which the files are returned from the file system; it may or may not follow the lexicographic ordering of the files by path. Within a partition, elements are ordered according to their order in the underlying file.
  • The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (128 MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks. (A short sketch follows this list.)
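As a small sketch of wildcards and the partition argument (the directory path here is a hypothetical placeholder):

val allText = sc.textFile("/my/directory/*.txt") // read every .txt file in the directory
val manyParts = sc.textFile("data.txt", 10)      // ask for at least 10 partitions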

In addition to text files, Spark’s Scala API supports several other data formats:

  • SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which returns one record per line in each file. Partitioning is determined by data locality, which in some cases may result in too few partitions; for those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions. (See the sketch after this list.)
  • For SequenceFiles, use SparkContext’s sequenceFile[K, V] method, where K and V are the types of the key and value in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
  • For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class, and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).
  • RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
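A minimal sketch of the first two bullets above (the paths are hypothetical placeholders):

// (filename, content) pairs for every small file in a directory
val files: org.apache.spark.rdd.RDD[(String, String)] = sc.wholeTextFiles("/my/directory")

// read a SequenceFile with IntWritable keys and Text values as native types
val seq = sc.sequenceFile[Int, String]("/my/sequence-file")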

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. Reduce, on the other hand, is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently. For example, it can recognize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you can also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.

Basics

To illustrate the basics of RDD, consider the following simple program:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

The first line defines a base RDD from an external file. This dataset is not loaded into memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

If we want to use lineLengths again later, we can add:

lineLengths.persist()

Added before the reduce, this would cause lineLengths to be saved in memory after the first time it is computed.

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

  • Use anonymous function syntax, which keeps the code concise.
  • Use static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1 to Spark, as follows:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton), this requires that the object containing the class be sent along with the method. For example, consider:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

Here, if we create a new MyClass instance and call doStuff on it, the map inside it references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).

In a similar way, accessing fields of the outer object will reference the whole object:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

This is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope are a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (for example via spark-submit to YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

Local vs. Cluster Modes

The behavior of the above code is undefined and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies, and thus, when counter is referenced within the foreach function, it is no longer the counter on the driver node. There is still a counter in the memory of the driver node, but it is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero, since all operations on counter were referencing the value within the serialized closure.

In local mode, in some circumstances, the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.

To ensure well-defined behavior in these sorts of scenarios, one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.

In general, closure constructs (such as loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that is only by accident, and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed, as sketched below.
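A minimal sketch of the accumulator-based fix, reusing the rdd from the example above and the accumulator API described later in this guide:

val accum = sc.accumulator(0)            // driver-side accumulator
rdd.foreach(x => accum += x)             // safe: each task adds to the accumulator
println("Counter value: " + accum.value) // read the result on the driver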

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. In cluster mode, however, the stdout being written to by the executors is the executor’s stdout, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use take(): rdd.take(100).foreach(println).
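Side by side, the two driver-side printing patterns from the paragraph above look like this (assuming an RDD named rdd):

rdd.collect().foreach(println) // brings the whole RDD to the driver; may exhaust driver memory
rdd.take(100).foreach(println) // safer: only the first 100 elements are fetched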

Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

In Scala, these operations are automatically available on RDDs containing Tuple2 objects (the built-in tuples in the language, created by simply writing (a, b)). The key-value pair operations are available in the PairRDDFunctions class, which automatically wraps around an RDD of tuples.

For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects. (A short sketch follows.)
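Continuing the example above, a hedged sketch of that follow-up step:

val sorted = counts.sortByKey() // sort the (line, count) pairs by key
val result = sorted.collect()   // bring them back to the driver as an Array
result.take(5).foreach(println) // inspect a few of them locally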

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied by a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details. A short sketch combining a few of these transformations follows the table.

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions]) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin, and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
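As a hedged sketch of a few of these transformations chained together (reusing the lines RDD defined in the key-value example above):

val nonEmpty = lines.filter(s => s.nonEmpty)    // keep non-empty lines
val words = nonEmpty.flatMap(s => s.split(" ")) // one output element per word
val uniqueWords = words.distinct()              // drop duplicates
val rebalanced = uniqueWords.repartition(4)     // shuffle into 4 partitions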

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details. A short sketch follows the table.

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local file system, HDFS, or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local file system, HDFS, or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types such as Int, Double, String, etc.).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than accumulators outside of foreach() may result in undefined behavior. See the Understanding closures section for details.
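As a hedged sketch of a few of these actions (reusing counts from the key-value example above; the output path is a hypothetical placeholder):

val n = counts.count()                    // how many distinct lines there are
val firstFew = counts.take(3)             // first three (line, count) pairs as an array
counts.saveAsTextFile("/tmp/line-counts") // write one text file per partition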

Shuffle Operations

Certain operations in Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it is grouped differently across partitions. It is a complex and costly operation that involves copying data between executors and machines.

Background

To understand what happens during the shuffle, consider the reduceByKey operation as an example. reduceByKey generates a new RDD in which all values for a single key are combined into a tuple: the key and the result of executing the reduce function against all values associated with that key. The challenge is that not all records with the same key necessarily reside on the same partition, or even the same machine, yet they must be brought together to compute the result.

In Spark, data is generally not distributed across partitions to be in the right place for a specific operation. During computation, a single task operates on a single partition. Thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation: it reads all keys and their values from all partitions, brings together the values across partitions for each key, and computes the final result per key. This process is called the shuffle.

Although the set of elements in each partition of newly shuffled data is deterministic, and so is the ordering of the partitions themselves, the ordering of the elements within a partition is not. If you need predictably ordered data after a shuffle, you can use the following (a short sketch of the last option follows this list):

  • use mapPartitions to sort each partition, for example with .sorted
  • use repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • use sortBy to make a globally ordered RDD
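A minimal sketch of the sortBy option, assuming an RDD of integers like the parallelized examples earlier:

val shuffled = sc.parallelize(Seq(3, 1, 5, 2, 4))
val ordered = shuffled.sortBy(x => x)     // globally ordered RDD
println(ordered.collect().mkString(", ")) // 1, 2, 3, 4, 5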

The operations that cause shuffle are as follows:

  • repartition operations, such as repartition and coalesce
  • ByKey operations (except for counting), such as groupByKey and reduceByKey
  • join operations, such as cogroup and join

Performance Impact

The shuffle is a costly operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of map tasks to prepare the data and reduce tasks to aggregate it afterwards. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they no longer fit. They are then sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory, because they use in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and the ByKey operations create them on the reduce side. When the data does not fit in memory, Spark spills it to disk, incurring additional disk I/O and increased garbage collection overhead.

The shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected, so that the shuffle files do not need to be re-created if the RDD’s lineage is recomputed. If the application keeps references to these RDDs for a long time, or if garbage collection does not kick in frequently, garbage collection may happen only after a long period of time. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is configured by setting spark.local.dir when configuring the SparkContext.

You can set multiple parameters for the shuffle operation. For details, see Shuffle Behavior in the Configuration Guide.

RDD Persistence

One of the most important capabilities of Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset. This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

An RDD can be marked for persistence using the persist() or cache() methods on it. The data will be computed the first time an action is run on it and cached in memory on the nodes. Spark’s cache is fault-tolerant: if any partition of a cached RDD is lost, Spark automatically recomputes it using the transformations that originally created it and caches it again.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (Scala, Java, Python) to the persist() method. The cache() method is shorthand for the default storage level, StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The storage levels are described below (a short example follows the list):

  • MEMORY_ONLY: Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they are needed. This is the default level.
  • MEMORY_AND_DISK: Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they are needed.
  • MEMORY_ONLY_SER: Store the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but adds CPU overhead when reading objects.
  • MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them each time they are needed.
  • DISK_ONLY: Store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
  • OFF_HEAP (experimental): Store the RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, which makes it attractive in environments with large heaps or multiple concurrent applications. Furthermore, because the RDD resides in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable, so Tachyon does not attempt to reconstruct a block that it has evicted from memory. If you plan to use Tachyon as off-heap storage, Spark is compatible with the currently available versions of Tachyon; refer to the Tachyon documentation for the suggested version pairings.
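A minimal sketch of choosing a non-default storage level (assuming the lineLengths RDD from the earlier example, and that it has not already been assigned a storage level):

import org.apache.spark.storage.StorageLevel

lineLengths.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized in memory, spill to disk
lineLengths.count()                                    // the first action computes and caches it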

Note that, in Python, cached objects are always serialized with the Pickle library, so it does not matter whether you choose a serialized level.

Spark also automatically persists some intermediate data in shuffle operations (such as reduceByKey), even without the user calling persist. This avoids recomputing the entire input if a node fails during the shuffle. We still recommend calling persist on an RDD if you plan to reuse it more than once.

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not, try MEMORY_ONLY_SER and select a fast serialization library to make the objects much more space-efficient. Computation is still reasonably fast at this storage level.
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be about as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (for example, if Spark serves requests from a web application that must recover quickly). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let tasks keep running on the RDD without waiting to recompute a lost partition.
  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:
    • It allows multiple executors to share the same pool of memory in Tachyon.
    • It significantly reduces garbage collection costs.
    • Cached data is not lost if individual executors crash.

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to remove an RDD manually instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machines are propagated back to the driver program. However, to support two common usage patterns, Spark provides two specific types of shared variables: broadcast variables and accumulators.

Broadcast variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with each task. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed shuffle operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcast this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster, so that v is shipped to each node at most once. In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable (for example, if the variable is shipped to a new node later).

Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they are displayed in Spark’s UI, which can be useful for understanding the progress of running stages (note: this is not yet supported in Python).

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator. However, only the driver program can read the accumulator’s value, using its value method. The code below sums up the elements of an array:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

The code above uses Spark’s built-in support for accumulators of type Int. Developers can also create their own accumulator types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero, which provides a “zero value” for your data type, and addInPlace, which adds two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (for example, building a list by collecting elements). In Scala, the SparkContext.accumulableCollection method can be used to accumulate common Scala collection types.

For accumulator updates performed inside actions, Spark guarantees that each task’s update is applied only once; for example, restarted tasks will not update the value. Note, however, that if a task or job stage is re-executed, each task’s update made within a transformation may be applied more than once.

Accumulators do not change Spark’s lazy evaluation model. If an accumulator is updated within an operation on an RDD, its value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates made within a lazy transformation like map() are not guaranteed to be executed. The code fragment below demonstrates this property:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the map to be computed.

Deploying an Application to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), you can submit it to any supported cluster manager using the bin/spark-submit script.

Launching Spark Jobs from Java/Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.
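A hedged sketch of that API (the application JAR path and main class here are hypothetical placeholders):

import org.apache.spark.launcher.SparkLauncher

val process = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar") // hypothetical application JAR
  .setMainClass("com.example.MyApp")     // hypothetical main class
  .setMaster("local[2]")
  .launch()                              // starts spark-submit as a child process
process.waitFor()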

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, since Spark does not support two contexts running concurrently in the same application.
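A minimal sketch of that pattern, without committing to any particular test framework (the assertion is just an illustrative check):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("unit-test").setMaster("local[2]")
val sc = new SparkContext(conf)
try {
  assert(sc.parallelize(1 to 10).count() == 10) // run a few operations
} finally {
  sc.stop()                                     // always stop the context
}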

Migrating from pre-1.0 Versions of Spark

Spark 1.0 froze the API of Spark Core for the 1.x series: any API not currently marked “experimental” or “developer API” will be supported in future versions.

  • Changes in Scala

In Scala, the return type of grouping operations (such as groupByKey, cogroup, and join) changed from (Key, Seq[Value]) to (Key, Iterable[Value]).

  • Changes to the Java API
    • In 1.0, the Function classes in org.apache.spark.api.java.function became interfaces, which means that old code using extends Function should be changed to implement Function.
    • New map-type operations were added, such as mapToPair and mapToDouble, which are used to create RDDs of special types.
    • The return type of grouping operations (such as groupByKey, cogroup, and join) changed from (Key, Seq[Value]) to (Key, Iterable[Value]).

These migration guidelines are also valid for Spark Streaming, MLlib, and GraphX.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes many examples in the examples directory (Scala, Java, Python, R). You can run the Java and Scala examples by passing the class name to Spark’s bin/run-example script, for instance:

./bin/run-example SparkPi

For Python examples, use the spark-submit command instead:

./bin/spark-submit examples/src/main/python/pi.py

For R examples, use the spark-submit command instead:

./bin/spark-submit examples/src/main/r/dataframe.R

For help optimizing your programs, the configuration and tuning guides describe many best practices; they are especially important for making sure that your data is stored in memory in an efficient format. For help with deployment, see the cluster mode overview, which describes the components involved in distributed operation and the supported cluster managers.

Finally, full API documentation is available for Scala, Java, Python, and R.