1. RDD creation

You can create an RDD in Spark in the following ways:

  1. Create an RDD from a collection (memory)
  2. Create an RDD from external storage (file)
  3. Create an RDD from another RDD: apply a transformation operator to an existing RDD to generate a new one (a sketch follows this list)
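
For the third approach, here is a minimal sketch mirroring the setup used in the examples below (the data values are mine, only for illustration). A transformation such as map does not compute anything by itself; it simply produces a new RDD from an existing one.

def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // An existing RDD, created from a collection
    val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 2, 3, 4))

    // map is a transformation operator: it builds a new RDD from rdd1
    val rdd2: RDD[Int] = rdd1.map(_ * 2)

    rdd2.collect().foreach(println) // 2, 4, 6, 8

    sc.stop()
}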

1.1 Creating an RDD from memory

To create an RDD from a collection, Spark provides two main methods: parallelize and makeRDD (the latter is recommended).

def main(args: Array[String]): Unit = {
    // TODO prepare the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO create the RDD
    val seq = Seq[Int](1, 2, 3, 4)

    // 1. parallelize
    // val rdd: RDD[Int] = sc.parallelize(seq)
    // 2. makeRDD: under the hood, makeRDD simply calls the parallelize method
    val rdd: RDD[Int] = sc.makeRDD(seq)

    rdd.collect().foreach(println)

    // TODO close the environment
    sc.stop()
  }

1.2 Creating an RDD from a File

An RDD can be created from datasets in external storage systems, including the local file system and any dataset supported by Hadoop, such as HDFS and HBase.

def main(args: Array[String]): Unit = {
    // TODO prepare the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO create an RDD: create it from a file, using the data in the file as the data source
    // TODO 1. Absolute path
    val rdd1: RDD[String] = sc.textFile("D:\\spark-learn\\datas\\1.txt")

    // TODO 2. Relative path: resolved relative to the root path of the current environment
    val rdd2: RDD[String] = sc.textFile("datas/1.txt")

    // TODO 3. Directory path: reads every file under the directory
    val rdd3: RDD[String] = sc.textFile("datas")

    // TODO 4. The path can also use the wildcard *
    val rdd4: RDD[String] = sc.textFile("datas/1*.txt")

    // TODO 5. The path can also point to a distributed storage system such as HDFS
    val rdd5: RDD[String] = sc.textFile("hdfs://hadoop131:9000/b.txt")

    // Print the contents of the file line by line
    rdd2.collect().foreach(println)
    
    // TODO close the environment
    sc.stop()
  }
  • textFile: reads data line by line; each element of the result is one line as a string (see the comparison sketch after the wholeTextFiles example below)

  • wholeTextFiles: reads data file by file; each result is a tuple of (path, content)

val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")
rdd.collect().foreach(println)

/*
(file:/D:/spark-learn/datas/1.txt,hello world hello spark)
(file:/D:/spark-learn/datas/2.txt,hello world hello spark)
*/
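
For comparison, reading the same datas directory with textFile (a sketch assuming the same two files shown above) yields one element per line instead of one tuple per file:

val lines: RDD[String] = sc.textFile("datas")
lines.collect().foreach(println)

/*
hello world hello spark
hello world hello spark
*/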

2. RDD parallelism and partitioning

The number of tasks executed in parallel across the cluster is called the parallelism. Note that this is the number of tasks running at the same time, not the number of tasks the job is split into, so don't confuse the two.

For example, if a job is divided into 10 sub-tasks but only eight CPU cores are available, the maximum parallelism is 8, not 10.
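
The effective default parallelism of the current environment can be checked directly; a quick sketch, assuming the same local[*] setup used elsewhere in this article:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)

// With local[*], this equals the number of CPU cores available to the JVM (8 on this machine)
println(sc.defaultParallelism)

sc.stop()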

2.1 Parallelism of reading memory data

The source code of makeRDD():

def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = {
    parallelize(seq, numSlices)
  }

Parameter 1: seq, the collection Seq[T]

Parameter 2: numSlices, the number of partitions; it has a default value (defaultParallelism)

  1. Specify the number of partitions
// Divide the collection into two partitions
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  2. Default number of partitions
def main(args: Array[String]): Unit = {
    // TODO prepare the environment
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    // sparkConf.set("spark.default.parallelism", "5") // set the parallelism explicitly
    val sc = new SparkContext(sparkConf)

    /**
     * The second argument is not passed, so makeRDD uses the default value defaultParallelism:
     *   defaultParallelism = scheduler.conf.getInt("spark.default.parallelism", totalCores)
     * Spark first tries to read the parameter spark.default.parallelism from the SparkConf object;
     * if it is not set, it falls back to totalCores, the number of CPU cores available
     * in the current running environment (8 on this machine).
     */
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6))

    // Save the processed data to partition files; the number of output files equals the number of partitions
    rdd1.saveAsTextFile("output")

    // TODO close the environment
    sc.stop()
  }

How do you check the number of partitions of an RDD?

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 3), 2)
println(rdd.getNumPartitions)	//def getNumPartitions = partitions.length
println(rdd.partitions.size) 	//def partitions: Array[Partition] Table of partitions

2.2 Partition rules for reading memory data *

When reading in-memory data, the data is partitioned according to the parallelism setting. The source code of the partitioning rule:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
    }
}

Source code analysis:

/**
 * The two parameters of positions(length, numSlices):
 *   length    = length of the collection
 *   numSlices = number of partitions
 *
 * Take List(1, 2, 3, 4, 5) with 3 partitions as an example, i.e. length = 5, numSlices = 3:
 *
 * (0 until numSlices) => Range(0, 1, 2)
 * i => [start, end)  (index range)
 *   0 => [0, 1)  =>  index 0         =>  element 1
 *   1 => [1, 3)  =>  indices 1, 2    =>  elements 2, 3
 *   2 => [3, 5)  =>  indices 3, 4    =>  elements 4, 5
 *
 * So List(1, 2, 3, 4, 5) => List(1), List(2, 3), List(4, 5)
 */
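
The same arithmetic can be reproduced outside Spark. A self-contained sketch (the helper name slicePositions is mine, not part of the Spark API):

def slicePositions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end) // [start, end)
    }

val data = List(1, 2, 3, 4, 5)
slicePositions(data.length, 3).foreach { case (start, end) =>
    println(data.slice(start, end)) // List(1), List(2, 3), List(4, 5)
}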

Example:

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// divided evenly: [1,2], [3,4]

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
// [1], [2], [3,4]

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
// [1], [2,3], [4,5]
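
These partition layouts can be verified with glom(), which gathers the elements of each partition into an array (a sketch assuming the same sc as above):

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
// glom() turns each partition into an Array, so the grouping becomes visible
rdd.glom().collect().foreach(arr => println(arr.mkString("[", ",", "]")))

/*
[1]
[2,3]
[4,5]
*/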

2.3 Parallelism of reading file data

Read files:

/**
 * path: the file path
 * minPartitions: the minimum number of partitions
 *   default: defaultMinPartitions = math.min(defaultParallelism, 2)
 *   where defaultParallelism is the default parallelism described in section 2.1:
 *   local[n] gives n, and local[*] gives the number of CPU cores
 */

def textFile(
      path: String,								 
      minPartitions: Int = defaultMinPartitions) 

Spark reads files and partitions them using Hadoop's file-splitting logic under the hood. The computation can be divided into three steps:

//1. Count the total number of bytes of the file
long totalSize = compute(...)

//2. Calculate the target size for each partition
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

//3. Calculate the number of partitions
totalNum = totalSize / goalSize; // if the remainder is large, totalNum++
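
These three steps can be sketched in plain Scala. The 1.1 threshold below is my reading of Hadoop's SPLIT_SLOP constant and is included only to illustrate how the remainder is handled:

// Rough sketch of Hadoop's split computation for a single file
def splitCount(totalSize: Long, minPartitions: Int): Int = {
    val goalSize = math.max(totalSize / (if (minPartitions == 0) 1 else minPartitions), 1L)
    var remaining = totalSize
    var splits = 0
    // Cut full goalSize splits while what remains is clearly bigger than one split
    while (remaining.toDouble / goalSize > 1.1) {
        remaining -= goalSize
        splits += 1
    }
    if (remaining > 0) splits += 1 // the leftover becomes the last, possibly smaller, split
    splits
}

println(splitCount(24, 2)) // 2, see example 1 below
println(splitCount(7, 2))  // 3, see example 2 below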

Example 1: The number of partitions is not passed

val rdd: RDD[String] = sc.textFile("datas/1.txt")
// minPartitions = math.min(defaultParallelism, 2) = 2
// 1. totalSize = 24 (bytes)
// 2. goalSize  = totalSize / 2 = 12 (bytes)
// 3. final number of partitions: 24 / 12 = 2 (divides evenly)

Example 2: The number of partitions is passed in

val rdd1 = sc.textFile("datas/3.txt", 2)
// minPartitions = num = 2
// 1. totalSize = 7 (bytes)
// 2. goalSize  = totalSize / num = 7 / 2 = 3 (bytes)
// 3. final number of partitions: 7 / 3 = 2 remainder 1; the remainder (1 byte) is more than 10% of goalSize, so 2 + 1 = 3
// If the remainder is small enough (no more than 10% of goalSize), it is folded into the last partition to avoid creating tiny files;
// if the remainder is larger than that, it becomes a partition of its own.

Conclusion:

Whether it comes from the default or is passed in explicitly, minPartitions does not represent the final number of partitions. It is only the minimum number of partitions, and it provides the basis for computing the split size (goalSize) and, from that, the final number of partitions.
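
This is easy to verify (a sketch assuming the 7-byte 3.txt described in the next section):

val rdd = sc.textFile("datas/3.txt", 2) // minPartitions = 2
println(rdd.getNumPartitions)           // 3: the final number of partitions exceeds minPartitions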

2.4 Partition rules for reading file data *

Consider the following 3.txt file, in which @@ represents the line-break characters and occupies two bytes, so the file contains seven bytes in total:

a@@

b@@

c

If the minimum number of partitions is set to 2, 3 partitions will be generated:

val rdd = sc.textFile("datas/3.txt", 2)
// three partitions are created, with contents [a, b], [c], []
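
Which line lands in which partition can be checked with mapPartitionsWithIndex (a sketch assuming the same 3.txt as above):

val rdd = sc.textFile("datas/3.txt", 2)
rdd.mapPartitionsWithIndex((idx, iter) => iter.map(line => s"partition $idx -> $line"))
   .collect()
   .foreach(println)

/*
partition 0 -> a
partition 0 -> b
partition 1 -> c
*/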

Why aren't the three lines divided evenly among the three partitions? The following are the rules Spark uses when partitioning file data:

1. Data is read line by line

Spark reads files the way Hadoop does, so data is read line by line rather than by byte count. A line is never partially read: either it is not read at all, or the whole line is read.

2. What each partition reads is determined by byte offsets, and no offset is read more than once

Offsets (numbered by byte):
a@@  =>  0 1 2
b@@  =>  3 4 5
c    =>  6

3. Compute the offset range of each data partition

totalSize = 7 (bytes)
goalSize  = totalSize / num = 7 / 2 = 3 (bytes)
Final number of partitions: 7 / 3 = 2 remainder 1, so 2 + 1 = 3

The RDD therefore has 3 partitions, numbered 0, 1 and 2.
Partition i => logical offset range [left, right], where left = i * goalSize and right = left + curSize.
Partitions 0 and 1 have curSize = 3; partition 2 has curSize = 1.

Each partition then reads the lines touched by its logical offset range [left, right]. No line is read partially, and no offset is read twice: reading simply continues from the first offset that has not been read yet. The resulting contents are:

Partition 0 => [0, 3]   offsets 0-3 touch the first two lines          =>  a, b
Partition 1 => [3, 6]   offsets 3-5 are already read; only 6 remains   =>  c
Partition 2 => [6, 7]   offset 6 is already read                       =>  (empty)

Another example:

Content        Offsets
abcdefg@@  =>  0 1 2 3 4 5 6 7 8
hi@@       =>  9 10 11 12
j          =>  13

totalSize = 14 (bytes)
goalSize  = 14 / 2 = 7 (bytes)
Final number of partitions: 14 / 7 = 2

Partition 0 => [0, 7]    byte 8 is also read, because reading is line by line          =>  abcdefg
Partition 1 => [7, 14]   offsets 7-8 are already read; offsets 9-14 cover two lines    =>  hi, j

2.5 Summary

Memory RDD:
  • How to create: makeRDD
  • Parallelism: makeRDD(seq, numSlices); the number of partitions can be specified; default = defaultParallelism
  • Partition rule: (0 until numSlices) => [start, end)

File RDD:
  • How to create: textFile
  • Parallelism: textFile(path, minPartitions); the minimum number of partitions can be specified, but it is not necessarily the final number of partitions; default = math.min(defaultParallelism, 2)
  • Partition rule: compute each partition's logical offset range [left, right] and read the lines it touches

Basic rule: data is read line by line; a line is the basic unit of reading, and no offset is read twice.