Spark SQL is the Spark module used for structured data processing. Unlike the basic Spark RDD API, Spark SQL provides an interface that gives Spark more information about the data structure and the calculations being performed. Internally, Spark SQL uses this additional information to perform additional optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API. Use the same execution engine when calculating the result, regardless of the API/language you use to express the calculation. This uniformity means that developers can easily switch back and forth between different apis, providing the most natural way to express a given transformation.

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When you run SQL from another programming language, the result is returned as a dataset/data box. You can also use the command line or JDBC/ODBC to interact with the SQL interface.

Data sets and data frameworks

A data set is a distributed data set. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDD (strong typing, the ability to use powerful lambda functions) and Spark SQL optimization execution engine. Datasets can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, and so on).

Introduction to 1.

The entry point for all functions in Spark is the SparkSession class. To create a basic SparkSession, just use sparkSession.Builder () :

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option"."some-value")
  .getOrCreate();
Copy the code
1.1 create DataFrames

With SparkSession, applications can create dataframes from existing RDD, Hive tables, or Spark data sources.

Example of creating a DataFrame based on the contents of a JSON file:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Copy the code
1.2 Data set operation

A basic example of structured data processing for a dataset:

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema inA tree format(print metadata) df.printschema (); // root // |-- age: long (nullable =true)
// |-- name: string (nullable = true)

// Select only the "name"Df.select (df.select)"name").show(); // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, But increment the age by 1 df_. select(col(), increment the age by 1"name"), col("age").plus(1)).show(); // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21"age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show(); (branch query: the column name age quantity statistics) / / + + + / / -- -- -- -- -- - | the age | count | / / + - + - + / / 19 | | 1 | / / | null | 1 | / / | | 1 | 30 / / + - + - +Copy the code
1.3 Query SQL programmatically
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Copy the code
1.4 Global Temporary View

The temporary view in Spark SQL is session-scoped and will disappear if the session that created it terminates. If you want to have a temporary view that is shared between all sessions and remains active until the Spark application terminates, you can create a global temporary view. The global temporary view is bound to the system-reserved database global_temp, and we must refer to it with qualified names, such as SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view object df.createGlobalTempView("people"); // Global temporary view is tied to a system preserved database 'global_temp'"SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Copy the code
1.5 Creating data Sets

Datasets are similar to RDD, but instead of using Java serialization or Kryo, they use specialized encoders to serialize objects for processing or transmission over a network. While encoders and standard serialization are both responsible for converting objects into bytes, encoders are dynamically generated code and use a format that allows Spark to perform many operations, such as filtering, sorting, and hashing, without deserializing bytes into objects.

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created forEncoder<Person> personEncoder = encoders.bean (person.class); Dataset<Person> javaBeanDS = spark.createDataset( Collections.singletonList(person), personEncoder ); javaBeanDS.show(); // +---+----+ // |age|name| // +---+----+ // | 32|Andy| // +---+----+ // Encodersfor most common types are provided inClass Encoders () Encoder<Integer>integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder); transformedDS.collect(); // Returns [2, 3, 4] // DataFrames can be converted to a Dataset by providing a class. Mapping based on Name (DataFrames can convert a class to a dataset based on the name of the mapping) String path ="examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Copy the code
1.6 Interaction with the RDD

Spark SQL supports two different ways to transform an existing RDD into a dataset.

The first approach uses reflection to infer the pattern of an RDD containing an object of a particular type. This reflection-based approach provides cleaner code.

The second approach is through a programming interface that allows you to build patterns and then apply them to an existing RDD.

1.6.1 Using Reflection Mode

Spark SQL supports automatic conversion of JavaBeans RDD to DataFrame. The BeanInfo obtained using reflection defines the schema of the table. Currently, Spark SQL does not support Javabeans that contain Map fields. However, nested JavaBeans and List or Array fields are supported. You can create Javabeans and set getters and setters for all their fields by creating a class that implements Serializable.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
Copy the code
1.6.2 Programming mode mode

If a JavaBean class cannot be defined in advance (for example, the structure of the record is encoded as a string, or the text Dataset will be parsed and the fields will be projected differently for different users), there are three steps to programmatically create a Dataset.

  1. Create row RDD from original RDD;
  2. Create a pattern represented by a StructType that matches the row structure in the RDD created in Step 1.
  3. The schema is applied to the RDD of the row through the createDataFrame method provided by SparkSession.
import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; // Create an RDD JavaRDD<String> peopleRDD = spark.sparkContext().textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split("")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
Copy the code
1.7 the aggregation

The built-in DataFrames functions provide common aggregations such as count (), countDistinct (), AVg (), Max (), min (), and so on. Although these functions are designed for DataFrames, there are type-safe versions of Spark SQL, some of which use strongly typed datasets in Scala and Java. In addition, users are not limited to predefined aggregation functions and can create their own.

1.7.1 No user-defined aggregation function

Users must extend UserDefinedAggregateFunction abstract classes in order to realize the custom type aggregation function.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if(! input.isNullAt(0)) { long updatedSum = buffer.getLong(0) + input.getLong(0); long updatedCount = buffer.getLong(1) + 1; buffer.update(0, updatedSum); buffer.update(1, updatedCount); } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` public void merge(MutableAggregationBuffer buffer1, Row buffer2) { long mergedSum = buffer1.getLong(0) + buffer2.getLong(0); long mergedCount = buffer1.getLong(1) + buffer2.getLong(1); buffer1.update(0, mergedSum); buffer1.update(1, mergedCount); } // Calculates the final result public Double evaluate(Row buffer) {return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees"); result.show(); / / + -- -- -- -- -- -- -- -- -- -- -- -- -- -- + / / | average_salary | / / + -- -- -- -- -- -- -- -- -- -- -- -- -- -- + / / | | / 3750.0 / + -- -- -- -- -- -- -- -- -- -- -- -- -- -- +Copy the code
1.7.2 User-defined aggregation functions

User-defined aggregation of strongly typed datasets revolves around the Aggregator abstract class.

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...

}

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary"); Dataset<Double> result = ds.select(averageSalary); result.show(); / / + -- -- -- -- -- -- -- -- -- -- -- -- -- -- + / / | average_salary | / / + -- -- -- -- -- -- -- -- -- -- -- -- -- -- + / / | | / 3750.0 / + -- -- -- -- -- -- -- -- -- -- -- -- -- -- +Copy the code

Articles are from the official documentation of the first quarter, data sources,] performance maintenance] (spark.apache.org/docs/2.3.1/…). Distributed SQL engine please refer to the document.