The Resilient Distributed Dataset (RDD) is a core concept in Spark. Simply put, you create an RDD in Spark and then manipulate its data through a variety of operations.

Creating an RDD

An RDD can be created in three ways:

  1. Created from a collection
  2. Created from other storage (such as HDFS, local files, etc.)
  3. Created from another RDD

A SparkContext must be created before an RDD can be created. As you can see from the following example, SparkContext requires a conf, and the master property of this conf was covered in the previous two installments. There are two methods for creating an RDD from a collection, parallelize and makeRDD; makeRDD actually calls parallelize, and both let you pass the number of partitions in addition to the collection. An RDD can also be created from other storage, including the local file system, HDFS, Cassandra, HBase, Amazon S3, and so on. The path can be a wildcard instead of a single file; for example, textFile("/my/directory/*.txt") reads all the .txt files under /my/directory.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CreateRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CreateRDD").setMaster("local")
    val sc = new SparkContext(conf)
    // Created from a collection: parallelize
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    println(rdd1.count)
    // Created from a collection: makeRDD
    val rdd2: RDD[Int] = sc.makeRDD(data)
    println(rdd2.count)
    // Created from other storage, here a local file
    val rdd3: RDD[String] = sc.textFile("D:\\data\\test.txt")
    println(rdd3.first)
    // Created from another RDD
    val rdd4: RDD[String] = rdd3.map(_ + "BBB")
    println(rdd4.first)
  }
}
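
As a quick sketch of the wildcard form mentioned above (the directory path is hypothetical, and sc is the SparkContext from the example, so these lines would sit inside the same main method):

// Reads every .txt file under the (hypothetical) directory into one RDD of lines
val lines: RDD[String] = sc.textFile("/my/directory/*.txt")
println(lines.count)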

Operations on RDDs

RDDs support two kinds of operations: transformations and actions. A transformation, as the name suggests, transforms one dataset into another. The important point is that all transformations are lazy: even though the line defining a map has been executed, the map itself is not computed at that moment; the calculation only runs when an action is triggered. An action triggers the evaluation of the RDD and returns the final result to the driver.
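
The following minimal sketch (the data and the println inside the map are just for illustration) shows this laziness: the message inside map only appears once the count action is called.

import org.apache.spark.{SparkConf, SparkContext}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyDemo").setMaster("local"))
    val rdd = sc.parallelize(Array(1, 2, 3))
    // Nothing is computed here: map only records the transformation
    val mapped = rdd.map { x => println("mapping " + x); x * 2 }
    println("before the action")
    // count is an action, so the map (and its println) runs only now
    println(mapped.count)
  }
}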

Transformations

map

map transforms each element: you pass in one value and get another value back. For example, in the following example, 1, 2, 3, 4, 5 become 2, 4, 6, 8, 10: pass in 1 and get 2, pass in 2 and get 4.

object MapRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MapRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    // The two forms are equivalent: _ * 2 is shorthand for x => x * 2
    val mapRdd = rdd1.map(_ * 2)
    val mapRdd2 = rdd1.map(x => x * 2)
    mapRdd2.foreach(println)
  }
}

filter

filter keeps or drops elements: you give it a value, it evaluates a predicate that returns true or false, and if the result is false the value is not included in the new RDD. For example, in the example below, even numbers return true, so 2 and 4 are printed at the end.

object FilterRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FilterRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val rdd2: RDD[Int] = rdd1.filter(x => x % 2 == 0)
    rdd2.foreach(println)
  }
}

flatMap

flatMap flattens the result: you give it one value and it returns zero or more values. For example, in the following example, you pass in "a b" and, after the split, you get the values a and b.

object FlatMapRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FlatMapRDD").setMaster("local"))
    val data = Array("a b", "c d")
    val rdd1: RDD[String] = sc.parallelize(data)
    val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
    rdd2.foreach(println)
  }
}

mapPartitions

Similar to map, but it runs separately on each partition (block) of the RDD. When run on an RDD of type T, func must therefore be of type Iterator[T] => Iterator[U]. For example, in the following example, you are given the iterator x for a partition and return x.map(_ * 2).

object MapPartitionsRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val mapRdd = rdd1.mapPartitions(x => x.map(_ * 2))
    mapRdd.foreach(println)
  }
}

mapPartitionsWithIndex

Similar to mapPartitions, but func is also given an integer value that represents the index of the partition. In other words, the index parameter tells you which partition is being processed.

object MapPartitionsWithIndexRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsWithIndexRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data, 2)
    val mapRdd = rdd1.mapPartitionsWithIndex((index, x) => x.map(index + "," + _))
    mapRdd.foreach(println)
  }
}

sample

sample takes a random sample of an RDD, for example roughly 30 elements out of 100. It has three parameters:

  • withReplacement: whether an element can be sampled multiple times (replaced during sampling)
  • fraction: the expected size of the sample as a fraction of the RDD's size. When withReplacement is true, fraction is the expected number of times each element is selected and must be greater than 0; when withReplacement is false, fraction is the probability that each element is selected and must be between 0 and 1 (both modes are shown below).
  • seed: the seed for the random number generator

object SampleRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SampleRDD").setMaster("local"))
    val data = 1 to 100
    val rdd1: RDD[Int] = sc.parallelize(data)
    // withReplacement = true: elements may be picked more than once
    val rdd2: RDD[Int] = rdd1.sample(true, 0.5, 1)
    println(rdd2.count)
  }
}
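
For comparison, a small sketch of the withReplacement = false mode (the 0.3 is just an illustrative fraction), reusing the rdd1 defined in the example above, i.e. placed inside the same main method:

// withReplacement = false: each element is kept with probability ~0.3, at most once
val rdd3: RDD[Int] = rdd1.sample(false, 0.3, 1)
println(rdd3.count) // roughly 30 of the 100 elements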

union

union merges two RDDs. For example, in the following example, rdd1 and rdd2 are merged.

object UnionRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("UnionRDD").setMaster("local"))
    val data1 = Array(1, 2, 3, 4, 5)
    val data2 = Array(6, 7, 8, 9, 10)
    val rdd1: RDD[Int] = sc.parallelize(data1)
    val rdd2: RDD[Int] = sc.parallelize(data2)
    val rdd3 = rdd1.union(rdd2)
    rdd3.foreach(println)
  }
}

intersection

intersection returns the elements that the two RDDs have in common. For example, in the following example, the intersection of rdd1 and rdd2 is 3, 4, 5.

object IntersectionRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("IntersectionRDD").setMaster("local"))
    val data1 = Array(1, 2, 3, 4, 5)
    val data2 = Array(3, 4, 5, 6, 7)
    val rdd1: RDD[Int] = sc.parallelize(data1)
    val rdd2: RDD[Int] = sc.parallelize(data2)
    val rdd3 = rdd1.intersection(rdd2)
    rdd3.foreach(println)
  }
}

distinct

distinct removes duplicates. For example, in the example below, 1, 2, 3 appear twice, and the final output is 1, 2, 3, 4, 5.

object DistinctRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistinctRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5, 1, 2, 3)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val rdd2: RDD[Int] = rdd1.distinct
    rdd2.foreach(println)
  }
}

groupByKey

groupByKey groups values by key. If the RDD does not have keys yet, you first need to create them with the keyBy method. For example, in the following example, the result of modulo 2 is used as the key and a new RDD is returned. In the foreach you can see that each element consists of a key and an iterable of the grouped values.

object GroupByKeyRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GroupByKeyRDD").setMaster("local"))
    val data1 = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data1)
    val rdd2: RDD[(Int, Int)] = rdd1.keyBy(_ % 2)
    rdd2.groupByKey().foreach(x => {
      val iterator = x._2.iterator
      var num = ""
      while (iterator.hasNext) num = num + iterator.next + " "
      println(x._1 + ":" + num)
    })
    val data2 = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
    val rdd3: RDD[(String, Int)] = sc.parallelize(data2)
    rdd3.groupByKey().foreach(x => {
      val iterator = x._2.iterator
      var num = ""
      while (iterator.hasNext) num = num + iterator.next + " "
      println(x._1 + ":" + num)
    })
  }
}

reduceByKey

reduceByKey combines the values of the same key using the given function. In the following example, the final result is (a,9) and (b,6).

object ReduceByKeyRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyRDD").setMaster("local"))
    val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
    val rdd1: RDD[(String, Int)] = sc.parallelize(data)
    val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)
    rdd2.foreach(println)
  }
}

aggregateByKey

aggregateByKey aggregates within each partition first, starting from a zero value, and then combines the partition results. The code is as follows:

object AggregateByKeyRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeyRDD").setMaster("local"))
    val data = Array(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8))
    val rdd1: RDD[(String, Int)] = sc.parallelize(data, 2)
    val rdd2: RDD[(String, Int)] = rdd1.aggregateByKey(3)(math.max(_, _), _ + _)
    rdd2.foreach(println)
  }
}

The calculation proceeds as follows:

  1. Partitioning the collection
  2. Grouping by Key
  3. After grouping, the max is computed within each partition; this is the first function passed to aggregateByKey above. The max is taken over two kinds of values: the 3 that follows aggregateByKey (the zero value) and the values of the key. For example, in the first partition the values for a are 1 and 2, so the result is (a,3).
  4. The results of the two partition calculations are then added together; this is the second function passed to aggregateByKey above. So the final answer is (b,12) and (a,9).
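
If you want to check the partition contents from step 1 yourself, one way (a sketch using the same data and two partitions as above) is glom, which turns each partition into an array:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeyPartitions").setMaster("local"))
    val data = Array(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8))
    val rdd1 = sc.parallelize(data, 2)
    // glom collects each partition into an array so it can be printed
    rdd1.glom().collect().zipWithIndex.foreach { case (part, i) =>
      println("partition " + i + ": " + part.mkString(", "))
    }
    // With this data, parallelize splits the elements by position:
    // partition 0: (a,1), (a,2), (b,3), (b,4)
    // partition 1: (a,5), (a,6), (b,7), (b,8)
  }
}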

sortByKey

sortByKey sorts by key. If the first argument is true, which is the default, the order is ascending; if it is false, the order is descending.

object SortByKeyRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SortByKeyRDD").setMaster("local"))
    val data = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2))
    val rdd1: RDD[(String, Int)] = sc.parallelize(data)
    val rdd2: RDD[(String, Int)] = rdd1.sortByKey()
    rdd2.foreach(println)
  }
}
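
A small sketch of the descending order, reusing the rdd1 defined in the example above (i.e. placed inside the same main method):

// ascending = false sorts in descending key order: (d,4), (c,3), (b,2), (a,1)
val rdd3: RDD[(String, Int)] = rdd1.sortByKey(false)
rdd3.foreach(println)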

join

Similar to a join in MySQL. Besides the inner join, leftOuterJoin, rightOuterJoin, and fullOuterJoin are also available, as the example below shows.

object JoinRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JoinRDD").setMaster("local"))
    val data1 = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2))
    val data2 = Array(("a", 11), ("d", 14), ("e", 25))
    val rdd1: RDD[(String, Int)] = sc.parallelize(data1)
    val rdd2: RDD[(String, Int)] = sc.parallelize(data2)
    val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    val rdd4: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
    val rdd5: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
    val rdd6: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
    // Inner join: only keys present in both RDDs
    // (d,(4,14))
    // (a,(1,11))
    println("rdd3:join")
    rdd3.foreach(println)
    // Full outer join: missing sides are filled with None
    // (d,(Some(4),Some(14)))
    // (e,(None,Some(25)))
    // (a,(Some(1),Some(11)))
    // (b,(Some(2),None))
    // (c,(Some(3),None))
    println("rdd4:fullOuterJoin")
    rdd4.foreach(println)
    // Left outer join, like LEFT JOIN in MySQL
    // (d,(4,Some(14)))
    // (a,(1,Some(11)))
    // (b,(2,None))
    // (c,(3,None))
    println("rdd5:leftOuterJoin")
    rdd5.foreach(println)
    // Right outer join, like RIGHT JOIN in MySQL
    // (d,(Some(4),14))
    // (e,(None,25))
    // (a,(Some(1),11))
    println("rdd6:rightOuterJoin")
    rdd6.foreach(println)
  }
}

cogroup

For the KV elements in two RDDs, cogroup gathers the elements with the same key in each RDD into separate collections. For example, in the following example, key a has the values (1, 3) in rdd1 and (23) in rdd2, so the final result for a is (a,(CompactBuffer(1, 3),CompactBuffer(23))).

object CogroupRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CogroupRDD").setMaster("local"))
    val data1 = Array(("a", 1), ("a", 3), ("b", 2))
    val data2 = Array(("b", 24), ("a", 23), ("b", 22))
    val rdd1: RDD[(String, Int)] = sc.parallelize(data1)
    val rdd2: RDD[(String, Int)] = sc.parallelize(data2)
    val rdd3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
    // Result:
    // (a,(CompactBuffer(1, 3),CompactBuffer(23)))
    // (b,(CompactBuffer(2),CompactBuffer(24, 22)))
    rdd3.foreach(println)
  }
}

cartesian

cartesian returns the Cartesian product of two RDDs, that is, every possible pair of elements.

object CartesianRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CartesianRDD").setMaster("local"))
    val data1 = Array(1, 2)
    val data2 = Array(3, 4)
    val rdd1: RDD[Int] = sc.parallelize(data1)
    val rdd2: RDD[Int] = sc.parallelize(data2)
    val rdd3: RDD[(Int, Int)] = rdd1.cartesian(rdd2)
    // Result:
    // (1,3)
    // (1,4)
    // (2,3)
    // (2,4)
    rdd3.foreach(println)
  }
}

Actions

reduce

reduce combines all the elements using the function passed in and returns the result to the driver; in the example below the elements are summed.

object ReduceRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Int = rdd1.reduce(_ + _)
    println(result)
  }
}

collect

collect brings the contents of the RDD back to the driver as an array; it is usually only used to check results when the data is small.

object CollectRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CollectRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Array[Int] = rdd1.collect()
    println(result.mkString(","))
  }
}

count

count returns the number of elements in the RDD.

object CountRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CountRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Long = rdd1.count()
    println(result)
  }
}

first

Get the first element of the RDD collection.

object FirstRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FirstRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Int = rdd1.first()
    println(result)
  }
}

take

take(n) returns the first n elements; in the example below, 3 is passed in, so you get 1, 2, 3.

object TakeRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TakeRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Array[Int] = rdd1.take(3)
    println(result.mkString(","))
  }
}

takeSample

takeSample randomly picks n elements and returns them to the driver.

object TakeSampleRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TakeSampleRDD").setMaster("local"))
    val data = 1 to 100
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Array[Int] = rdd1.takeSample(true, 5, 1)
    println(result.mkString(","))
  }
}

takeOrdered

takeOrdered sorts the RDD and returns the first n elements.

object TakeOrderedRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TakeOrderedRDD").setMaster("local"))
    val data = 1 to 100
    val rdd1: RDD[Int] = sc.parallelize(data)
    val result: Array[Int] = rdd1.takeOrdered(5)
    println(result.mkString(","))
  }
}

countByKey

Count the number of occurrences of each key

object CountByKeyRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CountByKeyRDD").setMaster("local"))
    val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
    val rdd1: RDD[(String, Int)] = sc.parallelize(data)
    val result: collection.Map[String, Long] = rdd1.countByKey()
    result.foreach(item => println(item._1 + ":" + item._2))
  }
}

foreach

foreach traverses the RDD and applies a function to each element, as with the println calls in many of the examples above.
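
A minimal sketch for completeness, in the same style as the examples above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ForeachRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ForeachRDD").setMaster("local"))
    val data = Array(1, 2, 3, 4, 5)
    val rdd1: RDD[Int] = sc.parallelize(data)
    // Applies the function to every element; here it just prints each one.
    // Note that foreach runs on the executors, so in a cluster the output
    // appears in the executor logs rather than on the driver.
    rdd1.foreach(println)
  }
}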