Personal homepage zicesun.com

This section summarizes the usage of Spark RDD operators from a source-code perspective.

Single-value transformation operators

map

  /** * Return a new RDD by applying a function to all elements of this RDD. */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

There is an sc.clean() call in the source code that removes external references from the closure that cannot be serialized. Scala supports closures, which keep references to external objects inside themselves so that a closure can be used independently of its original scope. However, in a distributed environment like Spark this can cause problems: if an external reference is not serializable, it cannot be shipped to the worker nodes correctly, and there may also be references that are never used and therefore do not need to be sent to the workers at all. The sc.clean function actually calls ClosureCleaner.clean(), which recursively traverses the closure, checks for non-serializable references, and removes unused ones.
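
As a minimal sketch of why this matters (the SearchConfig class and its field are hypothetical names), a closure that captures a non-serializable enclosing object fails at task serialization, while copying the needed field into a local variable keeps the closure clean:

import org.apache.spark.rdd.RDD

class SearchConfig(val keyword: String) {              // not Serializable
  // line.contains(keyword) really means this.keyword, so the whole SearchConfig
  // instance is captured by the closure and task serialization fails.
  def badFilter(rdd: RDD[String]): RDD[String] =
    rdd.filter(line => line.contains(keyword))

  // Copying the field into a local val means the closure only captures a String,
  // which is serializable, so the task can be shipped to the workers.
  def goodFilter(rdd: RDD[String]): RDD[String] = {
    val kw = keyword
    rdd.filter(line => line.contains(kw))
  }
}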

The map function is a coarse-grained operation: for an RDD it iterates over each partition, applies the function f to every element of that partition, and returns a new RDD. In essence, every element of the RDD has the same operation applied to it.

scala> val array = Array(1, 2, 3, 4, 5, 6)
array: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val rdd = sc.parallelize(array, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val mapRdd = rdd.map(x => x * 2)  
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> mapRdd.collect().foreach(println)
2
4
6
8
10
12

flatMap

The flatMap method is similar to map, but each input element can be mapped to zero or more output elements (the function returns a collection, which is then flattened), rather than exactly one output element as in map.

  /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
scala> val a = sc.parallelize(1 to 10, 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> a.flatMap(num => 1 to num).collect
res1: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


mapPartitions

mapPartitions is another implementation of map. The input function of map is applied to each element of the RDD, whereas the input function of mapPartitions is applied to each partition, i.e. it treats the contents of each partition as a whole.

 /** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }
scala> def myfunc[T](iter: Iterator[T]):Iterator[(T,T)]={ 
     | var res = List[(T,T)]()
     | var pre = iter.next
     | while(iter.hasNext){
     |   var cur = iter.next
     |   res .::= (pre, cur)
     |   pre = cur
     | }
     | res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

mapPartitionsWithIndex

The mapPartitionsWithIndex method is similar to mapPartitions, except that it also tracks the index of the original partition, so you can tell which partition each element came from. Its parameter is a function whose input is an integer partition index and an iterator.

  /** * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }
scala> val x = sc.parallelize(1 to 10, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> def myFunc(index:Int, iter:Iterator[Int]):Iterator[String]={
     |   iter.toList.map(x => index + "," + x).iterator
     | }
myFunc: (index: Int, iter: Iterator[Int])Iterator[String]

scala> x.mapPartitionsWithIndex(myFunc).collect
res1: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

foreach

foreach applies a function to each element of the RDD and can be used, for example, to print or otherwise output the elements.

  /** * Applies a function f to all elements of this RDD. */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
scala> var x = sc.parallelize(List(1 to 9), 3)
x: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> x.foreach(print)
Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

foreachPartition

The foreachPartition method, like mapPartitions, applies a function to each partition of the RDD through an iterator argument. The difference is that foreachPartition is an action and its function returns nothing (Unit), whereas mapPartitions is a transformation that returns a new RDD.

  /** * Applies a function f to each partition of this RDD. */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }
scala> val b = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> b.foreachPartition(x => println(x.reduce((a, b) => a + b)))
7
3
11

glom

The function of glom is similar to that of collect. collect converts the whole RDD directly into an array, whereas glom assembles the data of each partition into an array, producing an RDD of arrays: each returned array contains all the elements of one partition.

  /** * Return an RDD created by coalescing all elements within each partition into an array. */
   
  def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

In the following example, RDD a has three partitions, and glom converts a into an RDD composed of three arrays.

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> a.glom.collect
res5: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))

scala> a.glom
res6: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[4] at glom at <console>:26

union

The union method is equivalent to the ++ method: it returns the union of two RDDs. Duplicate elements are not removed during the union; use distinct() to eliminate them.

  /** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */
  def union(other: RDD[T]): RDD[T] = withScope {
    sc.union(this, other)
  }

  /** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */
  def ++(other: RDD[T]): RDD[T] = withScope {
    this.union(other)
  }
scala> val a = sc.parallelize(1 to 4, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val b = sc.parallelize(2 to 5, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.union(b).collect
res7: Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 5)


cartesian

cartesian computes the Cartesian product of the elements of two RDDs, returning all pairs (a, b) where a is from this RDD and b is from the other.

  /** * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
    new CartesianRDD(sc, this, other)
  }

scala> val a = sc.parallelize(1 to 4, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val b = sc.parallelize(2 to 5, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.cartesian(b).collect
res8: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5), (4,2), (4,3), (4,4), (4,5))

groupBy

The groupBy method has three overloads. Each uses the map function to turn every element into a key-value pair and then calls groupByKey to aggregate the values by key.

/** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. The  ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }

  /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. The ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */
  def groupBy[K](
      f: T => K,
      numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy(f, new HashPartitioner(numPartitions))
  }

  /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. The ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

scala> val a = sc.parallelize(1 to 9, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> a.groupBy(x => {if(x % 2 == 0) "even" else "odd"}).collect
res9: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))

scala> def myfunc(a: Int):Int={
     | a % 2
     | }
myfunc: (a: Int)Int

scala> a.groupBy(myfunc).collect
res10: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))

scala> a.groupBy(myfunc(_), 1).collect
res11: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))



filter

The filter method filters the input elements. Its parameter is a function that returns a Boolean: if the function evaluates to true for an element, the element is kept; otherwise it is filtered out of the result.

  /** * Return a new RDD containing only the elements that satisfy a predicate. */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
scala> val a = sc.parallelize(List("we"."are"."from"."China"."not"."from"."America"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val b = a.filter(x => x.length >= 4)
b: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at filter at <console>:25

scala> b.collect.foreach(println)
from
China
from
America


distinct

The distinct method removes duplicate elements from an RDD, leaving only unique RDD elements.

  /** * Return a new RDD containing the distinct elements in this RDD. */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

scala> val a = sc.parallelize(List("we"."are"."from"."China"."not"."from"."America"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> val b = a.map(x => x.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:25

scala> val c = b.distinct
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at distinct at <console>:25

scala> c.foreach(println)
5
4
2
3
7

subtract

The subtract method computes the set difference A - B: it removes from RDD A every element that also appears in RDD B and returns the remaining elements.

 /** * Return an RDD with the elements from `this` that are not in `other`. * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */
  def subtract(other: RDD[T]): RDD[T] = withScope {
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
  }

  /** * Return an RDD with the elements from `this` that are not in `other`. */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
    subtract(other, new HashPartitioner(numPartitions))
  }

  /** * Return an RDD with the elements from `this` that are not in `other`. */
  def subtract(
      other: RDD[T],
      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions: Int = p.numPartitions
        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }

scala> val a = sc.parallelize(1 to 9, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val b = sc.parallelize(2 to 5, 4)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at subtract at <console>:27

scala> c.collect
res14: Array[Int] = Array(6, 8, 1, 7, 9)


cache and persist

cache stores the RDD in memory so that it can be reused the next time it is needed in a computation. persist persists the RDD at a storage level specified by its parameter; if no parameter is given, the RDD is stored in memory only, which is equivalent to cache.
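
A minimal sketch of how the two are typically used (the storage level here is just an example):

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000, 4).map(_ * 2)

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
data.cache()
data.count()      // the first action computes the RDD and stores its partitions in memory
data.count()      // later actions reuse the cached partitions instead of recomputing
data.unpersist()  // release the storage when it is no longer needed

// persist() accepts an explicit storage level, e.g. spill to disk when memory is short.
data.persist(StorageLevel.MEMORY_AND_DISK)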

sample

The sample method samples the elements of the RDD to produce a new child RDD. Its parameters control whether sampling is done with replacement, what fraction of the data to sample, and the random seed.

 /** * Return a sampled subset of this RDD. * * @param withReplacement can elements be sampled multiple times (replaced when sampled out) * @param fraction expected size of the sample as a fraction of this RDD's size * without replacement: probability that each element is chosen; fraction must be [0, 1] * with replacement: expected number of times each element is chosen; fraction must be greater * than or equal to 0 * @param seed seed for the random number generator * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[RDD]]. */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = {
    require(fraction >= 0.s"Fraction must be nonnegative, but got ${fraction}")

    withScope {
      require(fraction >= 0.0."Negative fraction value: " + fraction)
      if (withReplacement) {
        new PartitionwiseSampledRDD[T.T] (this.new PoissonSampler[T](fraction), true, seed)
      } else {
        new PartitionwiseSampledRDD[T.T] (this.new BernoulliSampler[T](fraction), true, seed)
      }
    }
  }

scala> val a = sc.parallelize(1 to 100, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> val b = a.sample(false, 0.2, 0)
b: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[32] at sample at <console>:25

scala> b.foreach(println)
5
19
20
26
27
29
30
57
40
61
45
68
73
50
75
79
81
85
89
99


Key-value transformation operators

groupByKey

Similar to groupBy, groupByKey aggregates the values that share the same key into a single sequence. You can use either the default partitioner or a custom partitioner.

  /** * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. * The ordering of elements within each group is not guaranteed, and may even differ * each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. * * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

  /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. The ordering of elements within * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. * * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

scala> val a = sc.parallelize(List("mk"."zq"."xwc"."fig"."dcp"."snn"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val b = a.keyBy(x => x.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at keyBy at <console>:25

scala> b.groupByKey.collect
res17: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fig, dcp, snn)))



combineByKey

The combineByKey method can efficiently combine the values that share the same key in a key-value RDD into a sequence. Users can customize the partitioning of the RDD and whether aggregation is performed on the map side.

  /** * Generic function to combine the elements for each key using a custom set of aggregation * functions. This method is here for backward compatibility. It does not provide combiner * classtag information to the shuffle. * * @see `combineByKeyWithClassTag` */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }

  /** * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD. * This method is here for backward compatibility. It does not provide combiner * classtag information to the shuffle. * * @see `combineByKeyWithClassTag` */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }
scala> val a = sc.parallelize(List("xwc"."fig"."wc"."dcp"."zq"."znn"."mk"."zl"."hk"."lp"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at parallelize at <console>:24

scala> val b = sc.parallelize(List(1, 2, 2, 3, 2, 1, 2, 2, 2, 3), 2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[38] at zip at <console>:27


scala> val d = c.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[39] at combineByKey at <console>:25

scala> d.collect
res18: Array[(Int, List[String])] = Array((2,List(zq, wc, fig, hk, zl, mk)), (1,List(xwc, znn)), (3,List(dcp, lp)))



The above example uses the three-argument overload. The first argument, createCombiner, converts an element of type V into an element of another type C; here List(_) places the input element into a List. The second argument, mergeValue ((x: List[String], y: String) => y :: x), prepends a new value to the list built up within a partition. The third argument, mergeCombiners ((x: List[String], y: List[String]) => x ::: y), concatenates the lists produced by different partitions.
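
As another sketch of the three functions, combineByKey can compute a per-key average by carrying a (sum, count) pair as the combined type C (the data here is made up for illustration):

val scores = sc.parallelize(List(("a", 1.0), ("b", 4.0), ("a", 3.0), ("b", 2.0)), 2)

val sumCount = scores.combineByKey(
  (v: Double) => (v, 1),                                               // createCombiner: first value seen for a key in a partition
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold another value into the partition-local accumulator
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge accumulators from different partitions

val avg = sumCount.mapValues { case (sum, count) => sum / count }
// avg.collect() returns Array((a,2.0), (b,3.0)) (order may vary)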

reduceByKey

reduceByKey uses a reduce function to aggregate the values of each key. Merging is performed locally on the map side before results are sent to the reducers, similar to a combiner in MapReduce. Under the hood this method calls an overload of combineByKey.

 /** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

scala> val a = sc.parallelize(List("dcp"."fjg"."snn"."wc"."za"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val b = a.map(x => (x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at map at <console>:25

scala> b.reduceByKey((a, b) => a + b ).collect
res1: Array[(Int, String)] = Array((2,wcza), (3,dcpfjgsnn))


sortByKey

sortByKey sorts key-value pairs by key: lexicographically for strings, by numeric value for numbers. Ascending or descending order can be specified with the argument.

scala> val a = sc.parallelize(List("dcp"."fjg"."snn"."wc"."za"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val b = sc.parallelize(1 to a.count.toInt,2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:26

scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[6] at zip at <console>:27

scala> c.sortByKey(true).collect
res2: Array[(String, Int)] = Array((dcp,1), (fjg,2), (snn,3), (wc,4), (za,5))


cogroup

cogroup groups the values for each key in two RDDs into separate iterables and pairs them by key.

scala> val a = sc.parallelize(List(1, 2, 2, 3, 1, 3), 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val b = a.map(x => (x, "b"))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[11] at map at <console>:25

scala> val c = a.map(x => (x, "c"))
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[12] at map at <console>:25

scala> b.cogroup(c).collect
res3: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(CompactBuffer(b, b),CompactBuffer(c, c))), (1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))))

scala> val a = sc.parallelize(List(1, 2, 2, 2, 1, 3), 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val b = a.map(x => (x, "b"))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[16] at map at <console>:25

scala> val c = a.map(x => (x, "c"))
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:25

scala> b.cogroup(c).collect
res4: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b, b, b),CompactBuffer(c, c, c))))



join

join first performs a cogroup on the two RDDs, then takes the Cartesian product of the values for each key, and returns the result using flatMapValues.

scala> val a= sc.parallelize(List("fjg"."wc"."xwc"),2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24 

scala> val c = sc.parallelize(List("fig"."wc"."sbb"."zq"."xwc"."dcp"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> val d = c.keyBy(_.length)
d: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[23] at keyBy at <console>:25

scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at keyBy at <console>:25

scala> b.join(d).collect
res6: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fig)), (3,(fjg,sbb)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fig)), (3,(xwc,sbb)), (3,(xwc,xwc)), (3,(xwc,dcp)))


Action operators

collect

Returns all the elements of the RDD as an array.

  /** * Return an array that contains all of the elements in this RDD. * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

scala> val a = sc.parallelize(List("a"."b"."c"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> a.collect
res7: Array[String] = Array(a, b, c)



reduce

reduce aggregates the elements with a function that takes two arguments and returns a single element. The binary operation should be commutative and associative so that it produces correct results when computed in parallel.

  /** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */
  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
scala> val a = sc.parallelize(1 to 10, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> a.reduce((a, b) => a + b)
res8: Int = 55


take

The take method returns the first n elements of the RDD. It first scans one partition and collects its results, then checks whether that partition supplied enough elements; if not, it keeps scanning additional partitions until n elements have been gathered.

/** * Take the first num elements of the RDD. It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy * the limit.  * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. * * @note Due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }
      buf.toArray
    }
  }

scala> val a = sc.parallelize(1 to 10, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> a.take(5)
res9: Array[Int] = Array(1, 2, 3, 4, 5)


top

top uses an implicit ordering to return the largest n elements of the RDD.

  /** * Returns the top k (largest) elements from this RDD as defined by the specified * implicit Ordering[T] and maintains the ordering. This does the opposite of * [[takeOrdered]]. For example: * {{{ * sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1) * // returns Array(12) * * sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2) * // returns Array(6, 5) * }}} * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. * * @param num k, the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */
  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    takeOrdered(num)(ord.reverse)
  }
  /** * Returns the first k (smallest) elements from this RDD as defined by the specified * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]]. * For example: * {{{ * sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1) * // returns Array(2) * * sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2) * // returns Array(2, 3) * }}} * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. * * @param num k, the number of elements to return * @param ord the implicit ordering for T * @return an array of top elements */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
  }

scala> val c = sc.parallelize(Array(1, 2, 3, 5, 3, 8, 7, 97, 32), 2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> c.top(3)
res10: Array[Int] = Array(97, 32, 8)


count

The count method counts the number of elements returned in the RDD.

  /** * Return the number of elements in the RDD. */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

scala> val c = sc.parallelize(Array(1, 2, 3, 5, 3, 8, 7, 97, 32), 2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> c.count
res11: Long = 9



takeSample

takeSample returns a fixed-size sampled subset of the RDD as an array; in addition, the order of the returned elements is randomly shuffled.

 /** * Return a fixed-size sampled subset of this RDD in an array * * @param withReplacement whether sampling is done with replacement * @param num size of the returned sample * @param seed seed for the random number generator * @return sample of specified size in an array * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. */
  def takeSample(
      withReplacement: Boolean,
      num: Int,
      seed: Long = Utils.random.nextLong): Array[T] = withScope {
    val numStDev = 10.0

    require(num >= 0."Negative number of elements requested")
    require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
      "Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")

    if (num == 0) {
      new Array[T] (0)}else {
      val initialCount = this.count()
      if (initialCount == 0) {
        new Array[T] (0)}else {
        val rand = new Random(seed)
        if(! withReplacement && num >= initialCount) {Utils.randomizeInPlace(this.collect(), rand)
        } else {
          val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
            withReplacement)
          var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

          // If the first sample didn't turn out large enough, keep trying to take samples;
          // this shouldn't happen often because we use a big multiplier for the initial size
          var numIters = 0
          while (samples.length < num) {
            logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
            samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
            numIters += 1
          }
          Utils.randomizeInPlace(samples, rand).take(num)
        }
      }
    }
  }

scala> val c = sc.parallelize(Array(1, 2, 3, 5, 3, 8, 7, 97, 32), 2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24

scala> c.takeSample(true, 3, 1)
res14: Array[Int] = Array(1, 3, 7)


saveAsTextFile

saveAsTextFile stores the RDD as a text file (a directory of part files), writing one element per line.
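
For example (the output path is a placeholder and must not already exist):

val a = sc.parallelize(1 to 6, 2)
// Writes one element per line; the directory contains one part file per partition,
// e.g. part-00000 and part-00001.
a.saveAsTextFile("/tmp/rdd-output")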

countByKey

countByKey is similar to count, but it counts the number of values for each key and returns the result as a Map.

  /** * Count the number of elements for each key, collecting the results to a local Map. * * @note This method should only be used if the resulting map is expected to be small, as * the whole thing is loaded into the driver's memory. * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }

scala> val c = sc.parallelize(List("fig"."wc"."sbb"."zq"."xwc"."dcp"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at parallelize at <console>:24

scala> val d = c.keyBy(_.length)
d: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[37] at keyBy at <console>:25

scala> d.countByKey
res15: scala.collection.Map[Int,Long] = Map(2 -> 2, 3 -> 4)



aggregate

  /** * Aggregate the elements of each partition, and then the results for all the partitions, using * given combine functions and a neutral "zero value". This function can return a different result * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. * * @param zeroValue the initial value for the accumulated result of each  partition for the * `seqOp` operator, and also the initial value for the combine results from * different partitions for the `combOp` operator - this will typically be the * neutral element (e.g. `Nil` for list concatenation or `0` for summation) * @param seqOp an operator used to accumulate results within a partition * @param combOp an associative operator used to combine results from different partitions */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

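aggregate first folds the elements of each partition with seqOp, starting from the zero value, and then merges the per-partition results with combOp; the result type U may differ from the element type T. A small sketch that computes the sum and the count of an RDD in a single pass:

val a = sc.parallelize(1 to 9, 3)

// seqOp accumulates (sum, count) within a partition; combOp merges the partial results.
val (sum, count) = a.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (p1, p2) => (p1._1 + p2._1, p1._2 + p2._2))
// sum = 45, count = 9, so the mean is sum.toDouble / count = 5.0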

fold

/** * Aggregate the elements of each partition, and then the results for all the partitions, using a * given associative function and a neutral "zero value". The function * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object * allocation; however, it should not modify t2. * * This behaves somewhat differently from fold operations implemented for non-distributed * collections in functional languages like Scala. This fold operation may be applied to * partitions individually, and then fold those results into the final result, rather than * apply the fold to each element sequentially in some defined ordering. For functions * that are not commutative, the result may differ from that of a fold applied to a * non-distributed collection. * * @param zeroValue the initial value for the accumulated result of each partition for the `op` * operator, and also the initial value for the combine results from different * partitions for the `op` operator - this will typically be the neutral * element (e.g. `Nil` for list concatenation or `0` for summation) * @param op an operator used  to both accumulate results within a partition and combine results * from different partitions */
  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

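fold behaves like reduce but takes a zero value, which should be a neutral element because it is applied once per partition and once more when the partial results are merged. A short sketch:

val a = sc.parallelize(1 to 9, 3)

a.fold(0)(_ + _)   // 45: 0 is neutral for addition, so this matches a.reduce(_ + _)

// With a non-neutral zero value the result differs from a sequential fold:
// with 3 partitions, a.fold(1)(_ + _) yields 45 + 3 * 1 + 1 = 49.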