
1. Overview

Sometimes you need to share variables between multiple tasks, or between tasks and the Driver program.

In a Spark job, user-written higher-order functions are executed by Executors across the cluster and may reference the same variables. Each Executor receives its own copy of those variables, and updates an Executor makes to its copy are not passed back to the Driver.

To meet this requirement, Spark provides two types of shared variables:

  • Broadcast variables
  • Accumulators

Of course, sharing variables across a distributed system raises consistency issues if left unchecked, which is why these shared variables are two very restricted, special-purpose types.

Both can be thought of as global variables that live outside the data pipeline defined by the operators and are visible to all computation tasks.

Broadcast variables and accumulators are used to optimize Spark programs.

2. Broadcast variables

A broadcast variable is similar to the DistributedCache in MapReduce and is usually a small data set. Once a broadcast variable is created on the Driver, the entire data set is broadcast across the cluster, and all running computation tasks can read it (read-only).

Broadcast variables support simple data types, such as integers and collections, as well as complex types, such as custom data types.

Spark uses several techniques to ensure that the data is broadcast to all nodes efficiently.

This matters because you cannot expect 100 or 1,000 Executors to all connect to the Driver and pull data; that would overwhelm the Driver. Instead, Executors pull data over HTTP connections from one another, similar to BitTorrent peer-to-peer transmission. This approach scales better and avoids the Driver failing when all Executors request the data at once.

The Spark broadcast mechanism works like this:

  1. The Driver slices the serialized data into small blocks and stores them in its own BlockManager. When an Executor starts running, it first tries to fetch the broadcast variable from its own BlockManager; if the variable was broadcast to it before, it is used directly.
  2. If not, the Executor pulls the blocks from the Driver or from any other Executor that already has them. Each block it fetches is stored in its own BlockManager, both for its own use and for other Executors that need to pull it. This is a good way to prevent the Driver from becoming a single-point performance bottleneck.

Broadcast variables share a value across the Executors on every node (broadcast by the Driver). They are used to distribute large objects efficiently: a large read-only value is sent to all worker Executors for use by one or more operations.

The procedure for using broadcast variables is as follows (a code sketch follows the list):

  1. Call SparkContext.broadcast on an object of type T to create a Broadcast[T] object. Any serializable type can be broadcast this way (done on the Driver side).
  2. Access the object's value through its value property (on the Executor side).
  3. The variable is sent to each Executor only once and is treated as a read-only value.
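
A minimal sketch of these steps in Scala; the lookup map and variable names are illustrative:

val lookup: Map[String, Int] = Map("hadoop" -> 1, "spark" -> 2)
// Step 1: create the broadcast variable on the Driver side.
val bcLookup = sc.broadcast(lookup)

// Step 2: access the value on the Executor side through the value property (read-only).
val rdd = sc.makeRDD(Seq("hadoop", "spark", "hive"))
val mapped = rdd.map(word => bcLookup.value.getOrElse(word, 0))
mapped.collect().foreach(println)

// Release the broadcast blocks on the Executors once they are no longer needed.
bcLookup.unpersist()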


Parameters related to broadcast variables:

  • spark.broadcast.blockSize (default: 4m)
  • spark.broadcast.checksum (default: true)
  • spark.broadcast.compress (default: true)
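
These can be tuned when building the SparkConf. A sketch (the values shown are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("broadcast-demo")
  .set("spark.broadcast.blockSize", "8m")  // size of the blocks the data is sliced into
  .set("spark.broadcast.compress", "true") // compress blocks before sending them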

3. Accumulators

Accumulators are used to accumulate the state of a variable across different Executors.

An accumulator is defined and read on the Driver side; the accumulation itself is done on the Executors.

Accumulators are also lazy and need to be triggered by an Action. If the Action is triggered once, the accumulation executes once; if it is triggered multiple times, the accumulation executes multiple times.

A typical application scenario for accumulators is counting events in a Spark Streaming application.
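
A minimal sketch of that scenario, assuming a socket text stream on localhost:9999 (the source, batch interval, and names are illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
val events = ssc.socketTextStream("localhost", 9999)
val eventCount = sc.longAccumulator("eventCount")

events.foreachRDD { rdd =>
  // add runs on the Executors; the value is read back on the Driver.
  rdd.foreach(_ => eventCount.add(1))
  println(s"events so far: ${eventCount.value}")
}

ssc.start()
ssc.awaitTermination()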

A simpler batch example:

val data = sc.makeRDD(Seq("hadoop map reduce", "spark mllib"))

// Method 1
val count1 = data.flatMap(line => line.split("\\s+")).map(word => 1).reduce(_ + _)
println(count1)

// Method 2: the wrong way
var acc = 0
data.flatMap(line => line.split("\\s+")).foreach(word => acc += 1)
println(acc)

// acc is defined on the Driver. Each running Task gets a fresh copy of it, and
// updating the copy inside the Task does not affect the variable on the Driver,
// so acc still prints 0.
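The count in Method 2 can be done correctly with a built-in accumulator; a minimal sketch (the accumulator name is illustrative):

val accCount = sc.longAccumulator("wordCount")
data.flatMap(line => line.split("\\s+")).foreach(_ => accCount.add(1))
println(accCount.value) // 5, the same result as Method 1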

Spark provides three types of built-in accumulators:

  • LongAccumulator: used to accumulate integer types
  • DoubleAccumulator: used to accumulate floating point types
  • CollectionAccumulator: used to accumulate collection elements
val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))

val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String] ("allWords")
val rdd = data.map { word =>
acc1.add(word.length)
acc2.add(word.length)
acc3.add(word)
word
}

rdd.count
rdd.collect
println(acc1.value)
println(acc2.value)
println(acc3.value)