The first article in this series gave an overview of Spark. This article delves into Spark Core, the core component of the Spark platform. Spark Core is the basic, general-purpose execution engine on which all other functionality is built. It not only provides in-memory computing for speed, but also offers a common execution model that supports a variety of applications; users can develop applications with the Java, Scala, and Python APIs. Spark Core is built on a unified abstraction, the RDD, which allows the Spark components to be integrated and used together in the same application to complete complex big data processing tasks. The main contents of this article are as follows:

  • What is an RDD
    • Design motivation
    • Basic concepts and main features of RDDs
    • Wide and narrow dependencies
    • Stage division and job scheduling
  • RDD operators
    • Transformations
    • Actions
  • Shared variables
    • Broadcast variables
    • Accumulators
  • Persistence
  • Complete cases

What is an RDD

Design motivation

Resilient Distributed Datasets (RDDs) were originally designed to solve the inefficiency of existing computing frameworks in two kinds of application scenarios: iterative algorithms and interactive data mining. In both scenarios, keeping data in memory can improve performance by several orders of magnitude. Iterative algorithms such as PageRank, K-means clustering, and logistic regression frequently need to reuse intermediate results. The other scenario is interactive data mining, such as running multiple ad hoc queries over the same data set. Most computing frameworks (such as Hadoop MapReduce) share intermediate results by writing them to an external storage system (such as HDFS), which adds extra overhead (data replication, disk I/O, and serialization) and therefore increases application execution time.

RDDs support data reuse efficiently in most applications. An RDD is a fault-tolerant, parallel data structure that lets users explicitly persist intermediate results in memory and optimize data placement through partitioning. In addition, RDDs provide a rich set of operators that users can easily apply to manipulate them.
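As a minimal sketch of this reuse idea, assuming an active SparkContext named sc (the input path, update rule, and iteration count are invented for illustration), an iterative computation can keep its working set in memory instead of re-reading it from external storage on every pass:

// Hypothetical input: one numeric value per line
val points = sc.textFile("E://points.txt").map(_.toDouble).cache()  // keep the parsed data in memory

var center = 0.0
for (_ <- 1 to 10) {
  // every iteration reuses the cached RDD instead of re-reading and re-parsing the file
  center = points.map(p => (p + center) / 2).mean()
}
println(center)

Without the cache() call, every iteration would go back to the text file and parse it again.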

Basic concepts

An RDD is a distributed collection of objects, essentially a read-only, partitioned collection of records. Each RDD can be divided into multiple partitions, which are stored on different cluster nodes (as shown in the following figure). An RDD is a highly constrained shared-memory model: because it is a read-only collection of partitioned records, it cannot be modified after creation. An RDD can be created in only two ways: either from data stored in a physical storage system, or by applying transformation operations (such as map, filter, and join) to another RDD.
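A minimal sketch of the two creation paths, assuming an active SparkContext named sc (the HDFS path is hypothetical):

// 1. Create an RDD from data in an external storage system
val lines = sc.textFile("hdfs:///data/access.log")

// 2. Create new RDDs by applying transformations to an existing RDD
val errors = lines.filter(_.contains("ERROR"))            // filter: still a read-only RDD
val pairs  = errors.map(line => (line.split(" ")(0), 1))  // map: derives yet another RDD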


An RDD does not need to be materialized: its lineage records how it was derived from other RDDs, which is enough to recompute its partitions when needed. In addition, users can control the persistence and partitioning of an RDD: an RDD that needs to be reused can be persisted (in memory or on disk) to improve computing efficiency, and the elements of an RDD can be distributed across machines based on a record key, for example so that two data sets to be joined are hash-partitioned in the same way.
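For example, a user can control partitioning explicitly so that two pair RDDs to be joined are hash-partitioned the same way; a small sketch with invented sample data, assuming a SparkContext named sc:

import org.apache.spark.HashPartitioner

val users = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
  .partitionBy(new HashPartitioner(4))  // place records on nodes by the hash of the key
  .persist()                            // keep the partitioned copy in memory for reuse

val orders = sc.parallelize(Seq((1, 99.0), (3, 15.5), (1, 42.0)))
  .partitionBy(new HashPartitioner(4))  // same partitioner as users

// Because both sides use the same partitioner, the join can match partitions
// directly without an extra shuffle of the persisted side
users.join(orders).collect().foreach(println)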

Main features

  • In-memory

    An RDD is a collection of objects in memory. An RDD can be stored in memory, on disk, or in memory plus disk, but Spark is fast because data can be kept in memory so that operators do not have to read it from disk every time.

  • Partitioned

    Partitioning divides a logical data set into independent parts. It is a technique for optimizing the performance of distributed systems by reducing network traffic; for example, placing elements with the same key in the same partition reduces the impact of shuffles. An RDD is divided into partitions that are distributed across different nodes in the cluster.

  • Strongly typed

    Data in an RDD is strongly typed: when an RDD is created, all of its elements have the same type, determined by the data type of the dataset.

  • Lazily evaluated

    Spark’s transformation operations are lazily evaluated: a chain of operators is executed only when an action (such as count or collect) is invoked. A small sketch of this behavior follows this feature list.

  • Immutable

    Once an RDD is created, it cannot be modified. You can only convert from one RDD to another.

  • Parallel

    An RDD can be processed in parallel. Because an RDD is partitioned and its partitions are distributed across different machines, each partition can be processed in parallel.

  • Persistable

    Because RDDs are lazily evaluated, only an action triggers the transformation chain that creates the corresponding RDDs. RDDs that are reused can be persisted (for example, in memory or on disk) to improve computing efficiency; Spark supports multiple persistence strategies.
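As mentioned under "Lazily evaluated" above, transformations only build up a plan and nothing is computed until an action runs; a minimal sketch assuming a SparkContext named sc (the println inside the operator is only for illustration and its output appears on the executors):

val nums    = sc.parallelize(1 to 5)
val doubled = nums.map { n => println(s"mapping $n"); n * 2 }  // nothing runs yet
println("transformation defined, nothing computed so far")

val total = doubled.reduce(_ + _)  // the action triggers the whole chain; "mapping ..." prints now
println(total)                     // 30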

Wide and narrow dependencies

Different operations on RDDs cause the partitions of different RDDs to form different dependencies. There are two main types: wide dependencies and narrow dependencies. In a wide dependency, one partition of a parent RDD corresponds to multiple partitions of the child RDD; in a narrow dependency, one partition of a parent RDD corresponds to one partition of the child RDD, or the partitions of multiple parent RDDs correspond to one partition of the child RDD. Wide and narrow dependencies are illustrated in the following figure:
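A quick way to observe the two kinds of dependencies, with invented sample data and assuming a SparkContext named sc: mapValues keeps a narrow dependency, while reduceByKey introduces a shuffle (wide) dependency.

import org.apache.spark.{NarrowDependency, ShuffleDependency}

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 2)
val mapped  = pairs.mapValues(_ + 1)     // narrow: each child partition reads exactly one parent partition
val reduced = mapped.reduceByKey(_ + _)  // wide: each child partition may read from all parent partitions

println(mapped.dependencies.head.isInstanceOf[NarrowDependency[_]])         // true
println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]) // true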

Stage division

Narrow dependencies are grouped into the same stage so that they can be executed iteratively as a pipeline. Wide dependencies typically depend on multiple partitions, so data needs to be transferred across nodes. The two kinds of dependencies also differ in fault recovery: for a narrow dependency, only the parent partitions of the lost partition need to be recomputed, whereas for a wide dependency all partitions of the parent RDD may need to be recomputed.

The DAGScheduler divides the RDDs of a job into different stages and builds the dependencies among stages, namely the DAG. The purpose of this division is to let stages without dependencies execute in parallel and stages with dependencies execute in order. There are two types of stages, ShuffleMapStage and ResultStage. A ShuffleMapStage is an upstream stage, whereas the ResultStage is the most downstream stage; upstream stages execute first and the ResultStage executes last.

  • ShuffleMapStage

ShuffleMapStage is an intermediate stage in the DAG scheduling process. It contains one or more ShuffleMapTasks that produce shuffle data. A ShuffleMapStage may be a predecessor of another ShuffleMapStage, and it is always, eventually, a predecessor of a ResultStage. Part of the source code is as follows:

private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

    // code omitted

}
  • ResultStage

The ResultStage is the last stage to execute, such as printing data to the console or writing data to an external storage device. Part of the source code is as follows:

private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

    // code omitted

}

As mentioned above, Spark generates a DAG by analyzing the dependencies of each RDD and decides how to divide stages based on the dependencies between partitions. The idea is to traverse the DAG in reverse: when a wide dependency is encountered, a new stage is cut; when a narrow dependency is encountered, the current RDD is added to the current stage. Narrow dependencies are grouped into the same stage to form a pipeline and improve computing efficiency. A DAG can therefore be divided into multiple stages, each representing a set of associated tasks with no shuffle dependencies among them. Each task set is submitted to the TaskScheduler for scheduling, and ultimately each task is dispatched to an Executor for execution.
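A convenient way to see where the stage boundaries will fall is RDD.toDebugString, which prints the lineage with the indentation changing at shuffle boundaries; a small sketch assuming a SparkContext named sc (the input path is hypothetical):

val wordCounts = sc.textFile("hdfs:///data/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)   // wide dependency: the DAGScheduler cuts a new stage here

println(wordCounts.toDebugString)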

Spark job scheduling process

Spark first applies a series of RDD transformations to a job and builds a DAG (Directed Acyclic Graph) from the dependencies between RDDs. The DAG is then divided into different stages according to those dependencies, each stage creates tasks according to its number of partitions, and finally the tasks are submitted to the worker nodes of the cluster for execution. The process is shown in the figure below:


  • 1. Build the DAG and submit it to the scheduling system.

  • 2. The DAGScheduler receives the DAG and divides it into multiple stages; the tasks of each stage are then submitted to the TaskScheduler as TaskSets for further processing.

  • 3. The cluster manager allocates resources and schedules tasks, with a retry mechanism for failed tasks. The TaskScheduler receives TaskSets from the DAGScheduler, creates a TaskSetManager to manage each TaskSet, and schedules the tasks through the SchedulerBackend.

  • 4. The tasks are executed, and their intermediate and final results are stored in the storage system.

RDD operators

Spark provides a variety of RDD operators, which fall into two categories: transformations and actions. The following sections describe some common operators.

Transformations

The following are some common transformation operations. It is worth noting that for regular RDDs the Scala, Java, Python, and R APIs are supported, while for pair RDDs only the Scala and Java APIs are supported. Some common operators are explained below:
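Before looking at the source code of each operator, here is a quick usage sketch of a few of them on invented sample data, assuming a SparkContext named sc:

val rdd = sc.parallelize(Seq("spark core", "spark sql", "flink"))

rdd.map(_.toUpperCase).collect()               // Array(SPARK CORE, SPARK SQL, FLINK)
rdd.filter(_.startsWith("spark")).count()      // 2
rdd.flatMap(_.split(" ")).distinct().collect() // Array(spark, core, sql, flink) -- order may vary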

  • map(func)
  /**
   * Pass each element to the func function and return a new RDD
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
  • filter(func)
  /**
   * Keep only the elements that satisfy the func predicate and return a new RDD
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
  • flatMap(func)
  /**
   * First apply func to every element of the RDD, then flatten the results so that one element maps to zero or more elements, returning a new RDD
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
  • mapPartitions(func)
  /**
   * Apply func to each partition of the RDD and return a new RDD
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }
  • union(otherDataset)
  /**
   * Return a new RDD containing the elements of both RDDs, similar to SQL UNION ALL
   */
  def union(other: RDD[T]): RDD[T] = withScope {
    sc.union(this, other)
  }
  • intersection(otherDataset)
  /**
   * Return a new RDD containing the intersection of the two RDDs
   */
  def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }
  • distinct([numPartitions])
  /**
   * Return a new RDD with the elements of the original RDD deduplicated
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }
  • groupByKey([numPartitions])
  /**
   * Group the pair RDD by key. Its performance overhead is relatively large; consider using
   * PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey instead
   */
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }
  • reduceByKey(func, [numPartitions])
  /**
   * Use the reduce function to aggregate the values of each key. This operator merges results locally
   * on each mapper before sending them to the reducer, similar to the combiner in MapReduce
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
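A small word-count style sketch (sample data invented, SparkContext sc assumed) of reduceByKey in use; the same result could be computed with groupByKey followed by a sum, but reduceByKey combines values on the map side first and therefore shuffles far less data:

val words  = sc.parallelize(Seq("spark", "flink", "spark", "hive", "spark"))
val counts = words.map((_, 1)).reduceByKey(_ + _)  // map-side combine, then shuffle
counts.collect().foreach(println)                  // (spark,3), (flink,1), (hive,1) -- order may vary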
  • aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
  /**
   * Aggregate the values of each key using the given aggregate functions and initial value
   */
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
  • sortByKey([ascending], [numPartitions])
  /**
   * Sort the RDD by key, so the elements of each partition are sorted
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }
  • join(otherDataset, [numPartitions])
  /**
   * Join the pair RDDs on equal keys and return (k, (v1, v2)) tuples
   */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }
  • cogroup(otherDataset, [numPartitions])
  /**
   * Group the values of each key across the RDDs and return (K, (Iterable[V], Iterable[W1], Iterable[W2])).
   * The first Iterable contains the values of the key in the current RDD, the second the values in the W1 RDD,
   * and the third the values in the W2 RDD
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
    cogroup(other1, other2, new HashPartitioner(numPartitions))
  }
  • coalesce(numPartitions)
  /**
   * Change the number of partitions of the RDD. The first parameter is the target number of partitions;
   * the second indicates whether to shuffle and defaults to false (when shuffle is enabled, a
   * HashPartitioner is used to redistribute the data)
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }
  • repartition(numPartitions)
  /**
   * Increase or decrease the number of partitions; it calls coalesce underneath. When reducing the number
   * of partitions, using coalesce directly is recommended because it avoids a shuffle
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
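A short sketch of the difference in practice, assuming a SparkContext named sc (partition counts invented for illustration): repartition always shuffles, while coalesce can shrink the partition count without a shuffle by merging partitions on the same node.

val data = sc.parallelize(1 to 100, 8)

val fewer = data.coalesce(2)      // 8 -> 2 partitions, no shuffle
val more  = data.repartition(16)  // 8 -> 16 partitions, equivalent to coalesce(16, shuffle = true)

println(fewer.getNumPartitions)   // 2
println(more.getNumPartitions)    // 16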

Actions

Some common action operators are shown in the following table:

Operation | Meaning
--- | ---
count() | Returns the number of elements in the dataset
collect() | Returns all the elements of the dataset as an array
first() | Returns the first element of the dataset
take(n) | Returns the first n elements of the dataset as an array
reduce(func) | Aggregates the elements of the dataset with the function func, which takes two arguments and returns one value
foreach(func) | Applies the function func to each element of the dataset
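A brief usage sketch of these actions on invented sample data, assuming a SparkContext named sc:

val nums = sc.parallelize(Seq(5, 1, 4, 2, 3))

nums.count()           // 5
nums.collect()         // Array(5, 1, 4, 2, 3)
nums.first()           // 5
nums.take(3)           // Array(5, 1, 4)
nums.reduce(_ + _)     // 15
nums.foreach(println)  // prints each element on the executors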

Shared variables

Spark provides two types of shared variables: broadcast variables and accumulators. A broadcast variable is a read-only variable that is cached on every node, so it does not have to be shipped to executors with each task. An accumulator aggregates values added by all tasks into a shared result.

Broadcast variables

Broadcast variables allow users to share an immutable value across the cluster; this shared, immutable value is cached on every node. They are typically used when a small data set (such as a dimension table) needs to be available on every node, for example in a log-analysis application: web logs usually contain only a pageId, while the title of each page is kept in a lookup table. To analyze the logs (for example, to find which pages are accessed most), the two need to be joined, so the lookup table can be broadcast to every node in the cluster. The details are shown in the figure below:


As shown in the figure above, the Driver first splits the serialized object into small data blocks and stores them in the BlockManager of the Driver node. When a task runs in an Executor, the Executor first tries to fetch the broadcast value from its own node’s BlockManager; if the value has already been fetched, it is used directly. Otherwise the value is fetched remotely from the Driver or from another Executor and, once retrieved, stored in the local BlockManager. This mechanism avoids the performance bottleneck of the Driver sending the data to every Executor individually.

The basic usage mode is as follows:

// Simulate a data set
scala> val mockCollection = "Spark Flink Hadoop Hive".split(" ")
mockCollection: Array[String] = Array(Spark, Flink, Hadoop, Hive)

// Construct an RDD
scala> val words = sc.parallelize(mockCollection, 2)
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:29

// Simulate the broadcast variable data
scala> val mapData = Map("Spark" -> 10, "Flink" -> 20, "Hadoop" -> 15, "Hive" -> 9)
mapData: scala.collection.immutable.Map[String,Int] = Map(Spark -> 10, Flink -> 20, Hadoop -> 15, Hive -> 9)

// Create a broadcast variable
scala> val broadCast = sc.broadcast(mapData)
broadCast: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Int]] = Broadcast(4)

// Use the broadcast variable inside an operator: look up the value for each word, then sort by value in ascending order
scala> words.map(word => (word, broadCast.value.getOrElse(word, 0))).sortBy(wordPair => wordPair._2).collect
res5: Array[(String, Int)] = Array((Hive,9), (Spark,10), (Hadoop,15), (Flink,20))

Accumulators

An accumulator is the other kind of shared variable provided by Spark. Unlike a broadcast variable, an accumulator can be modified, that is, it is mutable. Tasks add to the accumulator and their updates are sent back to the Driver, which merges them, so an accumulator implements a counting or summing function, similar to a counter. Spark natively supports numeric accumulators, and users can also define their own accumulator types.


Basic usage

You can create Long and Double accumulators with SparkContext.longAccumulator() and SparkContext.doubleAccumulator(), respectively. Tasks running in the cluster can add to an accumulator with the add method, but they cannot read its value; only the driver program can read the accumulator’s value by calling the value method.

object SparkAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(SparkShareVariable.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
    val listRDD = sc.parallelize(list)
    var counter = 0 // an ordinary driver-side variable
    // Create an accumulator with an initial value of 0
    val countAcc = sc.longAccumulator("my accumulator")
    val mapRDD = listRDD.map(num => {
      counter += 1 // modifying the driver-side variable inside the operator has no effect on the driver's copy
      if (num % 3 == 0) {
        // Increment the accumulator for every multiple of 3
        countAcc.add(1)
      }
      num * 2
    })
    mapRDD.foreach(println)
    println("counter = " + counter) // counter = 0
    println("countAcc = " + countAcc.value) // countAcc = 4
    sc.stop()
  }
}

Note:

Local or member variables declared in the driver can be used directly inside a transformation, but after the transformation runs, the result is not written back to the corresponding variable in the driver. When an action triggers the transformation, the DAGScheduler packages and serializes the closure and sends it to the executors on the worker nodes. The variables manipulated inside the transformation live on those nodes; they are merely copies of the driver-side variables, not the originals on the driver.

Custom accumulator

Spark provides accumulators of several built-in types and also supports custom accumulators. To define a custom accumulator, extend the abstract class AccumulatorV2 and implement its methods, as in the following example.

class customAccumulator extends AccumulatorV2[BigInt, BigInt] {

  private var num: BigInt = 0

  /**
   * Returns whether this accumulator holds the zero value; for a counter, 0 means zero,
   * while for a list accumulator Nil would mean zero
   */
  def isZero: Boolean = {
    this.num == 0
  }

  // Create a copy of this accumulator
  def copy(): AccumulatorV2[BigInt, BigInt] = {
    new customAccumulator
  }

  /**
   * Reset the accumulator to zero. After calling this, `isZero` must return true
   */
  def reset(): Unit = {
    this.num = 0
  }

  // Accumulate the input value:
  // only even values are added to the accumulator
  def add(intVal: BigInt): Unit = {
    if (intVal % 2 == 0) {
      this.num += intVal
    }
  }

  /**
   * Merge another accumulator of the same type into this one and update its value
   */
  def merge(other: AccumulatorV2[BigInt, BigInt]): Unit = {
    this.num += other.value
  }

  /**
   * The current value of this accumulator
   */
  def value: BigInt = {
    this.num
  }
}

Use the custom accumulator

    val acc = new customAccumulator
    sc.register(acc, "evenAcc")   // register the accumulator with the SparkContext
    println(acc.value)
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc.add(x))
    println(acc.value)

Persistence

Persistence methods

In Spark, RDDs are lazily evaluated, and every invocation of an action triggers the computation again from the beginning. For RDDs that need to be reused, Spark supports persistence: calling persist() or cache() marks the RDD for persistence, which avoids the overhead of repeated computation. Note that when a persistence method is called the RDD is only marked; the result is actually persisted the first time an action is executed. The persisted RDD is then kept in the memory of the compute nodes for reuse by subsequent operations.

The main difference between the two persistence methods provided by Spark is that cache() uses the memory-only storage level and simply calls persist() underneath. The source code is as follows:

  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

  /**
   * Persist the RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist the RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

  /**
   * Manually remove the persisted RDD from the cache
   */
  def unpersist(blocking: Boolean = true): this.type = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
    this
  }

Persistence storage levels

Spark provides multiple persistence levels, such as memory, disk, and memory + Disk. Details are shown in the following table:

Storage Level | Meaning
--- | ---
MEMORY_ONLY | The default level. The RDD is stored in the JVM as deserialized Java objects. If memory runs out, some partitions are not persisted and are recomputed when they are needed.
MEMORY_AND_DISK | The RDD is stored in the JVM as deserialized Java objects; if memory runs out, the partitions that do not fit are stored on disk.
MEMORY_ONLY_SER (Java and Scala) | The RDD is persisted as serialized Java objects, one byte array per partition. This saves more space than the deserialized form but consumes more CPU.
MEMORY_AND_DISK_SER (Java and Scala) | Like MEMORY_ONLY_SER, but partitions that do not fit in memory are spilled to disk.
DISK_ONLY | Stores the RDD partition data on disk only.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the corresponding levels above, but each partition is replicated on two cluster nodes.
OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but stores the data in off-heap memory; off-heap memory must be enabled.
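A minimal sketch of choosing a non-default level, assuming a SparkContext named sc (the input path is hypothetical): pass a StorageLevel to persist(), whereas cache() always means MEMORY_ONLY.

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("E://access.log")       // hypothetical input
  .filter(_.contains("ERROR"))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk if needed

logs.count()      // the first action materializes and persists the RDD
logs.take(10)     // subsequent actions reuse the persisted data
logs.unpersist()  // release the storage when no longer needed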

Selection of persistence level

Spark’s persistence levels are designed to trade off memory usage against CPU efficiency. The following choices are generally recommended:

  • If the RDD fits in memory, use the default persistence level, MEMORY_ONLY. This is the most CPU-efficient option and lets operators on the RDD run as fast as possible.

  • If memory is tight, try MEMORY_ONLY_SER together with a fast serialization library such as Kryo, which saves a lot of space (a configuration sketch follows the tip below).

    Tips: for some shuffle operators, such as reduceByKey, Spark automatically persists intermediate results even when persist is not called explicitly, in order to avoid recomputing the entire input if a node fails during the shuffle. Even so, explicitly persisting RDDs that need to be reused is still recommended.
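If the serialized levels are used, Kryo can be enabled through the Spark configuration; a minimal sketch (SomeRecord is a hypothetical user class standing in for application data types):

import org.apache.spark.{SparkConf, SparkContext}

case class SomeRecord(id: Int, name: String)   // a stand-in for an application class

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[SomeRecord]))  // register application classes with Kryo

val sc = new SparkContext(conf)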

Complete cases

  • Case 1

/**
 * 1. Data set
 *    [orderId,userId,payment,productId]
 *    2 1108280100
 *    4 2202300200
 *    3210588324 1
 *    4198500, 0356
 *    3 5200590297
 *    6678800 0183 78
 *    7243200281 9
 *    8236789 0281 9
 *
 * 2. Requirement
 *    Compute the Top 3 payment amounts
 *
 * 3. Expected output
 *    1  8000
 *    2  7890
 *    3  5000
 */


object TopOrder {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("E://order.txt")
    var num = 0
    lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
      .map(_.split(",")(2))      // extract the payment amount
      .map(x => (x.toInt, ""))   // build (payment, "") pairs so sortByKey can sort by payment
      .sortByKey(false)          // sort by payment in descending order
      .map(x => x._1).take(3)    // take the first three
      .foreach(x => {
        num = num + 1
        println(num + "\t" + x)
      })
  }
}
  • Case 2

/**
 * 1. Data set (MovieLens data set)
 *    user movie rating data [UserID::MovieID::Rating::Timestamp]
 *    movie name data [MovieID::MovieName::MovieType]
 * 2. Requirement
 *    Find the titles of movies whose average rating is greater than 5
 */


object MovieRating {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MovieRating").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // user movie rating data [UserID::MovieID::Rating::Timestamp]
    val userRating = sc.textFile("E://ml-1m/ratings.dat")
    // movie name data [MovieID::MovieName::MovieType]
    val movies = sc.textFile("E://ml-1m/movies.dat")
    // extract MovieID and Rating, (MovieID, Rating)
    val movieRating = userRating.map { line =>
      val rating = line.split("::")
      (rating(1).toInt, rating(2).toDouble)
    }
    // compute each MovieID and its average rating, (MovieID, AvgRating)
    val movieAvgRating = movieRating
      .groupByKey()
      .map { rating =>
        val avgRating = rating._2.sum / rating._2.size
        (rating._1, avgRating)
      }
    // extract MovieID and MovieName, keyed by MovieID: (MovieID, (MovieID, MovieName))
    val movieName = movies.map { movie =>
      val fields = movie.split("::")
      (fields(0).toInt, fields(1))
    }.keyBy(_._1)

    movieAvgRating
      .keyBy(_._1)
      .join(movieName) // join result: (MovieID, ((MovieID, AvgRating), (MovieID, MovieName)))
      .filter(joinData => joinData._2._1._2 > 5.0)
      .map(rs => (rs._1, rs._2._1._2, rs._2._2._2))
      .saveAsTextFile("E:/MovieRating/")
  }
}

Conclusion

This article explained Spark Core in detail, covering the basic concepts of RDDs, RDD operators, shared variables, and persistence, and finally provided two complete Spark Core programming examples. The next article will cover the Spark SQL programming guide.