This series of articles is based on JerryLead’s SparkInternals. It reflects the author’s own understanding and annotations, together with some source code added for study. After comparing with newer Spark versions, the core part has not changed much and is still worth referencing.

Job logical execution diagram

General logical plan

A typical Job logical execution plan is shown above. The final result is obtained through the following four steps:

  • Read data from a data source (a local file, an in-memory data structure, HDFS, HBase, etc.) to create the initial RDD. parallelize() in the previous chapter’s example plays the role of createRDD().
  • Perform a series of transformation() operations on the RDD; each transformation() produces one or more RDD[T], where T can be any Scala primitive type or data structure, not only (K, V). If it is (K, V), however, K cannot be a complex type such as Array (because it is hard to define a partition function on complex types).
  • Perform an action() on the final RDD; a result is produced for each partition after it is computed.
  • Send the results back to the driver, which performs the final f(list[result]) computation. For example, count() actually consists of an action() followed by sum().

An RDD can be cached in memory or checkpointed to disk. The number of partitions in an RDD is not fixed and is usually specified by the user. The partition-level dependency between two RDDs may not be 1-to-1; the figure above contains both 1-to-1 and many-to-many relationships.
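To make the four steps above concrete, here is a minimal sketch that can be pasted into spark-shell (it assumes the SparkContext sc that the shell provides; the names and numbers are illustrative only):

val rdd    = sc.parallelize(1 to 100, 4)      // 1. create the initial RDD (here: 4 partitions from a local collection)
val pairs  = rdd.map(x => (x % 10, x))        // 2. transformation(): RDD[Int] => RDD[(Int, Int)]
val sums   = pairs.reduceByKey(_ + _)         //    another transformation(), this one introduces a shuffle
val result = sums.collect()                   // 3. action(): every partition is computed and its result sent back
println(result.length)                        // 4. the driver works on list[result]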

Generation of logical execution diagrams

Once you know a Job’s logical execution plan, you naturally form a data dependency graph in your mind while writing the program. However, more RDDs are usually generated than you would expect.

To generate the logical execution graph, two problems need to be solved:

  • How and which RDDs should be generated?
  • How to establish the dependencies between RDDs?

1. How and which RDDs should be generated?

The initial idea is to have each transformation() method return (new) an RDD. This is basically what happens, except that some transformations() are complex and consist of several sub-transformations(), and therefore generate several RDDs. That is why more RDDs exist than we expect.

How is the data in each RDD computed? Where in the computing chain does a transformation() run? Every RDD has a compute() method, which takes the input records from the previous RDD or from the data source, applies the transformation(), and outputs the resulting records.

  /**
   * RDD.scala
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // MapPartitionsRDD.scala
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

Which RDDs are generated depends on the computational logic of each transformation(). Some typical transformations() and the RDDs they create are listed below. The official documentation explains the meaning of each transformation. iterator(split) means iterating over every record in the partition. Many cells are left blank because those transformations() are complex and generate multiple RDDs, as illustrated in the next section.

Transformation                           | Generated RDDs          | compute()
-----------------------------------------|-------------------------|--------------------------------------------
map(func)                                | MappedRDD               | iterator(split).map(f)
filter(func)                             | FilteredRDD             | iterator(split).filter(f)
flatMap(func)                            | FlatMappedRDD           | iterator(split).flatMap(f)
mapPartitions(func)                      | MapPartitionsRDD        | f(iterator(split))
mapPartitionsWithIndex(func)             | MapPartitionsRDD        | f(split.index, iterator(split))
sample(withReplacement, fraction, seed)  | PartitionwiseSampledRDD | PoissonSampler.sample(iterator(split)) /
                                         |                         | BernoulliSampler.sample(iterator(split))
pipe(command, [envVars])                 | PipedRDD                |
union(otherDataset)                      |                         |
intersection(otherDataset)               |                         |
distinct([numTasks])                     |                         |
groupByKey([numTasks])                   |                         |
reduceByKey(func, [numTasks])            |                         |
sortByKey([ascending], [numTasks])       |                         |
join(otherDataset, [numTasks])           |                         |
cogroup(otherDataset, [numTasks])        |                         |
cartesian(otherDataset)                  |                         |
coalesce(numPartitions)                  |                         |
repartition(numPartitions)               |                         |

MappedRDD, FilteredRDD and FlatMappedRDD no longer exist in Spark 2.x; they were all unified into MapPartitionsRDD. The execution logic is unchanged.

2. How to establish the relationship between RDDs?

The data dependency problem between RDDs actually consists of three parts:

  • The dependency of the RDD itself: does the RDD to be generated (denoted RDD x below) depend on one parent RDD or on several parent RDDs?
  • How many partitions will RDD x have?
  • What is the relationship between the partitions of RDD x and the partitions of its parent RDDs? Does a partition depend on one or on several partitions of a parent RDD?

The first problem solves itself. For example, x = rdda.transformation(rddb) (e.g., x = a.join(b)) means that RDD x depends on both RDD a and RDD b.

For the second problem, the number of partitions in RDD x is generally specified by the user; if it is not, it is taken as max(numPartitions[parent RDD 1], …, numPartitions[parent RDD n]).

object Partitioner {
  /**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If any of the RDDs already has a partitioner, choose that one.
   *
   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
   * spark.default.parallelism is set, then we'll use the value from SparkContext
   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the
   * same as the number of partitions in the largest upstream RDD, as this should
   * be least likely to cause out-of-memory errors.
   * Feng: take the partition count of the largest RDD.
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    // Feng: first keep only the RDDs that already have a partitioner
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    if (hasPartitioner.nonEmpty) {
      // Reuse the partitioner of the RDD (among those) with the largest number of partitions
      hasPartitioner.maxBy(_.partitions.length).partitioner.get
    } else {
      // Use the default HashPartitioner: a user-set spark.default.parallelism takes precedence;
      // otherwise take the maximum number of partitions among the RDDs, i.e. the maximum parallelism
      if (rdd.context.conf.contains("spark.default.parallelism")) {
        new HashPartitioner(rdd.context.defaultParallelism)
      } else {
        new HashPartitioner(rdds.map(_.partitions.length).max)
      }
    }
  }
}
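The effect of these rules can be observed from the number of partitions of a cogroup-like result. A small sketch pasteable into spark-shell (assuming the shell’s SparkContext sc; the data and partition counts are made up for illustration, and the comments assume the Spark 2.1.1 rules shown above):

import org.apache.spark.HashPartitioner

val a = sc.parallelize(Seq((1, "a"), (2, "b")), 4)                                       // 4 partitions, no partitioner
val b = sc.parallelize(Seq((1, 1.0), (2, 2.0)), 2).partitionBy(new HashPartitioner(6))   // explicit partitioner, 6 partitions
val c = sc.parallelize(Seq((1, true), (2, false)), 3)                                    // 3 partitions, no partitioner

println(a.join(b).getNumPartitions)  // 6: b's existing HashPartitioner is reused
println(a.join(c).getNumPartitions)  // 4: no parent has a partitioner and spark.default.parallelism
                                     //    is not set here, so the max upstream partition count wins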

The third problem is more complicated. The semantics of each transformation() must be considered, and different transformations() lead to different dependencies. For example, map() is 1:1, while every partition of the ShuffledRDD produced by groupByKey() depends on all partitions of the parent RDD.

Looking at the third problem more closely, each partition in RDD x can depend on one or several partitions in a parent RDD, and the dependency can be either full or partial. A partial dependency means that one part of the data in a parent partition goes to one partition of RDD x while another part goes to a different partition of RDD x. The following figure shows full and partial dependencies.

Full dependency: a partition of RDD x uses a partition (or partitions) of the parent RDD in its entirety. Partial dependency: a partition of RDD x uses only part of the data in a parent partition, while the rest of that parent partition’s data goes to other partitions of RDD x.

In Spark, a full dependency is called NarrowDependency and a partial dependency is called ShuffleDependency. ShuffleDependency follows the same data-dependency pattern as the shuffle in MapReduce: each mapper partitions its output, and each reducer then fetches its own partition from all mapper outputs over HTTP.

  • The first case, 1:1, is called OneToOneDependency.
  • The second case, N:1, is called NarrowDependency (N:1).
  • The third case, N:N, is called NarrowDependency (N:N); full dependencies that do not fall into the first two cases belong here.
  • The fourth case is ShuffleDependency.

For a NarrowDependency, whether partition i of RDD x depends on one or on several partitions of the parent RDD is determined by getParents(partition i) in the NarrowDependency (discussed in the examples below). There is one more full dependency, RangeDependency, which is currently used only by UnionRDD and is described below.

// OneToOneDependency: used by map, filter, etc.
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    // Sanity check on partitionId: it must fall within this parent's range in the child RDD
    if (partitionId >= outStart && partitionId < outStart + length) {
      // Map it back to the corresponding partition id of the parent RDD
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

Therefore, the dependencies between partitions can be summarized as follows:

  • NarrowDependency (Use black solid line or black dotted line arrows)
    • OneToOneDependency (1:1)
    • NarrowDependency (N:1)
    • NarrowDependency (N:N)
    • RangeDependency (used in UnionRDD only)
  • ShuffleDependency (red arrow)

Note that the NarrowDependency created by coalesce() to merge partitions (and used inside repartition(), which is coalesce with shuffle = true) is of the N:1 kind, not NarrowDependency (N:N).

NarrowDependency and ShuffleDependency are distinguished in order to generate the physical execution plan, which is explained in the next chapter.

It is worth noting that the third kind, NarrowDependency (N:N), is rarely seen between two RDDs: if a partition of the parent RDD is depended on by several partitions of the child RDD, the resulting dependency graph usually looks just like that of a ShuffleDependency (the only difference being that the parent partitions are used fully rather than partially). For this reason Spark defines NarrowDependency as “each partition of the parent RDD is used by at most one partition of the child RDD”, which leaves only OneToOneDependency (1:1) and NarrowDependency (N:1); an N:N NarrowDependency could only appear in some unusual RDD you design yourself. The description here may sound confusing; the typical RDD dependencies below should make it concrete.

NarrowDependency is documented in Spark 2.1.1 as the “base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.” In other words, each child partition depends on a small group of parent partitions, so by this definition an N:N dependency in which every child partition only touches a few parent partitions would still count as a NarrowDependency.

How are the data (records) in RDD x actually computed? The following figure shows the data dependency of a OneToOneDependency. Even though the partition relationship is 1:1, it does not mean that records are read and computed one at a time. The difference between the upper and lower patterns on the right is similar to the difference between the following two programs:

code1 of iter.f()

int[] array = {1, 2, 3, 4, 5};
for (int i = 0; i < array.length; i++)
    f(array[i]);

code2 of f(iter)

int[] array = {1, 2, 3, 4, 5};
f(array);
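In Spark terms, the first pattern corresponds to record-at-a-time operators such as map(), while the second corresponds to mapPartitions(), which hands the whole partition iterator to the user function. A small illustrative sketch (assuming spark-shell’s SparkContext sc):

// iter.f(): the framework drives the iteration and f sees one record at a time
val perRecord = sc.parallelize(1 to 5, 2).map(x => x * 2)

// f(iter): the user function receives the whole partition iterator and drives the
// iteration itself, so it can keep state per partition (here: a running count)
val perPartition = sc.parallelize(1 to 5, 2).mapPartitions { iter =>
  var count = 0
  iter.map { x => count += 1; x * 2 }
}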

3. Computation process and data dependency graphs of typical transformations()

1) union(otherRDD)

  /** Build the union of a list of RDDs passed as variable-length arguments. */
  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
    union(Seq(first) ++ rest)
  }

  /** Build the union of a list of RDDs. */
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
    val partitioners = rdds.flatMap(_.partitioner).toSet
    // Feng: if every RDD to be merged has a partitioner, and it is the same partitioner for all of them
    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
      // Can be regarded as OneToOneDependency: if RDD a and RDD b each have p partitions and share
      // the same partitioner, the merged RDD still has p partitions
      new PartitionerAwareUnionRDD(this, rdds)
    } else {
      // Either some RDD has no partitioner, or the partitioners of the RDDs differ
      new UnionRDD(this, rdds)  // RangeDependency
    }
  }

union() simply puts the two RDDs together without changing the data in any partition. RangeDependency is essentially still 1:1; it merely preserves the partition-range boundaries of the original RDDs so that partitions of the unioned RDD can be conveniently mapped back to their parents.

As the code shows, union can also produce a OneToOneDependency-style result (via PartitionerAwareUnionRDD).
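The two branches can be told apart by the number of partitions of the result. A hedged sketch for spark-shell (assuming sc; the data is illustrative):

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(3)
val a = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(p)
val b = sc.parallelize(Seq((3, "c"), (4, "d"))).partitionBy(p)
val c = sc.parallelize(Seq((5, "e")))            // no partitioner

println(a.union(b).getNumPartitions)  // 3: same partitioner on both sides => PartitionerAwareUnionRDD
println(a.union(c).getNumPartitions)  // 3 + c's partition count: UnionRDD with RangeDependency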

2) groupByKey(numPartitions)

  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))  // uses HashPartitioner by default
  }

  /**
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    // Feng: map-side combine is useless here and may even waste up to twice the space.
    // CompactBuffer is similar to ArrayBuffer but more efficient: it stores the first two
    // elements in dedicated fields, an optimization that pays off for short buffers.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

  /**
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   */
  @Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    // Clean the closures
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    // If the parent RDD already uses the same partitioner, no shuffle is needed
    if (self.partitioner == Some(partitioner)) {
      // mapPartitions generates a MapPartitionsRDD
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Different partitioner: a ShuffledRDD is created, i.e. a shuffle takes place
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

The data dependencies of groupByKey were introduced in the previous chapter, so this is a refresher.

groupByKey() simply aggregates the records that share the same key through a plain shuffle. compute() in ShuffledRDD is only responsible for fetching the data that belongs to each partition; the aggregation is then done with a mapPartitions() operation (the OneToOneDependency pattern shown earlier), producing a MapPartitionsRDD, and groupByKey() is finished. Finally, to keep the return type uniform, the ArrayBuffer[] holding each key’s values is exposed as Iterable[].

groupByKey() does not combine on the map side, because a map-side combine would only save the space taken by duplicate keys inside a partition; only when there are very many duplicate keys is enabling combine worth considering.

The ArrayBuffer mentioned above is actually a CompactBuffer, “an append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.”

In Spark 2.1.1, groupByKey keeps mapSideCombine = false; and if the parent RDD already uses the same partitioner, the result is a MapPartitionsRDD rather than a ShuffledRDD.
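A minimal usage sketch for spark-shell (assuming sc; the data is made up). It shows the RDD[(K, Iterable[V])] result type and that groupByKey only groups the values without combining them:

val words  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val groups = words.groupByKey()                    // RDD[(String, Iterable[Int])]
groups.collect().foreach { case (k, vs) => println(s"$k -> ${vs.toList}") }
// e.g. a -> List(1, 3) and b -> List(2)   (output order may vary)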

ParallelCollectionRDD is the most basic RDD. RDD created directly from the local data structure is of this type, for example

val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)

The resulting pairs is a ParallelCollectionRDD.

  /**
   * SparkContext.scala
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * Feng: because execution is lazy, seq had better be immutable; otherwise the parallelized
   * values will change if seq is modified before the first action.
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

3) reduceByKey(func, numPartitions)

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // uses the default mapSideCombine: Boolean = true
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

reduceByKey() behaves like a traditional MapReduce job; the overall data flow is essentially the same as in Hadoop. reduceByKey() enables combine() on the map side by default, so before the shuffle a mapPartitions operation performs the combine and produces a MapPartitionsRDD; the shuffle then yields a ShuffledRDD; finally the reduce (implemented with aggregate + mapPartitions()) produces another MapPartitionsRDD.

As with groupByKey, if the parent already uses the same partitioner, reduceByKey directly produces a MapPartitionsRDD, not necessarily a ShuffledRDD.
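This can be checked by looking at the class of the returned RDD. A hedged sketch for spark-shell (assuming sc and the Spark 2.x implementation shown above; the data and partition count are illustrative):

import org.apache.spark.HashPartitioner

val pairs          = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val prePartitioned = pairs.partitionBy(new HashPartitioner(4))     // already hash-partitioned

println(pairs.reduceByKey(_ + _).getClass.getSimpleName)           // ShuffledRDD: a new shuffle is needed
println(prePartitioned.reduceByKey(_ + _).getClass.getSimpleName)  // MapPartitionsRDD: same partitioner, no new shuffle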

4) distinct(numPartitions)

  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   * Feng: distinct is implemented directly on top of reduceByKey
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

distinct() removes all duplicate data from the RDD. Since duplicate records may be scattered across different partitions, a shuffle is needed to aggregate them, and a shuffle requires data of type <K, V>. If the original data consists of keys only (for example, each record is just an integer), every record first has to be turned into <K, null>. This is done by a map() operation, producing a MappedRDD. Then reduceByKey() (shown above) is called to shuffle with a map-side combine and to reduce further, producing a MapPartitionsRDD. Finally, <K, null> is turned back into K, again with map(), producing another MappedRDD. The blue part of the figure is the call to reduceByKey().

5) cogroup(otherRDD, numPartitions)

  def cogroup[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    // HashPartitioner by default
    cogroup(other, new HashPartitioner(numPartitions))
  }

  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    // mapValues yields a MapPartitionsRDD that converts the co-grouped values to Iterable
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

Unlike groupByKey(), cogroup() aggregates two or more RDDs. Is CoGroupedRDD related to RDD a and RDD b only through ShuffleDependency, or can there also be a OneToOneDependency?

First, note that the number of partitions of the CoGroupedRDD can be set directly by the user and is independent of RDD a and RDD b. However, if the number of partitions in CoGroupedRDD differs from that of RDD a or b, the dependency on that RDD cannot be 1:1.

Second, which CoGroupedRDD partition each record ends up in is determined by the user-specified partitioner (HashPartitioner by default). It follows that even if RDD a/b has the same number of partitions as the CoGroupedRDD, the dependency cannot be 1:1 when their partitioners differ. For example, in the figure above, RDD a uses a RangePartitioner, b uses a HashPartitioner, and the CoGroupedRDD uses a RangePartitioner with the same number of partitions as a. Records in each partition of a can then be sent directly to the corresponding partition of the CoGroupedRDD, while records of RDD b must be re-partitioned and shuffled before they reach the corresponding partitions.

Finally, for an aggregation of two or more RDDs, a 1:1 relationship with a particular parent RDD is formed only when that parent has the same partitioner type and the same number of partitions as the aggregated RDD; otherwise the dependency can only be a ShuffleDependency. The corresponding code can be found in CoGroupedRDD.getDependencies(), even though it is a bit hard to read.

  /* CoGroupedRDD.scala */
  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_] =>
      // In Scala, == delegates to equals (and handles null). This compares the partitioner of each
      // parent RDD with the partitioner of this CoGroupedRDD; for a HashPartitioner, equals compares
      // the class and the number of partitions.
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

  // How many partitions the RDD has and how each partition is represented
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

How does the Spark code indicate that partitions in CoGroupedRDD depend on partitions in multiple parent RDDs?

First, all the RDDs that the CoGroupedRDD depends on are put into the array rdds[RDD]. Then, for each i: if the CoGroupedRDD has a 1:1 relationship with rdds(i), then deps(i) = new OneToOneDependency(rdd); otherwise deps(i) = new ShuffleDependency(rdd). Finally, the array deps[Dependency], one Dependency per parent RDD, is returned.

The getParents(partitionId) method of each Dependency class is responsible for returning, for a given child partition, the list of parent partitions it depends on: List[Int].

getPartitions() is responsible for declaring how many partitions the RDD has and how each partition is represented.
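The per-parent decision can be inspected directly from the dependencies of the CoGroupedRDD. A hedged sketch for spark-shell (assuming sc; the data is made up). cogroup() returns a MapPartitionsRDD, so we look one level down at its parent CoGroupedRDD:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(3)
val a = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(part)   // already hash-partitioned into 3
val b = sc.parallelize(Seq((1, 1.0), (3, 3.0)))                     // no partitioner

val grouped   = a.cogroup(b, part)              // MapPartitionsRDD on top of a CoGroupedRDD
val coGrouped = grouped.dependencies.head.rdd   // the CoGroupedRDD itself
coGrouped.dependencies.foreach(d => println(d.getClass.getSimpleName))
// OneToOneDependency   (for a: same partitioner as the CoGroupedRDD)
// ShuffleDependency    (for b: must be re-partitioned into the 3 hash partitions)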

6) intersection(otherRDD)

  def intersection(other: RDD[T]): RDD[T] = withScope {
    // MapPartitionsRDD => CoGroupedRDD
    // First map both RDDs to (value, null) pairs
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        // => MapPartitionsRDD
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys   // => MapPartitionsRDD
  }

intersection() extracts the data common to RDD a and RDD b. First, map() converts RDD[T] into RDD[(T, null)] (this requires that T is not Array). Then a.cogroup(b) is called (the blue part of the figure, identical to the previous cogroup()). filter() then drops the records whose groupA or groupB in [iter(groupA()), iter(groupB())] is empty, producing a FilteredRDD. Finally, keys() keeps only the keys, producing a MappedRDD.

7) join(otherRDD, numPartitions)

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    // note: flatMapValues, not flatMap
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
      // equivalent to two nested loops over the value iterators, yielding (v, w) for every combination
    )
  }

join() aggregates two RDD[(K, V)]s in the same way as a join in SQL. Like intersection(), it first calls cogroup(), producing a MappedValuesRDD of type RDD[(K, (Iterable[V1], Iterable[V2]))]; it then computes the Cartesian product of Iterable[V1] and Iterable[V2] for each key and flattens it (flatMapValues).

In the first example of the figure, RDD 1 and RDD 2 use a RangePartitioner while the CoGroupedRDD uses a HashPartitioner, different from RDD 1/2, so both dependencies are ShuffleDependency. In the second example, RDD 1 has already been partitioned by key with a HashPartitioner into three partitions, which matches the HashPartitioner(3) used by the CoGroupedRDD, so that data dependency is 1:1. If RDD 2 had also been hash-partitioned beforehand into three partitions, no ShuffleDependency would remain and the join() would become a hash join.
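A small usage sketch for spark-shell (assuming sc; the data is made up), showing that join emits one output pair per (v, w) combination of the two value groups of a key, i.e. a per-key Cartesian product:

val left  = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
val right = sc.parallelize(Seq((1, 10), (1, 20), (3, 30)))

left.join(right).collect().sorted.foreach(println)
// (1,(a,10)), (1,(a,20)), (1,(b,10)), (1,(b,20)) -- keys 2 and 3 have no match, so they produce nothing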

8) sortByKey(ascending, numPartitions)

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

sortByKey() sorts the records of an RDD[(K, V)] by key; ascending = true means ascending order and false descending. The data dependency of sortByKey() is very simple: a shuffle first gathers the records into the corresponding partitions (a RangePartitioner is used, so the partitions themselves are ordered by key range), and then the records inside each partition are sorted by key, so the records of the resulting MapPartitionsRDD are ordered.

Currently sortByKey() uses Array to hold all records in the partition and then sorts them.
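Minimal usage sketch for spark-shell (assuming sc; the data is made up). Because the RangePartitioner orders the partitions by key range and each partition is sorted internally, collect() returns a globally sorted result:

val scores = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)), 2)
val sorted = scores.sortByKey(ascending = true, numPartitions = 2)
println(sorted.collect().toList)   // List((a,1), (b,2), (c,3))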

9) cartesian(otherRDD)

  /* RDD.scala */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
    new CartesianRDD(sc, this, other)
  }

cartesian() computes the Cartesian product of two RDDs; the number of partitions of the resulting CartesianRDD is partitionNum(RDD a) * partitionNum(RDD b).

Each partition of the CartesianRDD depends on both parent RDDs: it fully depends on one partition of RDD a and, at the same time, fully depends on one partition of RDD b. There are no red arrows here because all dependencies are NarrowDependency.

CartesianRDD.getDependencies() returns rdds = [RDD a, RDD b]. Partition i of the CartesianRDD depends on partition (i / numPartitionsInRDDb) of RDD a and partition (i % numPartitionsInRDDb) of RDD b.

  override def getPartitions: Array[Partition] = {
    // create the cross product split
    // Feng: cartesian computes the Cartesian product of the two RDDs, so the number of
    // partitions is partitionNum(rdd1) * partitionNum(rdd2)
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

10) coalesce(numPartitions, shuffle = false)

  /**
   * RDD.scala
   * Feng: when shuffle = false, increasing the number of partitions has no effect
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        // In each partition, the key of the first element starts at a random position
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1   // the key keeps increasing
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      // Without shuffle, a CoalescedRDD is generated directly
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

coalesce() adjusts the number of partitions of the parent RDD, for example from 5 to 3 or from 5 to 10. Note that when shuffle = false, the number of partitions cannot be increased (5 cannot become 10).

The core problem of coalesce() is how to establish the relationship between the partitions in CoalescedRDD and the partitions in its parent RDD.

  • coalesce(shuffle = false): no shuffle is allowed, so the question becomes which partitions of the parent RDD can be merged together. Besides the number of elements in each partition, locality and balance also have to be considered, which is why Spark uses a rather involved algorithm to solve this (the author has not yet studied that algorithm in detail). Note that in the example a.coalesce(3, shuffle = false) this is an N:1 NarrowDependency.
  • coalesce(shuffle = true): a shuffle is allowed, so the problem becomes how to spread all records of the RDD evenly over N partitions. This is simple: in every partition, an increasing key is attached to each record, so that after hash(key) the records are distributed evenly across the output partitions, much like a round-robin algorithm. In the second example, each element of RDD a is first given an increasing key (for example the 1 in the second partition (1, 3) of MapPartitionsRDD). Within each partition, the key of the first (Key, Value) element is determined by var position = (new Random(index)).nextInt(numPartitions); position = position + 1, where index is the partition index and numPartitions is the number of partitions of the CoalescedRDD. Subsequent keys keep increasing, so after the shuffle the records of ShuffledRDD are evenly divided. The data connection between ShuffledRDD and CoalescedRDD is then established through the complex algorithm mentioned above, and finally the keys are stripped off, producing the coalesce result as a MappedRDD. A short sketch of both modes follows this list.
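A small sketch for spark-shell (assuming sc; the numbers are illustrative) showing both modes:

val data = sc.parallelize(1 to 100, 5)

println(data.coalesce(3).getNumPartitions)                   // 3: partitions merged via an N:1 NarrowDependency, no shuffle
println(data.coalesce(10).getNumPartitions)                  // still 5: cannot grow without shuffle
println(data.coalesce(10, shuffle = true).getNumPartitions)  // 10: increasing keys + shuffle redistribute the records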

11) repartition(numPartitions)

repartition is equivalent to coalesce(numPartitions, shuffle = true):

  /**
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   * Feng: to merely reduce the number of partitions, use coalesce instead and avoid the shuffle
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

Primitive transformation()

combineByKey()

Having analyzed so many logical execution plans, do they have anything in common? And if so, how is that commonality designed and implemented?

A careful look at the logical execution plans shows that the records of the RDD on the map side of a ShuffleDependency must be of type <K, V>. After the ShuffleDependency, records sharing the same key are aggregated, and different computation logic can then be applied to the aggregated records. In practice (as seen above), many transformations() such as groupByKey() and reduceByKey() perform their computation logic while aggregating the data, so the common pattern is aggregate combined with compute(). Spark uses combineByKey() to implement this aggregate + compute() base operation.

combineByKey() is defined as follows:

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

Its three main parameters are createCombiner, mergeValue and mergeCombiners. A brief explanation of these three functions and of the meaning of combineByKey(), paying attention to their types:

Suppose a set of <K, V> records sharing the same key K flows into combineByKey() one by one. createCombiner initializes the combiner from the value of the first record, for example c = value. Then, starting with the second record, c is updated for each record with mergeValue(c, record.value); for instance, to sum all the values of the records, use c = c + record.value. When all records have passed through mergeValue(), the result c is obtained. Now suppose another set of records with the same key (processed elsewhere, e.g. in another partition) arrives one by one and combineByKey() computes c' in the same way. The overall result of combineByKey() for the two sets is then obtained as final c = mergeCombiners(c, c').
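A hedged worked example for spark-shell (assuming sc; the data is made up): computing the per-key average by choosing C = (sum, count). createCombiner builds the pair from a key’s first value, mergeValue folds further values in, and mergeCombiners merges the partial pairs coming from different partitions:

val marks = sc.parallelize(Seq(("a", 90.0), ("a", 70.0), ("b", 80.0)), 2)

val sumCount = marks.combineByKey(
  (v: Double) => (v, 1),                                                     // createCombiner
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                     // mergeValue
  (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)   // mergeCombiners
)
sumCount.mapValues { case (sum, count) => sum / count }.collect().foreach(println)
// e.g. (a,80.0) and (b,80.0)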

Discussion

So far, we’ve discussed how to generate logical execution diagrams for jobs, which are the complex computational logic and data dependencies behind Spark’s deceptively simple API.

Which RDDs are generated by a job is determined by the semantics of its transformations(). Some transformations(), such as cogroup(), are reused by many other operations.

The dependencies of an RDD are determined by the semantics of the transformation() that generates it. For example, CoGroupedRDD depends on all the RDDs participating in cogroup().

Partition-level dependencies between RDDs fall into NarrowDependency and ShuffleDependency. The former is a full dependency and the latter a partial dependency. NarrowDependency itself covers several cases, and it generally only arises when the two RDDs have the same number of partitions and the same partitioner.

From the perspective of data-processing logic, MapReduce is equivalent to map() + reduceByKey() in Spark, though strictly speaking reduce() in MapReduce is more powerful than reduceByKey(). The differences are discussed in the chapter on Shuffle details.

The source code

Dependency

@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 * Feng: each child RDD partition depends on a small group of parent RDD partitions, so the
 * computation can be pipelined without a shuffle.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *   explicitly then the default serializer, as specified by `spark.serializer` config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 * Feng: partitions of the parent and child RDDs correspond one-to-one (e.g. map, filter);
 * partitionId is the index of a partition within its RDD and is the same in parent and child.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 * Feng: a range of partitions in the parent RDD corresponds to a range of partitions in the child
 * RDD. Currently only used in union, where several parent RDDs are merged into one child RDD, so
 * each parent RDD maps to an interval of the child RDD's partitions.
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    // Sanity check on partitionId: it must fall within this parent's range in the child RDD
    if (partitionId >= outStart && partitionId < outStart + length) {
      // Map it back to the corresponding partition id of the parent RDD
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}