Spark's combineByKey

The characteristics of combineByKey

The power of combineByKey lies in the three functions it takes. The first function turns the first value seen for a key into an initial combined value. The second function merges one more value into that combined value, one key-value pair at a time, so the aggregation is still grouped by key. The third function merges combined values that share the same key (typically coming from different partitions); it looks a bit like reduceByKey, but the actual implementation is quite different.
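To keep the three roles straight, here is a minimal sketch of the arguments combineByKey expects (createCombiner, mergeValue and mergeCombiners are Spark's parameter names for them; V stands for the value type, C for the combined type, and pairRdd is just a placeholder for any key-value RDD):

// createCombiner: V => C        turns the first value seen for a key into a combined value
// mergeValue:     (C, V) => C   folds another value for the same key into that combined value (within a partition)
// mergeCombiners: (C, C) => C   merges combined values for the same key coming from different partitions
pairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners)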

In Spark Starter (5) — Reduce and reduceByKey of Spark, we used reduce to calculate an average. With combineByKey we can do much richer things than a single average. Suppose we have a data set in which each row contains a letter and an integer separated by a space, and we want the average value for each letter. The scenario is a bit like having multiple students, each with several grades, and computing each student's average score; here we simplify the problem and store the data in a file called grades. The data set, as well as the code below, can be downloaded from Github.
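For reference, each line of grades looks roughly like this (the letters and numbers below are made up purely for illustration; the real file is in the Github repository):

a 234567
b 98765
a 456123
c 321045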

Calculating multiple averages with combineByKey

Scala implementation


import org.apache.spark.{SparkConf, SparkContext}

object SparkCombineByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")

    val sc = new SparkContext(conf)

    sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(
      value => (value,1),                                // createCombiner: the first value for a key becomes (sum, count)
      (x:(Int,Int),y)=>(x._1+y,x._2+1),                  // mergeValue: add a value to the running sum and bump the count
      (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)   // mergeCombiners: add up (sum, count) pairs from different partitions
    ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)    // sum / count = average (integer division here)

  }

}

Scala results

(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)


Java implementation:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class SparkCombineByKeyJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        combineByKeyJava(sc);

        combineByKeyJava8(sc);


    }


    public static void combineByKeyJava(JavaSparkContext sc){

        JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] splits = s.split(" ");
                return new Tuple2<>(splits[0], Integer.parseInt(splits[1]));
            }
        });

        splitData.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
            // createCombiner: the first value for a key becomes (sum, count)
            @Override
            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                return new Tuple2<>(integer, 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            // mergeValue: add a value to the running sum and bump the count
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Integer integer) throws Exception {
                return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            // mergeCombiners: add up (sum, count) pairs from different partitions
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> integerIntegerTuple22) throws Exception {
                return new Tuple2<>(integerIntegerTuple2._1 + integerIntegerTuple22._1, integerIntegerTuple2._2 + integerIntegerTuple22._2);
            }
        }).map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Tuple2<String, Double>>() {
            // sum * 1.0 / count gives the average as a double
            @Override
            public Tuple2<String, Double> call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
                return new Tuple2<>(stringTuple2Tuple2._1, stringTuple2Tuple2._2._1 * 1.0 / stringTuple2Tuple2._2._2);
            }
        }).foreach(new VoidFunction<Tuple2<String, Double>>() {
            @Override
            public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
                System.out.println(stringDoubleTuple2._1 + " " + stringDoubleTuple2._2);
            }
        });
    }

    public static void combineByKeyJava8(JavaSparkContext sc){

        JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(line -> {
            String[] splits = line.split(" ");
            return new Tuple2<>(splits[0], Integer.parseInt(splits[1]));
        });

        splitData.combineByKey(
            x -> new Tuple2<>(x, 1),
            (x, y) -> new Tuple2<>(x._1 + y, x._2 + 1),
            (x, y) -> new Tuple2<>(x._1 + y._1, x._2 + y._2)
        ).map(x -> new Tuple2<>(x._1, x._2._1 * 1.0 / x._2._2))
         .foreach(x -> System.out.println(x._1 + " " + x._2));
    }
}

Java Runtime results

d 338451.6
e 335306.7480769231
a 336184.95321637427
i 346279.497029703
b 333069.8589473684
h 334343.75
f 341380.9444444444444
j 320145.7618069815
g 334042.37605042016
c 325022.4183673469

Analysis

Before we move on to Python, let us compare the Java and Scala code. The Java 7 style is very verbose, while the Java 8 version is nearly as clean as the Scala one. It is hard to say which is better in absolute terms, but it does show that contemporary languages are moving towards more concise code. This is especially true of Python.

But the language is not the main point here; what matters is how the average is implemented. Because the Java version multiplies by 1.0 before dividing, it keeps the decimal part, while the Scala version uses integer division and drops it; apart from that the results are the same. The real point is that combineByKey looks complicated: it takes three functions, and it is hard to see what each step actually does. So let us instrument the Scala program and watch what combineByKey is doing.
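Before we do that, one small aside: if we want the Scala version to keep the decimal part as well, a minimal tweak of the pipeline above (everything else unchanged) is to divide as a Double in the final map:

sc.textFile("./grades").map(line=>{
  val splits = line.split(" ")
  (splits(0),splits(1).toInt)
}).combineByKey(
  value => (value,1),
  (x:(Int,Int),y)=>(x._1+y,x._2+1),
  (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1.toDouble/x._2._2)).foreach(println)   // toDouble keeps the decimals, like the Java version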

Modified Scala code


import org.apache.spark.{SparkConf, SparkContext}

object SparkCombineByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")

    val sc = new SparkContext(conf)

    sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(
      value => {
        println("This is the first function.")
        println("Take each value and put it in a tuple, marked with a count of 1.")
        println(value)
        (value,1)
      },
      (x:(Int,Int),y)=>{
        println("This is the second function.")
        println("Add y to the first element of x (the running sum) and add 1 to the second element (the count).")
        println("x:"+x.toString())
        println("y:"+y)
        (x._1+y,x._2+1)
      },
      (x:(Int,Int),y:(Int,Int))=>{
        (x._1+y._1,x._2+y._2)
      }
    ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)

  }

}


results

This is the first function.
Take each value and put it in a tuple, marked with a count of 1.
222783
This is the first function.
Take each value and put it in a tuple, marked with a count of 1.
48364
This is the first function.
Take each value and put it in a tuple, marked with a count of 1.
204950
This is the first function.
Take each value and put it in a tuple, marked with a count of 1.
261777
...
This is the second function.
Add y to the first element of x (the running sum) and add 1 to the second element (the count).
x:(554875,2)
y:357748
This is the second function.
Add y to the first element of x (the running sum) and add 1 to the second element (the count).
x:(912623,3)
y:202407
This is the first function.
Take each value and put it in a tuple, marked with a count of 1.
48608
This is the second function.
Add y to the first element of x (the running sum) and add 1 to the second element (the count).
x:(1115030,4)
y:69003
This is the first function.
Take each value and put it in a tuple, marked with a count of 1.
476893
...
(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)

Here we can see that the functions are not executed strictly in order (all of the first function, then all of the second). Instead, the work is done per partition: as a partition is processed, the first function creates the combined value the first time a key is seen, and each subsequent value for that key goes straight through the second function; the third function is applied at the end to merge the partial results. On a single local machine this parallelism is not fully exploited, but in a cluster each partition is computed on a different node and the results are then merged. The larger the data and the more nodes in the cluster, the more obvious the advantage.
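To watch this per-partition behaviour yourself, one option is to ask Spark for more partitions when reading the file and check how many it actually created; a minimal sketch (the partition count 4 is an arbitrary choice for illustration):

// Request at least 4 partitions; each partition runs the first and second
// functions on its own data, and the third function merges the partial results.
val data = sc.textFile("./grades", 4)
println(data.getNumPartitions)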

There is another interesting behaviour when we remove the final call:

foreach(println)

If we run the program now, we find that it produces no output at all. This is due to Spark's lazy evaluation: Spark only performs the computation when an action, such as printing or saving, is applied to the data. This may seem unreasonable, but in many scenarios it greatly improves efficiency; if handled carelessly, however, Spark will recompute the whole pipeline from scratch every time an action is executed. So when a result is needed frequently or several times, we should save it.
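As a minimal sketch of saving such a result (assuming keeping it in memory with cache() is acceptable; persist() with another storage level works the same way), note that without the cache each of the two actions below would re-read and re-aggregate the file:

val averages = sc.textFile("./grades").map(line=>{
  val splits = line.split(" ")
  (splits(0),splits(1).toInt)
}).combineByKey(
  value => (value,1),
  (x:(Int,Int),y)=>(x._1+y,x._2+1),
  (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2)).cache()   // keep the computed averages in memory

averages.foreach(println)    // first action: triggers the computation and fills the cache
println(averages.count())    // second action: reuses the cached result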

Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")

sc = SparkContext(conf=conf)

sc.textFile("./grades")\
    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
    .combineByKey(
    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)

results

('b', 333069.8589473684)
('f', 341380.94444444444)
('j', 320145.7618069815)
('h', 334343.75)
('a', 336184.95321637427)
('g', 334042.37605042016)
('d', 338451.6)
('e', 335306.7480769231)
('c', 325022.4183673469)

Spark's sortByKey

Sorting with sortByKey

sortByKey is very simple and very commonly used. We keep using the data set above and sort the computed averages to find the letter with the highest average. In practice you can think of this as sorting by grade, or sorting by name.

Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object SparkSortByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")

    val sc = new SparkContext(conf)


    val result = sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(
      value => (value,1),
      (x:(Int,Int),y)=>(x._1+y,x._2+1),
      (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
    ).map(x=>(x._1,x._2._1/x._2._2))

    // Sort by name (the key), ascending
    result.sortByKey(true).foreach(println)
    // Sort by name (the key), descending
    result.sortByKey(false).foreach(println)


    val result1 = sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(
      value => (value,1),
      (x:(Int,Int),y)=>(x._1+y,x._2+1),
      (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
    ).map(x=>(x._2._1/x._2._2,x._1))

    // Sort by grade (the average, now the key), ascending
    result1.sortByKey(true).foreach(println)
    // Sort by grade, descending
    result1.sortByKey(false).foreach(println)


  }

}
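In the code above, result1 swaps each pair around only because sortByKey always sorts by the key. As a sketch of an alternative (using the same result RDD of (letter, average) pairs), RDD.sortBy lets us sort by the value directly without swapping:

// Sort the (letter, average) pairs by the average itself, highest first,
// without building a swapped (average, letter) RDD.
result.sortBy(_._2, ascending = false).foreach(println)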

Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")

sc = SparkContext(conf=conf)

result = sc.textFile("./grades")\
    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
    .combineByKey(
    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1]))

result.sortByKey(True).foreach(print)

result.sortByKey(False).foreach(print)


result1 = sc.textFile("./grades")\
    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
    .combineByKey(
    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[1][0]/x[1][1],x[0]))

result1.sortByKey(True).foreach(print)

result1.sortByKey(False).foreach(print)

results

(a,336184)
(b,333069)
(c,325022)
(d,338451)
(e,335306)
(f,341380)
(g,334042)
(h,334343)
(i,346279)
(j,320145)

(j,320145)
(i,346279)
(h,334343)
(g,334042)
(f,341380)
(e,335306)
(d,338451)
(c,325022)
(b,333069)
(a,336184)

(320145,j)
(325022,c)
(333069,b)
(334042,g)
(334343,h)
(335306,e)
(336184,a)
(338451,d)
(341380,f)
(346279,i)

(346279,i)
(341380,f)
(338451,d)
(336184,a)
(335306,e)
(334343,h)
(334042,g)
(333069,b)
(325022,c)
(320145,j)

The data sets and code are available for download on Github.