(1) Overview

SparkSQL can be thought of as a layer of encapsulation on top of the native RDD. With SparkSQL, you can write SQL statements in Scala and Java and get the results back as a Dataset/DataFrame. In a nutshell, SparkSQL lets you process in-memory data as if you were writing SQL.

Dataset is a distributed collection of data and an interface added in Spark 1.6. It combines the advantages of RDDs with SparkSQL's optimized execution engine. A Dataset is essentially the combination of an RDD and a schema.

The underlying structure of a Dataset is an RDD; when the element type of the Dataset is Row, it is called a DataFrame. A DataFrame is a tabular data structure, much like a traditional MySQL table, and with a DataFrame we can execute SQL efficiently.
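As a minimal sketch of what that means in code (assuming a SparkSession named sparkSession and the json file introduced in the next section):

// A DataFrame is simply a Dataset whose element type is Row
Dataset<Row> dataFrame = sparkSession.read().json("data/json");

// The schema travels with the data, so each Row's fields can be read by name
Row first = dataFrame.first();
String name = first.getAs("name");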

(2) SparkSQL in practice

Using SparkSQL first requires adding the related dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

The version of this dependency needs to be consistent with spark-core.
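For reference, a pom.xml fragment in which both artifacts share the same Scala suffix and version might look like this (a sketch assuming the Spark 3.0.0 / Scala 2.12 build used above):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>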

SparkSQL is encoded in four main steps:

  1. Create a SparkSession
  2. Load the data
  3. Execute SQL
  4. Close the SparkSession
public class SqlTest {
    public static void main(String[] args) {
        // 1. Create the SparkSession
        SparkSession sparkSession = SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
        // 2. Load the data
        Dataset<Row> json = sparkSession.read().json("data/json");
        // 3. Query: print the inferred schema and show the contents
        json.printSchema();
        json.show();
        // 4. Close the SparkSession
        sparkSession.stop();
    }
}

Create a file named json in the data directory:

{"name":"a"."age":23}
{"name":"b"."age":24}
{"name":"c"."age":25}
{"name":"d"."age":26}
{"name":"e"."age":27}
{"name":"f"."age":28}

After running the project, two results are printed: printSchema() outputs the schema inferred from the JSON file, and show() outputs the contents of the Dataset as a table.
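For a file like the one above, the output should look roughly like the following (a sketch of typical Spark output; Spark infers the integer column as long and orders JSON fields alphabetically, so the details may vary):

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+---+----+
|age|name|
+---+----+
| 23|   a|
| 24|   b|
| 25|   c|
| 26|   d|
| 27|   e|
| 28|   f|
+---+----+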

SparkSQL allows you to perform query operations that closely mirror SQL:

public class SqlTest {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
        Dataset<Row> json = sparkSession.read().json("data/json");
        // select and where mirror the corresponding SQL clauses
        json.select("age", "name").where("age > 26").show();
        sparkSession.stop();
    }
}

In the code above, the SQL-style query is implemented through a series of API calls, but SparkSQL also supports writing raw SQL statements directly.

Before writing an SQL statement, Spark needs to know which table to query, so first register a temporary view and then execute the SQL query:

json.createOrReplaceTempView("json");
sparkSession.sql("select * from json where age > 26").show();
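Once the view is registered, any SQL that Spark's parser supports can be run against it. For example, a simple aggregation over the same temporary view might look like this (my own illustration, not part of the original example):

// Average age across the temporary view registered above
sparkSession.sql("select avg(age) as avg_age from json").show();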

(3) Create a Dataset from a non-JSON format

In the previous section, we used the simplest case, JSON, to create the Dataset. Because JSON carries its own schema, we do not need to specify one manually. For a plain TXT file, we have to supply the schema ourselves when creating the Dataset.

Here is an example of reading a TXT file. First create a user.txt:

a 23
b 24
c 25
d 26

Now I want to turn the rows above into a DataFrame, with the first column representing the name and the second column representing the age, so I can do something like this:

public class SqlTest2 {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
        SparkContext sparkContext = sparkSession.sparkContext();
        JavaSparkContext sc = new JavaSparkContext(sparkContext);
        JavaRDD<String> lines = sc.textFile("data/user.txt");
        // Convert each String line into a Row
        JavaRDD<Row> rowJavaRDD = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                String[] split = v1.split(" ");
                return RowFactory.create(
                        split[0],
                        Integer.valueOf(split[1]));
            }
        });
        // Define the schema
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);
        // Generate the DataFrame
        Dataset<Row> dataFrame = sparkSession.createDataFrame(rowJavaRDD, structType);
        dataFrame.show();
        sparkSession.stop();
    }
}
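Once the schema has been attached, this DataFrame behaves exactly like the JSON-based one. A small follow-up sketch, continuing the code above (the view name user is my own choice, not from the original):

// Register the manually built DataFrame and query it with raw SQL
dataFrame.createOrReplaceTempView("user");
sparkSession.sql("select name, age from user where age > 24").show();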

(4) Create a DataFrame using JDBC

Using JDBC, you can load tables from a database directly into Spark for processing. The following example uses MySQL. To use MySQL, you need to add the MySQL JDBC driver to your dependencies:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>

Then read the MySQL data through the JDBC data source:

public class SqlTest3 {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
        // JDBC connection options
        Map<String, String> options = new HashMap<>();
        options.put("url", "jdbc:mysql://127.0.0.1:3306/books");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "123456");
        options.put("dbtable", "book");
        Dataset<Row> jdbc = sparkSession.read().format("jdbc").options(options).load();
        jdbc.show();
        sparkSession.close();
    }
}

The data read back is a DataFrame, and all subsequent operations are performed on that DataFrame.
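For instance, the table loaded over JDBC can be registered as a temporary view and queried with raw SQL like any other DataFrame (a sketch continuing the code above):

// The JDBC-backed DataFrame supports the same operations as before
jdbc.createOrReplaceTempView("book");
sparkSession.sql("select * from book").show();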

(5) Summary

SparkSQL is an enhancement of Spark's native RDD. Although many of the same things can be implemented with the RDD API, SparkSQL lets you express them far more flexibly. I'm fish boy, and I'll see you next time.