The partition determines which partition the data in the RDD will end up in during shuffle

Note:

  • Only the KEY-value RDD has a partitioner. The Value of a non-key-value RDD partitioner is None
  • The partition ID of each RDD ranges from 0 to numPartitions-1, partition0 to partition(numPartitions-1).

Get the RDD divider

You can get the partitioning method of the RDD by using the PARTItioner property of the RDD. It returns a Scala.option object

    val pairs = sc.parallelize(List((1.1), (2.2), (3.3)))
    pairs.partitioner
    //Option[org.apache.spark.Partitioner] = None
    
    val partitioned = pairs.partitionBy(new HashPartitioner(2))
    partitioned.partitioner
    //Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
Copy the code

Hash partitioning

If the remainder is less than 0, the remainder + the number of partitions (plus 0 otherwise). The value returned is the ID of the partition to which the key belongs.

val nopar = sc.parallelize(List((1.3), (1.2), (2.4), (2.3), (3.6), (3.8)),8)
nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+":"+iter.mkString("|")) }).collect    
Array[String] = Array("0.".1 : (1.3), 2 : (1.2), 3 : (2.4), "4.".5 : (2.3), 6 : (3.6), 7 : (3.8)) 

val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
//Array[Int] = Array(0, 3, 1, 2, 0, 0, 0)

hashpar.partitioner
//Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)

Copy the code

Range partitioning

Disadvantages of HashPartitioner partitioning: It can result in an uneven amount of data in each partition, or in extreme cases, in some partitions having all the data in the RDD.

RangePartitioner: Maps numbers within a certain range to a certain partition to ensure that the amount of data in each partition is uniform and there is order between partitions. Elements in one partition must be smaller or larger than those in the other partition, but the order of elements in the partition cannot be guaranteed. In simple terms, the number in a certain range is mapped to a partition. The realization process is as follows:

Step 1: Extract sample data from the whole RDD, sort the sample data, calculate the maximum key value of each partition, and form an Array variable rangeBounds of Array[key] type.

Step 2: Determine the range of the key in rangeBounds and give the partition ID subscript of the key in the next RDD. The partition requires that the KEY type in the RDD be sortable

Custom partitions

To implement custom partition, you need to inherit org. Apache. Spark. The Partitioner class and implement the following three methods. NumPartitions: Int: returns the number of partitions created.

GetPartition (key: Any): Int: returns the partition number of the given key (0 to numPartitions-1).

(3) equals():Java’s standard method for determining equality. The implementation of this method is very important. Spark needs to use this method to check whether your partition object is the same as other partition instances so that Spark can determine whether two RDD partitions are the same.

Requirement: Write data with the same suffix to the same file by partitioning the data with the same suffix into the same partition and saving the output.

Create a pairRDD

val data = sc.parallelize(Array((1.1), (2.2), (3.3), (4.4), (5.5), (6.6)))
Copy the code

(2) Define a custom partition class

class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{

  // Number of overwrites
  override def numPartitions: Int = numParts

  // override partition number getfunction
  override def getPartition(key: Any) :Int = {
    val ckey: String = key.toString
    ckey.substring(ckey.length- 1).toInt%numParts
  }
}

Copy the code

(3) Repartition the RDD using a customized partition class

val par = data.partitionBy(new CustomerPartitioner(2))
Copy the code

(4) View the data distribution after repartitioning

par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
/ / Array [(Int, Int, Int))] = Array ((0, (2, 2)), (0, (4, 4)), (0, 6, 6)), (1, (1, 1)), (1) (3, 3), (1, (5, 5)))
Copy the code