Transformation operators

1.1 Value type

1.1.1 map operator

Introduction:

Returns a new RDD formed by applying the function func to each input element; func is executed once for every element of the RDD.

Code:

		import org.apache.spark.{SparkConf, SparkContext}

		// Create a SparkConf that runs in local mode
		val conf = new SparkConf()
			.setMaster("local[1]")
			.setAppName("MapOperator")
		// Create SparkContext
		val sc = new SparkContext(conf)
		sc.setLogLevel("ERROR")

		// Create data
		val rdd = sc.parallelize(List("Li bai", "Han xin", "Zhang fei")).cache()
		// Use the Map operator
		val result = rdd.map((x) => (x,1))
		// Print the result
		result.foreach((x) => println(x.toString()))

		// Close SparkContext
		sc.stop()

1.1.2 mapPartitions operator

Introduction:

Similar to map, but runs independently on each partition of the RDD. When running on an RDD of type T, func must have type Iterator[T] => Iterator[U]. Assuming there are N elements in M partitions, the function passed to map is called N times, whereas the function passed to mapPartitions is called M times, each call processing all the elements of one partition.

Code:

		// Create SparkConf to set the local running mode
		val conf = new SparkConf()
			.setMaster("local[1]")
			.setAppName("MapPartitionsOperator")
		// Create SparkContext
		val sc = new SparkContext(conf)
		sc.setLogLevel("ERROR")

		// mapPartitions is similar to map, but runs once on each partition of the RDD, whereas map runs once on each element

		// Create data
		val rdd = sc.parallelize(List("Li bai", "Han xin", "Zhang fei")).cache()
		// Use mapPartitions
		// Note: braces { } are usually used when the argument is a code block; a single-line function can be passed in parentheses

		def fun(x: Iterator[String]): Iterator[Tuple2[String, Int]] = {
			// Create a List of tuples to store the data
			// The list must be declared as var because each concatenation produces a new List object
			var list = List[Tuple2[String, Int]]()

			while (x.hasNext) {
				// Take the next element from the iterator
				val elem = x.next()
				// Prepend a one-element List to the existing list
				list = list.:::(List(new Tuple2[String, Int](elem, 1)))
				// Note: the difference between ::: and :: is that ::: takes a List and :: takes a single element
			}
			list.iterator
		}

		// You can pass either an anonymous function or a named function such as fun
		// rdd.mapPartitions(fun)

		val result = rdd.mapPartitions { x =>

			// Create a List of tuples to store the data
			// The list must be declared as var because each concatenation produces a new List object
			var list = List[Tuple2[String, Int]]()

			while (x.hasNext) {
				// Take the next element from the iterator
				val elem = x.next()
				// Prepend a one-element List to the existing list
				list = list.:::(List(new Tuple2[String, Int](elem, 1)))
				// Note: the difference between ::: and :: is that ::: takes a List and :: takes a single element
			}
			list.iterator
		}

		// Iterate over the result and print
		result.foreach(println(_))

		// Close SparkContext
		sc.stop()

1.1.3 mapPartitionsWithIndex operator

  1. mapPartitionsWithIndex(func) is similar to mapPartitions, but func takes an additional integer parameter that holds the index of the partition
  2. So when running on an RDD of type T, func must have type (Int, Iterator[T]) => Iterator[U]
  3. Requirement: create an RDD and pair each element with the index of its partition to form a new RDD

Code:

		// mapPartitions with the partition index
		// Create a SparkConf that runs in local mode
		val conf = new SparkConf()
			.setMaster("local[1]")
			.setAppName("MapPartitionsWithIndexOperator")
		// Create SparkContext
		val sc = new SparkContext(conf)
		sc.setLogLevel("ERROR")

		// Create data
		val rdd = sc.parallelize(List("Li bai", "Han xin", "Zhang fei")).cache()

		def fun(index: Int, x: Iterator[String]): Iterator[Tuple2[Int, String]] = {

			// List needs to be created as var because concatenation needs to point to the new List object
			var list = List[Tuple2[Int, String]]()

			while(x.hasNext){
				// Get the next element from the iterator
				val elem = x.next()
				// Note: :: takes a single element as its argument, whereas ::: takes a List
				list = list.::(new Tuple2[Int, String](index, elem))
			}

			list.iterator
		}

		val result = rdd.mapPartitionsWithIndex(fun)

		// Iterate over the result and print
		result.foreach(println(_))
		// Close SparkContext
		sc.stop()

1.1.4 flatMap operator

Similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element).

For example, if map returns:

List(1, 2, 3)

List(1, 2, 3, 4)

List(1, 2, 3, 4, 5)

then flatMap flattens these results into a single sequence of elements:

1 2 3 1 2 3 4 1 2 3 4 5

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("FlatMapOperator")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Two ways to create an RDD from a local collection:
		// 1. parallelize(seq, numSlices): seq is a local collection such as a List, numSlices is the number of partitions
		// 2. makeRDD

		val rdd = sc.makeRDD(1 to 5)
		val flatMapResult = rdd.flatMap(1 to _)
		val mapResult = rdd.map(1 to _)

		// Print the map result first, then the flatMap result
		mapResult.foreach(println(_))
		flatMapResult.foreach(println(_))

		// Close SparkContext
		sc.stop()

1.1.5 Difference between map and mapPartitions

  1. map(): processes one element at a time.
  2. mapPartitions(): processes all the data of one partition at a time. The data of the original partition can be released only after the whole partition has been processed, which may cause an out-of-memory (OOM) error.
  3. Development guidance: when plenty of memory is available, mapPartitions() is recommended to improve processing efficiency.
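
The difference in call counts can be observed directly. The following is only a sketch: it assumes Spark 2.0 or later (for sc.longAccumulator), the names mapCalls and partitionCalls are made up for illustration, and the statements are written script-style (inside an application they would go in main).

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("MapVsMapPartitions")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

// Accumulators counting how often each user function is invoked (illustrative names)
val mapCalls = sc.longAccumulator("mapCalls")
val partitionCalls = sc.longAccumulator("partitionCalls")

// 6 elements spread over 3 partitions
val rdd = sc.parallelize(1 to 6, 3)

// The function passed to map runs once per element
val viaMap = rdd.map { x => mapCalls.add(1); x * 2 }.collect()

// The function passed to mapPartitions runs once per partition
val viaMapPartitions = rdd.mapPartitions { iter =>
  partitionCalls.add(1)
  // iter.map is lazy, so elements are transformed one by one as they are consumed
  iter.map(_ * 2)
}.collect()

println(s"map calls: ${mapCalls.value}, mapPartitions calls: ${partitionCalls.value}")
sc.stop()
```

Returning iter.map(...) instead of first copying the partition into a List keeps the elements streaming, which is one way to limit the memory pressure mentioned in point 2.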

1.1.6 glom operator

  1. Function: turns each partition into an array, forming a new RDD of type RDD[Array[T]]
  2. Requirement: Create a 4-partition RDD and put the data for each partition into an array

Code:

		import org.apache.spark.{SparkConf, SparkContext}
		import org.apache.spark.rdd.RDD

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// 1. Function: turn each partition into an array, forming a new RDD of type RDD[Array[T]]
		// 2. Requirement: create a 4-partition RDD and place the data from each partition into an array

		// Generate RDD data and set the number of partitions to 4
		val rdd = sc.parallelize(1 to 10, 4)
		// Note the return value
		val result : RDD[Array[Int]] = rdd.glom()

		result.foreach(arr => {
			// arr is an Array holding the elements of one partition
			for (i <- 0 until arr.length) {
				// Print the result
				println(arr(i))
			}
			println("==========================================")
		})
		// Close SparkContext
		sc.stop()

1.1.7 groupBy operator

  1. Function: groups the elements by the return value of the function passed in; the elements with the same key are put into one iterator.
  2. Requirement: create an RDD and group its elements by their value modulo 2
  3. Note: this operator is inefficient and is not recommended. For details, see the Note on the corresponding method in the source code

Code:

		import org.apache.spark.{SparkConf, SparkContext}
		import org.apache.spark.rdd.RDD

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(1 to 10)

		val result : RDD[(String, Iterable[Int])]= rdd.groupBy(x => {

			var key = ""

			x match {
				case _ if (x < 3) => {
					key = "small"
				}
				case _ if (x > 3 && x < 5) => {
					key = "big"
				}
				case _ if (x > 5) => {
					key = "very big"
				}
				case _ => {
					key = "void"
				}
			}
			key
		})

		// Iterate over the result
		result.foreach(x => {
			println("key : " + x._1 + " \t" + x._2)
		})

		sc.stop()
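
The stated requirement (grouping by value modulo 2) can be sketched as follows. This is an illustrative variant, not the author's code; the app name and the sorting of each group are assumptions added to make the printed output stable.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("GroupByModulo")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val rdd = sc.parallelize(1 to 10)
// The key is the remainder of the division by 2: 0 for even numbers, 1 for odd numbers
val grouped = rdd.groupBy(_ % 2)
// Bring the groups to the driver; sort each group because groupBy does not guarantee element order
val groups = grouped.mapValues(_.toList.sorted).collect().toMap

groups.foreach { case (key, values) => println(s"key: $key\t$values") }
sc.stop()
```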

1.1.8 filter operator

  1. Function: filtering. Returns a new RDD consisting of the input elements for which func returns true.
  2. Requirement: create an RDD (consisting of strings) and filter it into a new RDD that contains only the strings including the substring "xiao"

Code:

		import org.apache.spark.{SparkConf, SparkContext}
		import org.apache.spark.rdd.RDD

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		val rdd = sc.parallelize(1 to 10)

		// Filter: returns a new RDD consisting of the input elements for which func returns true
		// Here only the even numbers are kept
		val result: RDD[Int] = rdd.filter(x => x % 2 == 0)

		// Print the result
		result.foreach(println(_))
		sc.stop()
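
The string requirement stated above can be sketched like this. The sample names are invented for illustration and are not taken from the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("FilterStrings")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

// Hypothetical sample data
val names = sc.parallelize(List("xiaoming", "xiaohong", "zhangsan", "lisi"))
// Keep only the strings that contain the substring "xiao"
val withXiao = names.filter(_.contains("xiao")).collect()

withXiao.foreach(println)
sc.stop()
```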

1.1.9 sample operator

  1. Function: randomly samples roughly a fraction of the data, using the specified random seed.

fraction, in the range [0, 1], is the proportion of the data to sample; for example, fraction = 0.3 samples about 30% of the data.

withReplacement indicates whether a sampled element is put back: true samples with replacement, false samples without replacement. seed specifies the seed of the random number generator.

  2. Requirement: create an RDD (1-10) and draw samples from it with and without replacement

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(1 to 200)
		// Sampling
		// fraction, in the range [0, 1], is the proportion of the data to sample, e.g. 0.3 samples about 30%
		// withReplacement: true samples with replacement, false samples without replacement
		// seed is the seed of the random number generator
		val result = rdd.sample(true, 0.2, 1234L)
		// Print the sampled elements
		result.foreach(println(_))
		// Close SparkContext
		sc.stop()
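
The requirement also asks for sampling without replacement, which the snippet above does not show. A possible sketch, with variable names chosen only for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("SampleWithoutReplacement")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val rdd = sc.parallelize(1 to 200)

// Without replacement: every element can be picked at most once
val withoutReplacement = rdd.sample(withReplacement = false, fraction = 0.2, seed = 1234L).collect()
// Sampling the same RDD again with the same seed selects the same elements
val again = rdd.sample(withReplacement = false, fraction = 0.2, seed = 1234L).collect()

println(withoutReplacement.mkString(", "))
sc.stop()
```

The sample size is only approximately fraction times the number of elements, so the exact count is not something to rely on.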

1.1.10 distinct operator

  1. Function: returns a new RDD with the duplicates of the source RDD removed. By default the result keeps the number of partitions of the source RDD, but this can be changed by passing the optional numPartitions parameter.
  2. Requirement: Create an RDD and use distinct() to de-duplicate it

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(List("Zhang", "Korea", "Li", "Korea", "The king", "The king"))
		// Remove duplicates
		val result = rdd.distinct()
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.1.11 coalesce operator

  1. Function: reduces the number of partitions, which makes small data sets run more efficiently after a large data set has been filtered.

With only a small amount of data, having too many partitions is counterproductive.

  2. Requirement: create a 4-partition RDD and reduce its number of partitions

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Generate data
		val rdd = sc.parallelize(1 to 10, 4)

		// Reduce the number of partitions
		// numPartitions is the target number of partitions; shuffle = true means the data is reshuffled
		// The default for shuffle is false
		val result = rdd.coalesce(2, true)
		// The number of partitions after repartitioning
		val numPartitions = result.partitions.size
		println(numPartitions)

		sc.stop()

1.1.12 repartition operator

  1. Function: randomly reshuffles all the data across the network into the given number of partitions. coalesce with shuffle = true achieves the same effect.

After the shuffle, the data is evenly distributed.

  2. Requirement: create a 4-partition RDD and repartition it

Code:

	import org.apache.spark.{SparkConf, SparkContext}
	import org.apache.spark.rdd.RDD

	def printPartition(rdd: RDD[Int]): Unit = {

		rdd.foreachPartition(f => {
			// A method to iterate over Iterator data
			while(f.hasNext){
				var element = f.next()
				print(element)
			}
			println()
		})
	}

	def main(args: Array[String]): Unit = {

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(1 to 15, 4)
		// Prints data for each partition
		printPartition(rdd)

		// Repartition and shuffle data
		// Shuffle can distribute data evenly
		val result = rdd.repartition(3)
		println("Repartition and shuffle data.....")
		printPartition(result)

		sc.stop()
	}

1.1.13 Difference between coalesce and repartition

  1. coalesce repartitions the data and lets you choose whether to shuffle, through the parameter shuffle: Boolean = false/true.
  2. repartition actually calls coalesce with the shuffle enabled. The source code is as follows:

Code:

		def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
			coalesce(numPartitions, shuffle = true)
		}
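
One practical consequence of the shuffle flag is that coalesce without a shuffle cannot increase the number of partitions. The sketch below compares the partition counts; the variable names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("CoalesceVsRepartition")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val rdd = sc.parallelize(1 to 10, 2)

// Without a shuffle, asking for more partitions leaves the count unchanged
val noShuffle = rdd.coalesce(4).partitions.length
// With a shuffle, the number of partitions can grow
val withShuffle = rdd.coalesce(4, shuffle = true).partitions.length
// repartition is coalesce with shuffle = true
val repartitioned = rdd.repartition(4).partitions.length

println(s"coalesce(4): $noShuffle, coalesce(4, shuffle = true): $withShuffle, repartition(4): $repartitioned")
sc.stop()
```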

1.1.14 sortBy operator

  1. Function: sortBy(func, [ascending], [numTasks]) first applies func to each element and then sorts the data by comparing the results. The default order is ascending. Note: the result contains the original elements in sorted order, not the values returned by func.
  2. Requirement: Create an RDD and sort by different rules

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		val rdd = sc.parallelize(List(1, 4, 2, 5, 8, 3))
		// Apply the function to each element first, then sort by the results
		// Note that the original elements are returned
		// With f % 2 as the sort key: 4 % 2 => 0, 1 % 2 => 1
		// so 4 (key 0) is ordered before 1 (key 1)
		val result = rdd.sortBy(f => {
			f % 2
		})
		// Iterate over the result
		result.foreach(println(_))
		sc.stop()
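
Since the requirement mentions different sorting rules, here is a sketch of a second rule: sorting by the element itself in descending order. The app name and variable names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("SortByDescending")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val rdd = sc.parallelize(List(1, 4, 2, 5, 8, 3))
// Use the element itself as the sort key and reverse the default ascending order
val descending = rdd.sortBy(x => x, ascending = false).collect()

println(descending.mkString(", "))
sc.stop()
```

collect() is used here instead of foreach because it returns the elements to the driver in sorted order.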

1.1.15 pipe operator

  1. Function: pipe runs a shell script once for each partition and returns an RDD of the script's output. Note: the script must be stored in a location that the worker nodes can access
  2. Requirement: write a script and use pipe to apply it to an RDD.
  3. Note: this example does not run on Windows for now, because it needs to execute a shell script

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		val rdd = sc.parallelize(List("Han xin", "White dragon", "Qing emperor"), 1)
		val result = rdd.pipe("pipe.sh")
		// Print the result
		result.foreach(print(_))
		sc.stop()

1.2 Interaction between two Value-type RDDs

1.2.1 union operator

  1. Function: returns a new RDD containing the union of the source RDD and the argument RDD
  2. Requirement: create two RDDs and compute their union

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")
		// Create data
		val rdd = sc.parallelize(1 to 5)
		val otherRdd = sc.parallelize(3 to 6)
		// Find the union of two RDD's
		val result = rdd.union(otherRdd)
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.2.2 subtract operator

  1. Function: computes the difference of two RDDs: it removes from the first RDD the elements that also occur in the second RDD and keeps the rest

Example: rdd: 1,2,3,4,5,6 otherRdd: 4,5,6,7,8

subtract removes from rdd the elements that are in both rdd and otherRdd and returns the remaining elements of rdd

Result: 1,2,3

  2. Requirement: create two RDDs and compute the difference between the first and the second RDD

Code:


		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(1 to 6)
		val otherRdd = sc.parallelize(4 to 8)

		// Compute the difference of the two RDDs: remove from rdd the elements that also occur in otherRdd and keep the rest
		// Be careful!
		// rdd: 1,2,3,4,5,6 otherRdd: 4,5,6,7,8
		// subtract removes from rdd the elements that are in both RDDs and returns the rest of rdd: 1,2,3
		val result = rdd.subtract(otherRdd)
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.2.3 intersection operator

  1. Function: returns a new RDD containing the intersection of the source RDD and the argument RDD
  2. Requirement: create two RDDs and compute their intersection

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(1 to 6)
		val otherRdd = sc.parallelize(4 to 8)
		
		// Returns a new RDD containing the intersection of the source RDD and the argument RDD
		val result = rdd.intersection(otherRdd)
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.2.4 cartesian operator

  1. Function: Cartesian product (avoid it where possible)
  2. Requirement: create two RDDs and compute their Cartesian product

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(1 to 6)
		val otherRdd = sc.parallelize(4 to 8)

		// Compute the Cartesian product
		// Use with caution, because the result can be orders of magnitude larger than the inputs
		// For example, the Cartesian product of two RDDs with 100K elements each has 100K * 100K = 10 billion elements
		val result = rdd.cartesian(otherRdd)
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.2.5 zip operator

  1. Function: combines two RDDs into a single key/value RDD. The two RDDs must have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
  2. Requirement: create two RDDs and combine them into a (K, V) RDD

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(List(1, 2, 3), 1)
		val otherRdd = sc.parallelize(List("A", "B", "C"), 1)

		// Combine the two RDDs into a key/value RDD. The two RDDs must have the same number of
		// partitions and the same number of elements per partition, otherwise an exception is thrown!
		val result = rdd.zip(otherRdd)
		result.foreach(println(_))
		sc.stop()
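
The failure case can be reproduced by zipping RDDs with different partition counts. This is a sketch for illustration: the variable names are made up, and scala.util.Try is used only to capture the exception instead of letting it end the program.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

val conf = new SparkConf().setMaster("local[1]").setAppName("ZipMismatch")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val numbers = sc.parallelize(List(1, 2, 3), 1)
val letters = sc.parallelize(List("A", "B", "C"), 1)
// Same partition count and same number of elements per partition: zip succeeds
val zipped = numbers.zip(letters).collect()

// Same elements, but spread over 3 partitions instead of 1
val mismatched = sc.parallelize(List("A", "B", "C"), 3)
// The failure surfaces when the job is planned or run, so the action is wrapped as well
val attempt = Try(numbers.zip(mismatched).collect())

println(zipped.mkString(", "))
println(s"zip with unequal partition counts failed: ${attempt.isFailure}")
sc.stop()
```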

1.3 Key-Value types

1.3.1 partitionBy operator

  1. Function: partitions a pairRDD. If the RDD's existing partitioner is equal to the new partitioner, no repartitioning is performed; otherwise a ShuffledRDD is generated, which involves a shuffle.
  2. Requirement: Create a 4-partition RDD and repartition it

Code:

		import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(Array("AA", "BB", "CC"), 4).map(x => (x, 1))

		// partitionBy partitions the pair RDD. If the existing partitioner equals the new one,
		// no repartitioning is performed; otherwise a ShuffledRDD is generated, which involves a shuffle.
		// Check the number of partitions
		println("Partition number: ====>" + rdd.partitions.size)
		// Repartition RDD
		val result = rdd.partitionBy(new HashPartitioner(2))
		println("Number of repartitions: ====>" + result.partitions.size)
		sc.stop()

1.3.2 groupByKey operator

  1. Function: groupByKey also operates on each key, but it only gathers the values of each key into a single sequence, without aggregating them.
  2. Requirement: Create a pairRDD, aggregate the corresponding values of the same key into a sequence, and calculate the sum of the corresponding values of the same key.

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val pairRdd = sc.parallelize(List("hello", "word", "word", "hello"))
			.map(x => (x,1))
		// Aggregate data with the same key
		val result = pairRdd.groupByKey(2).map(x => (x._1, x._2.sum))
		// Prints data
		result.foreach(println(_))
		sc.stop()

1.3.3 reduceByKey operator

  1. Function: when called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values of each key are aggregated using the specified reduce function. The number of reduce tasks can be set through the optional second parameter.
  2. Requirement: Create a pairRDD that computes the sum of the corresponding values of the same key

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val pairRdd = sc.parallelize(List("hello", "word", "word", "hello"))
			.map(x => (x,1))
		// Note: both v1 and v2 are values of the same key, not keys
		val result = pairRdd.reduceByKey((v1,v2) => {
			v1 + v2
		})
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.3.4 Differences between reduceByKey and groupByKey

  1. reduceByKey: aggregates by key, with a combine (pre-aggregation) step before the shuffle, and returns an RDD[(K, V)].
  2. groupByKey: groups by key and shuffles the data directly, without pre-aggregation.
  3. Development guidance: reduceByKey performs better than groupByKey and is recommended. However, check whether the pre-aggregation affects the business logic.
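
For a simple sum the two operators give the same answer, and only the amount of shuffled data differs. A small sketch with illustrative variable names:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("ReduceVsGroup")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val pairs = sc.parallelize(List("hello", "word", "word", "hello")).map(x => (x, 1))

// reduceByKey: values are summed inside each partition before the shuffle
val viaReduce = pairs.reduceByKey(_ + _).collect().toMap
// groupByKey: every (key, value) pair is shuffled, and the sum is computed afterwards
val viaGroup = pairs.groupByKey().mapValues(_.sum).collect().toMap

println(viaReduce)
println(viaGroup)
sc.stop()
```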

1.3.5 aggregateByKey operator

Parameters: (zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U)

  1. Function: in an RDD of key-value pairs, the values are grouped and merged by key. Within each partition, the initial value and each value of a key are passed to the seqOp function, and the result becomes the accumulated value of that key. The per-partition results of each key are then passed to the combOp function (the first two results are combined, the outcome and the next result are combined, and so on), and each key is output together with its final result as a new key-value pair.

  2. Parameter description: (1) zeroValue: the initial value given to each key in each partition; (2) seqOp: the function used to fold each value into the initial value, step by step, within a partition; (3) combOp: the function used to merge the results of the different partitions.

  3. Requirement: create a pairRDD, take the maximum value of each key within each partition, and add these maxima together

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		val rdd = sc.parallelize(List("A", "B", "C"), 2).map((x) => (x,1))
		// A pair RDD can also be created directly from tuples
		val pairRdd = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
		// Take the maximum value of the same key for each partition and add them
		// (1) zeroValue: an initial value for each key in each partition;
		// (2) seqOp: the function is used to iterate over values with initial values in each partition;
		// (3) combOp: the function is used to merge the results in each partition.
		// Note: the whole process! The pairRdd key does not participate in the computation
		val result = pairRdd.aggregateByKey(0)((k, v) => {
			// k is the accumulated value (zeroValue at first), and v is the next value of the key
			math.max(k,v)
		}, (u1,u2) => {
			// Merge the values of value
			u1 + u2
		})
		// Print the result
		result.foreach(println(_))
		sc.stop()
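
To follow the computation step by step, it helps to look at the contents of each partition. The sketch below reuses the data of the example and uses glom to expose the partitions; the variable names parts and result are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("AggregateByKeyTrace")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val pairRdd = sc.parallelize(
  List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)

// Contents of each partition, in order
val parts = pairRdd.glom().collect().map(_.toList)
parts.zipWithIndex.foreach { case (p, i) => println(s"partition $i: $p") }

// seqOp keeps the maximum per key inside a partition, combOp adds the per-partition maxima
val result = pairRdd.aggregateByKey(0)((acc, v) => math.max(acc, v), _ + _).collect().toMap
println(result)
sc.stop()
```

Here key "c" has the maximum 4 in the first partition and 8 in the second, so its final value is 12, whereas "a" and "b" each occur in only one partition and keep their maximum of 3.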

1.3.6 foldByKey operator

Parameters: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  1. Function: a simplified form of aggregateByKey in which seqOp and combOp are the same function
  2. Requirement: Create a pairRDD that computes the sum of the corresponding values of the same key

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// A pair RDD can be created directly from tuples
		val pairRdd = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
		// 1. Function: a simplified form of aggregateByKey in which seqOp and combOp are the same function
		// 2. Requirement: create a pairRDD and calculate the sum of the values of each key
		val result = pairRdd.foldByKey(0)((v1,v2) => {
			v1 + v2
		})
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.3.7 combineByKey operator

Parameters: (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

  1. Function: for each key K, combines the values V into a single result of type C.

  2. Parameter description: (1) createCombiner: combineByKey() iterates over all the elements in a partition, so the key of each element either has not been seen yet or is the same as the key of an earlier element. If the key is new, combineByKey() calls createCombiner() to create the initial value of the accumulator for that key. (2) mergeValue: if the key has already been seen while processing the current partition, mergeValue() merges the current value of the key's accumulator with the new value. (3) mergeCombiners: since each partition is processed independently, the same key can have several accumulators. If two or more partitions have an accumulator for the same key, the user-provided mergeCombiners() merges the results of the partitions.

  3. Requirement: create a pairRDD and calculate the mean value of each key (first calculate the number of occurrences of each key and the sum of its values, then divide the sum by the count).

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(
			Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)

		val result = rdd.combineByKey(
			// createCombiner: called for the first value of each key in a partition,
			// e.g. ("a", 88) => (88, 1); later values such as ("a", 91) go through mergeValue
			(x) => {(x,1)},
			// mergeValue: merges further values of the same key ("a" or "b") within a partition,
			// e.g. createCombiner produced (88, 1) and the next value of the key is 91:
			// acc: (88, 1), v: 91 => (179, 2)
			(acc:(Int,Int), v) => {(acc._1 + v, acc._2 + 1)},
			// mergeCombiners: merges the accumulators of the same key from different partitions,
			// e.g. partition 1 holds (179, 2) and partition 2 holds (95, 1) for key "a",
			// which are merged into (274, 3)
			(acc1 : (Int,Int), acc2 : (Int, Int)) =>{(acc1._1 + acc2._1, acc1._2 + acc2._2)}
		)

		result.foreach(println(_))
		sc.stop()
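
The code above stops at the (sum, count) pairs. The final division that the requirement describes could be added with mapValues, as in this sketch (the names sumCount and averages are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("CombineByKeyMean")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val rdd = sc.parallelize(
  Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)

// Build a (sum, count) pair per key
val sumCount = rdd.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
)

// Divide the sum by the count; toDouble avoids integer division
val averages = sumCount.mapValues { case (sum, count) => sum.toDouble / count }.collect().toMap

averages.foreach { case (k, avg) => println(s"$k -> $avg") }
sc.stop()
```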

1.3.8 sortByKey operator

  1. Function: when called on an RDD of (K, V) pairs whose key type K can be ordered (for example, by implementing the Ordered interface), returns an RDD of (K, V) pairs sorted by key
  2. Requirement: create a pairRDD and sort it by key in ascending and descending order

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
		// Sort by key: true = ascending, false = descending
		val result = rdd.sortByKey(true)
		result.foreach(println(_))
		sc.stop()
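
The descending half of the requirement is not shown above. A sketch of both orders side by side, with illustrative variable names:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("SortByKeyOrders")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val rdd = sc.parallelize(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))

// Ascending order is the default
val ascending = rdd.sortByKey().collect()
// Passing ascending = false reverses the order
val descending = rdd.sortByKey(ascending = false).collect()

println(ascending.mkString(", "))
println(descending.mkString(", "))
sc.stop()
```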

1.3.9 mapValues operator

  1. Function: for an RDD of (K, V) pairs, operates only on V
  2. Requirement: create a pairRDD and append the string "|||" to each value

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")
		// Create data
		val rdd = sc.parallelize(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
		// Operate only on the value
		val result = rdd.mapValues(v => {
			v + "|||"
		})
		// Print the result
		result.foreach(println(_))
		sc.stop()

1.3.10 join operator

  1. Function: when called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs containing every pair of elements that share the same key
  2. Requirement: create two pairRDDs and combine the data with the same key into a tuple.

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
		val otherRdd = sc.parallelize(Array((1, "A"), (2, "B"), (3, "C")))

		// Called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) pairs
		// containing every pair of elements that share the same key
		val result = rdd.join(otherRdd)
		result.foreach(println(_))
		sc.stop()

1.3.11 cogroup operator

  1. Function: when called on RDDs of type (K, V) and (K, W), returns an RDD of type (K, (Iterable[V], Iterable[W]))
  2. Requirement: Create two pairRDD’s and aggregate the data with the same key into an iterator.

Code:

		// Create SparkContext
		val conf = new SparkConf()
			.setAppName("Spark APP")
			.setMaster("local[1]")
		val sc = new SparkContext(conf)
		// Set the level of SparkContext print logs
		sc.setLogLevel("WARN")

		// Create data
		val rdd = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
		val otherRdd = sc.parallelize(Array((1, "A"), (2, "B"), (3, "C")))

		// Called on RDDs of type (K, V) and (K, W); returns an RDD of type
		// (K, (Iterable[V], Iterable[W]))
		val result = rdd.cogroup(otherRdd)
		result.foreach(value => {

			val key = value._1
			val v1 : Iterable[String] = value._2._1
			val v2 : Iterable[String] = value._2._2

			print(key + " ")
			print(v1 + " ")
			print(v2 + " ")
			println()
		})
		sc.stop()
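
join and cogroup differ in how they treat keys that exist in only one of the two RDDs. The sketch below uses data in which key 3 is missing from the second RDD; the data and variable names are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[1]").setAppName("JoinVsCogroup")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val left = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
// Key 3 is deliberately missing on the right-hand side
val right = sc.parallelize(Array((1, "A"), (2, "B")))

// join keeps only the keys present in both RDDs
val joined = left.join(right).collect().toMap

// cogroup keeps every key and uses an empty collection for the missing side
val cogrouped = left.cogroup(right)
  .mapValues { case (vs, ws) => (vs.toList, ws.toList) }
  .collect()
  .toMap

println(joined)
println(cogrouped)
sc.stop()
```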