Shared variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables: broadcast variables and accumulators.

Broadcast Variables

The benefit of broadcast variables is that instead of shipping a copy of a variable with every task, each node's executor keeps only one copy, so far fewer copies of the variable are made. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. For example, they can be used to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a series of stages, separated by distributed "shuffle" operations. Spark automatically broadcasts the common data needed by the tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before each task runs. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when it is important to cache the data in deserialized form.

1.1 A broadcast variable is created by calling SparkContext.broadcast(v) on a variable v. The broadcast variable is a wrapper around v, and its value can be accessed through the value method. For example:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

scala> val number = 10   // an example value to broadcast
scala> val broadcastNumber = sc.broadcast(number)
scala> val data = sc.parallelize(1 to 10)
scala> val bn = data.map(_ * broadcastNumber.value)

After the broadcast variable is created, it should be used in place of the value v in any function run on the cluster, so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes see the same value of the broadcast variable.
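As a minimal sketch of using the broadcast handle in place of v across several functions (the stop-word Set and the word data below are illustrative assumptions, not from the original text):

// Broadcast a read-only Set once from the driver.
val stopWords = sc.broadcast(Set("the", "a", "an"))

val words = sc.parallelize(Seq("the", "cat", "a", "hat", "cat"))

// Reference stopWords.value inside the closures instead of the original Set,
// so the data is shipped to each node only once.
val counts = words.filter(w => !stopWords.value.contains(w))
                  .map(w => (w, 1))
                  .reduceByKey(_ + _)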

1.2 Why use broadcast variables

Without broadcasting, every task gets its own copy of a global variable: each task that processes data copies the data it needs, so if one million tasks run in parallel, one million copies are made. The volume of copied data is huge, and if the global variable is large this can cause an OOM error during task execution. With broadcasting, each variable is sent once per executor and kept in memory, where it is shared by the threads in the executor's thread pool. A broadcast variable is a global read-only variable that the Driver sends to every executor allocated to the current Application. The threads in each Executor's thread pool share that single copy, which greatly reduces network traffic (otherwise the data would have to be copied for every task) and greatly saves memory.

1.3 Broadcast variables compared with copying the data to each task
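A minimal sketch of the comparison, assuming a lookup table held on the driver (the lookup Map, RDD contents, and variable names below are illustrative, not from the original text):

// Without broadcasting: the closure captures `lookup` directly,
// so Spark serializes a copy of the Map into every task.
val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")
val ids = sc.parallelize(1 to 1000000)
val withoutBroadcast = ids.map(id => lookup.getOrElse(id, "unknown"))

// With broadcasting: the Map is shipped once per executor and the
// threads in the executor's pool share that single read-only copy.
val lookupBc = sc.broadcast(lookup)
val withBroadcast = ids.map(id => lookupBc.value.getOrElse(id, "unknown"))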

2. Accumulators

Accumulators are variables that are only "added to" through associative and commutative operations, and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
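As a hedged sketch of adding support for a new type (assuming Spark 2.x, where custom accumulators extend org.apache.spark.util.AccumulatorV2; the SetAccumulator class below is a made-up example, not part of Spark):

import org.apache.spark.util.AccumulatorV2

// An accumulator that collects the distinct strings seen by tasks.
class SetAccumulator extends AccumulatorV2[String, Set[String]] {
  private var set = Set.empty[String]
  override def isZero: Boolean = set.isEmpty
  override def copy(): SetAccumulator = { val acc = new SetAccumulator; acc.set = set; acc }
  override def reset(): Unit = { set = Set.empty[String] }
  override def add(v: String): Unit = { set += v }
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { set ++= other.value }
  override def value: Set[String] = set
}

// Register it with the SparkContext before using it in tasks.
val seen = new SetAccumulator
sc.register(seen, "distinct words")
sc.parallelize(Seq("a", "b", "a")).foreach(w => seen.add(w))
// seen.value on the driver would then contain Set("a", "b")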

As a user, you can create named or unnamed accumulators. As shown in the figure below, a named accumulator (in this case counter) is displayed in the web UI for the stage that modifies it. Spark displays the value of each accumulator modified by a task in the Tasks table.

A numeric accumulator of type Long or Double can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator(), respectively. Tasks running on the cluster can then add to it using the add method, but they cannot read its value. Only the driver program can read the accumulator's value, using its value method.

The following code shows an accumulator used to add the elements of an array:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

A problem encountered when actually running this:


scala> val accum = sc.longAccumulator("My Accumulator")
<console>:27: error: value longAccumulator is not a member of org.apache.spark.SparkContext
         val accum = sc.longAccumulator("My Accumulator")
                        ^

scala> 

Test case (the longAccumulator method was added in Spark 2.0, so on older Spark versions the accumulator can be created with the pre-2.0 sc.accumulator API instead):

 val accum = sc.accumulator(0)          // old-style accumulator API (pre Spark 2.0)
 val dataset = sc.parallelize(1 to 10)
 dataset.foreach(x => accum.add(x))     // tasks add to the accumulator
 accum.value                            // read on the driver: 55