1. Create DataFrame and Dataset

1.1 Create a DataFrame

The entry point for all functionality in Spark is SparkSession, which is created with SparkSession.builder(). Once it exists, the application can create DataFrames from an existing RDD, a Hive table, or a Spark data source. The following is an example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark-SQL").master("local[2]").getOrCreate()
val df = spark.read.json("/usr/file/json/emp.json")

// Import the implicit conversions before doing any Spark SQL programming,
// because many DataFrame and Dataset operations depend on them
import spark.implicits._

You can use spark-shell for testing. When spark-shell starts, it automatically creates a SparkSession named spark, which you can reference directly on the command line.

1.2 Create a Dataset

Spark supports creating a Dataset either from an external data set (such as a file) or from an internal data set (an in-memory collection), as follows:

1. Create from an external data set

// 1. Implicit conversions need to be imported
import spark.implicits._

// 2. Create case class, equivalent to Java Bean
case class Emp(ename: String, comm: Double, deptno: Long, empno: Long, hiredate: String, job: String, mgr: Long, sal: Double)

// 3. Create the Dataset from an external data set
val ds = spark.read.json("/usr/file/emp.json").as[Emp]

2. Create from an internal data set

// 1. Implicit conversions need to be imported
import spark.implicits._

// 2. Create case class, equivalent to Java Bean
case class Emp(ename: String, comm: Double, deptno: Long, empno: Long, hiredate: String, job: String, mgr: Long, sal: Double)

// 3. Create the Dataset from an internal data set
val caseClassDS = Seq(Emp("ALLEN", 300.0, 30, 7499, "1981-02-20 00:00:00", "SALESMAN", 7698, 1600.0),
                      Emp("JONES", 300.0, 30, 7499, "1981-02-20 00:00:00", "SALESMAN", 7698, 1600.0))
                    .toDS()

1.3 Create a DataFrame from an RDD

Spark supports two ways to convert an RDD to a DataFrame: inferring the schema by reflection, and specifying the schema programmatically:

1. Use reflection-based inference

// 1. Import implicit conversions
import spark.implicits._

// 2. Define the department case class
case class Dept(deptno: Long, dname: String, loc: String)

// 3. Create the RDD and convert it to a Dataset
val rddToDS = spark.sparkContext
  .textFile("/usr/file/dept.txt")
  .map(_.split("\t"))
  .map(line => Dept(line(0).trim.toLong, line(1), line(2)))
  .toDS()  // call toDF() instead to get a DataFrame

2. Specify the schema programmatically

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// 1. Define the column type for each column
val fields = Array(StructField("deptno", LongType, nullable = true),
                   StructField("dname", StringType, nullable = true),
                   StructField("loc", StringType, nullable = true))

// 2. Create schema
val schema = StructType(fields)

// 3. Create an RDD of Row objects
val deptRDD = spark.sparkContext.textFile("/usr/file/dept.txt")
val rowRDD = deptRDD.map(_.split("\t")).map(line => Row(line(0).toLong, line(1), line(2)))

// 4. Convert RDD to dataFrame
val deptDF = spark.createDataFrame(rowRDD, schema)
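
The two conversion routes above can be compared side by side without a file on disk. The following is a minimal sketch, written as a script that can be pasted into spark-shell. It assumes in-memory sample lines in place of /usr/file/dept.txt; the DeptRecord class, the sample rows, and the application name are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical record type, named differently from Dept to avoid clashing with it
case class DeptRecord(deptno: Long, dname: String, loc: String)

val spark = SparkSession.builder().appName("rdd-to-df-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// Assumed sample data: tab-separated lines standing in for the contents of a text file
val lines = spark.sparkContext.parallelize(Seq("10\tACCOUNTING\tNEW YORK", "20\tRESEARCH\tDALLAS"))
val parts = lines.map(_.split("\t"))

// Route 1: reflection. The schema is derived from the case class fields.
val byReflection = parts.map(p => DeptRecord(p(0).trim.toLong, p(1), p(2))).toDF()

// Route 2: explicit schema. Each line becomes a generic Row that must match the schema.
val schema = StructType(Seq(
  StructField("deptno", LongType, nullable = true),
  StructField("dname", StringType, nullable = true),
  StructField("loc", StringType, nullable = true)))
val bySchema = spark.createDataFrame(parts.map(p => Row(p(0).trim.toLong, p(1), p(2))), schema)

byReflection.printSchema()
bySchema.printSchema()
```

Reflection is the shorter route when the record type is known at compile time; the explicit schema is useful when column names and types are only known at runtime.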

1.4 Conversion between DataFrames and Datasets

Spark makes it simple to convert between DataFrames and Datasets, as the following spark-shell session shows:

# Convert a DataFrame to a Dataset
scala> df.as[Emp]
res1: org.apache.spark.sql.Dataset[Emp] = [COMM: double, DEPTNO: bigint ... 6 more fields]

# Convert a Dataset to a DataFrame
scala> ds.toDF()
res2: org.apache.spark.sql.DataFrame = [COMM: double, DEPTNO: bigint ... 6 more fields]
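
The round trip can also be tried on in-memory data. The following is a minimal sketch, written as a script that can be pasted into spark-shell; the Person class, the sample rows, and the application name are hypothetical and not part of the emp data set used elsewhere in this article.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical two-field record, used only for this illustration
case class Person(name: String, age: Long)

val spark = SparkSession.builder().appName("conversion-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// An untyped DataFrame built from in-memory tuples
val peopleDF: DataFrame = Seq(("Ann", 31L), ("Bob", 45L)).toDF("name", "age")

// DataFrame -> Dataset: column names and types must match the case class fields
val peopleDS: Dataset[Person] = peopleDF.as[Person]

// Typed operations now work on Person objects instead of generic rows
val names: Array[String] = peopleDS.map(_.name).collect()

// Dataset -> DataFrame: the static type is dropped, the schema is kept
val backToDF: DataFrame = peopleDS.toDF()
backToDF.printSchema()
```

The as[T] call needs an implicit encoder for T, which is why spark.implicits._ is imported before the conversion.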

2. Column operations

2.1 Referencing columns

Spark supports a variety of ways to construct and reference columns; the simplest is to use the col() or column() function.


// In Scala you can also use the $"myColumn" and 'myColumn syntactic sugar to reference columns
// (symbol literals such as 'myColumn are deprecated since Scala 2.13)
df.select($"ename", $"job").show()
df.select('ename, 'job).show()
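
The reference styles mentioned above can be checked against each other. The following is a minimal sketch, written as a script that can be pasted into spark-shell; the staff DataFrame, its sample rows, and the application name are assumptions made for this illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, column}

val spark = SparkSession.builder().appName("column-ref-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// Assumed sample rows, not taken from the emp.json file
val staff = Seq(("ALLEN", "SALESMAN", 1600.0), ("JONES", "MANAGER", 2975.0))
  .toDF("ename", "job", "sal")

// Four equivalent ways to reference the same two columns
val byCol    = staff.select(col("ename"), col("job"))        // col() function
val byColumn = staff.select(column("ename"), column("job"))  // column() function
val byDollar = staff.select($"ename", $"job")                // $"..." interpolator from spark.implicits._
val byApply  = staff.select(staff("ename"), staff("job"))    // column bound to a specific DataFrame

byCol.show()
```

The staff("ename") form ties the column to one DataFrame, which helps to disambiguate columns with the same name after a join.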

2.2 Adding columns

// Add columns based on existing column values
df.withColumn("upSal", $"sal"+1000)
// Add columns based on fixed values

2.3 Dropping columns

// Multiple columns can be deleted
2.4 Renaming columns

Adding, dropping, or renaming columns always returns a new DataFrame; the original DataFrame is not changed.
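
The column operations from sections 2.2 to 2.4 can be sketched together to show that the source DataFrame stays untouched. This is a minimal illustration, written as a script that can be pasted into spark-shell. The sample row and the new column names (bonus, commission) are hypothetical; the article's own calls for the fixed-value, drop, and rename cases are not shown above, so the calls below are one plausible form based on the standard lit, drop, and withColumnRenamed APIs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().appName("column-ops-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// Assumed single sample row with four of the employee columns
val emp = Seq(("ALLEN", "SALESMAN", 1600.0, 300.0)).toDF("ename", "job", "sal", "comm")

// Add a column holding a fixed value; lit() wraps a literal as a Column
val withConst = emp.withColumn("bonus", lit(1000))  // "bonus" is a hypothetical name

// Drop several columns in one call
val dropped = emp.drop("comm", "job")

// Rename a column
val renamed = emp.withColumnRenamed("comm", "commission")  // "commission" is a hypothetical name

// The original DataFrame still has its four original columns
println(emp.columns.mkString(", "))
```

Because every call returns a new DataFrame, the results have to be assigned to a value or chained; calling withColumn without using its result has no effect on emp.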

3. Basic queries with the Structured API

// The desc and asc functions used below come from org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{asc, desc}

// 1. select: query each employee's name and job
df.select($"ename", $"job").show()

// 2. filter: query employees whose salary is greater than 2000
df.filter($"sal" > 2000).show()

// 3. orderBy: sort by department number descending and salary ascending
df.orderBy(desc("deptno"), asc("sal")).show()

// 4. limit: query the 3 highest-paid employees
df.orderBy(desc("sal")).limit(3).show()

// 5. distinct: query the ids of all departments
df.select("deptno").distinct().show()

// 6. groupBy: count the number of employees in each department
df.groupBy("deptno").count().show()

4. Basic queries with Spark SQL

4.1 Spark SQL Basic Usage

// 1. First register the DataFrame as a temporary view
df.createOrReplaceTempView("emp")

// 2. Query employee name and job
spark.sql("SELECT ename,job FROM emp").show()

// 3. Query the employee whose salary is greater than 2000
spark.sql("SELECT * FROM emp where sal > 2000").show()

// 4. ORDER BY: sort by department id descending and salary ascending
spark.sql("SELECT * FROM emp ORDER BY deptno DESC,sal ASC").show()

// 5. LIMIT: query the 3 highest-paid employees
spark.sql("SELECT * FROM emp ORDER BY sal DESC LIMIT 3").show()

// 6. DISTINCT: query the ids of all departments
spark.sql("SELECT DISTINCT(deptno) FROM emp").show()

// 7. GROUP BY: count the number of employees in each department
spark.sql("SELECT deptno,count(ename) FROM emp group by deptno").show()

4.2 Global Temporary View

The temporary view created above with createOrReplaceTempView is session-scoped: it exists only for the lifetime of the session that created it and disappears when that session ends.

You can also use createGlobalTempView to create a global temporary view, which is shared between all sessions and does not disappear until the entire Spark application terminates. Global temporary views are defined in the built-in global_temp database and must be referenced with qualified names, such as SELECT * FROM global_temp.view1.

// Register as a global temporary view
df.createGlobalTempView("gemp")

// Reference with qualified names
spark.sql("SELECT ename,job FROM global_temp.gemp").show()
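
The difference in scope between the two kinds of view can be observed by opening a second session in the same application. The following is a minimal sketch, written as a script that can be pasted into spark-shell. The view names emp_sketch and gemp_sketch and the sample rows are hypothetical, and createOrReplaceGlobalTempView is used instead of createGlobalTempView only so that the script can be run more than once.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder().appName("temp-view-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// Assumed sample rows, not taken from the emp.json file
val emp = Seq(("ALLEN", "SALESMAN"), ("JONES", "MANAGER")).toDF("ename", "job")

// Session-scoped view: visible only in the session that created it
emp.createOrReplaceTempView("emp_sketch")

// Application-scoped view: lives in the global_temp database
emp.createOrReplaceGlobalTempView("gemp_sketch")

// A second session that shares the same SparkContext
val other = spark.newSession()

// The session-scoped view cannot be resolved from the other session
val localVisible = Try(other.sql("SELECT * FROM emp_sketch").count()).isSuccess

// The global view resolves from any session through its qualified name
val globalCount = other.sql("SELECT * FROM global_temp.gemp_sketch").count()

println(s"temp view visible in other session: $localVisible, global view rows: $globalCount")
```

Under these assumptions the lookup of emp_sketch fails in the second session, while the qualified name global_temp.gemp_sketch returns both rows.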
