1. Creating DataFrames and Datasets

1.1 Creating a DataFrame

The entry point for all Spark SQL functionality is SparkSession, which can be created with SparkSession.builder(). Once created, the application can build DataFrames from an existing RDD, a Hive table, or a Spark data source. The following is an example:

val spark = SparkSession.builder().appName("Spark-SQL").master("local[2]").getOrCreate()
val df = spark.read.json("/usr/file/json/emp.json")
df.show()

// It is recommended to import the following implicit conversions before writing Spark SQL code, as many DataFrame and Dataset operations rely on them
import spark.implicits._

You can use spark-shell for testing. Note that when spark-shell starts, it automatically creates a SparkSession named spark, which you can reference directly on the command line.
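
For example, inside spark-shell the pre-built session can be used directly (a minimal sketch, reusing the emp.json path from the example above):

scala> spark.version
scala> spark.read.json("/usr/file/json/emp.json").show()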


1.2 Creating a Dataset

Spark supports creating a Dataset either from an external data source or from an internal Scala collection, as shown below:

1. Created from an external data source

// 1. Implicit conversions need to be imported
import spark.implicits._

// 2. Define a case class (equivalent to a Java Bean)
case class Emp(ename: String, comm: Double, deptno: Long, empno: Long, hiredate: String, job: String, mgr: Long, sal: Double)

// 3. Create a Dataset from an external data source
val ds = spark.read.json("/usr/file/emp.json").as[Emp]
ds.show()

2. Created from an internal data set

// 1. Implicit conversions need to be imported
import spark.implicits._

// 2. Define a case class (equivalent to a Java Bean)
case class Emp(ename: String, comm: Double, deptno: Long, empno: Long, hiredate: String, job: String, mgr: Long, sal: Double)

// 3. Create a Dataset from an internal collection
val caseClassDS = Seq(Emp("ALLEN", 300.0, 30, 7499, "1981-02-20 00:00:00", "SALESMAN", 7698, 1600.0),
                      Emp("JONES", 300.0, 30, 7499, "1981-02-20 00:00:00", "SALESMAN", 7698, 1600.0))
                    .toDS()
caseClassDS.show()


1.3 Creating a DataFrame from an RDD

Spark supports two ways to convert an RDD to a DataFrame: inferring the schema by reflection, or specifying the schema programmatically:

1. Using reflection inference

// 1. Import implicit conversions
import spark.implicits._

// 2. Define the department case class
case class Dept(deptno: Long, dname: String, loc: String)

// 3. Create the RDD and convert it to a Dataset
val rddToDS = spark.sparkContext
  .textFile("/usr/file/dept.txt")
  .map(_.split("\t"))
  .map(line => Dept(line(0).trim.toLong, line(1), line(2)))
  .toDS()  // Call toDF() instead to get a DataFrame

2. Specifying the schema programmatically

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._


// 1. Define the type of each column
val fields = Array(StructField("deptno", LongType, nullable = true),
                   StructField("dname", StringType, nullable = true),
                   StructField("loc", StringType, nullable = true))

// 2. Create schema
val schema = StructType(fields)

// 3. Create an RDD of Rows
val deptRDD = spark.sparkContext.textFile("/usr/file/dept.txt")
val rowRDD = deptRDD.map(_.split("\t")).map(line => Row(line(0).toLong, line(1), line(2)))


// 4. Convert the RDD to a DataFrame
val deptDF = spark.createDataFrame(rowRDD, schema)
deptDF.show()


1.4 Conversion between DataFrames and Datasets

Spark makes it very easy to convert between a DataFrame and a Dataset, as shown in the following example:

# Convert a DataFrame to a Dataset
scala> df.as[Emp]
res1: org.apache.spark.sql.Dataset[Emp] = [COMM: double, DEPTNO: bigint ... 6 more fields]

# Convert a Dataset to a DataFrame
scala> ds.toDF()
res2: org.apache.spark.sql.DataFrame = [COMM: double, DEPTNO: bigint ... 6 more fields]


2. Column operations

2.1 Referencing columns

Spark supports a variety of ways to construct and reference columns, the simplest being the col() or column() functions.

col("colName")
column("colName")

// For Scala, it is also possible to use $"myColumn" and 'myColumn "syntactic sugar for reference.
df.select($"ename", $"job").show()
df.select('ename, 'job).show()

2.2 Adding columns

// lit() lives in org.apache.spark.sql.functions
import org.apache.spark.sql.functions.lit

// Add a column derived from an existing column
df.withColumn("upSal", $"sal" + 1000)
// Add a column with a fixed literal value
df.withColumn("intCol", lit(1000))

2.3 Deleting columns

// Multiple columns can be deleted
df.drop("comm"."job").show()

2.4 Renaming columns

df.withColumnRenamed("comm"."common").show()

Adding, deleting, or renaming columns returns a new DataFrame; the original DataFrame is not modified.
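
As a quick check (a minimal sketch reusing the df defined earlier), the original DataFrame keeps its old schema while only the returned DataFrame has the new column:

val withUpSal = df.withColumn("upSal", $"sal" + 1000)
df.columns.contains("upSal")         // false: df itself is unchanged
withUpSal.columns.contains("upSal")  // true: only the new DataFrame has the column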


3. Use the Structured API for basic queries

// The asc and desc helpers used below need to be imported
import org.apache.spark.sql.functions.{asc, desc}

// 1. Query employee names and jobs
df.select($"ename", $"job").show()

// 2. filter: query employees whose salary is greater than 2000
df.filter($"sal" > 2000).show()

// 3. orderBy: sort by department number descending and salary ascending
df.orderBy(desc("deptno"), asc("sal")).show()

// 4. limit: query the three highest-paid employees
df.orderBy(desc("sal")).limit(3).show()

// 5. distinct: query the ids of all departments
df.select("deptno").distinct().show()

// 6. groupBy: count the number of employees in each department
df.groupBy("deptno").count().show()


4. Use Spark SQL for basic queries

4.1 Spark SQL Basic Usage

// 1. First register the DataFrame as a temporary view
df.createOrReplaceTempView("emp")

// 2. Query employee names and jobs
spark.sql("SELECT ename,job FROM emp").show()

// 3. Query employees whose salary is greater than 2000
spark.sql("SELECT * FROM emp where sal > 2000").show()

// 4. Sort by department id descending and salary ascending
spark.sql("SELECT * FROM emp ORDER BY deptno DESC,sal ASC").show()

// 5. Query the three highest-paid employees
spark.sql("SELECT * FROM emp ORDER BY sal DESC LIMIT 3").show()

// 6. Query the distinct department ids
spark.sql("SELECT DISTINCT(deptno) FROM emp").show()

// 7. Count the number of employees in each department
spark.sql("SELECT deptno,count(ename) FROM emp group by deptno").show()

4.2 Global Temporary View

The temporary view created above with createOrReplaceTempView is session-scoped: it exists only for the lifetime of the session and disappears when the session ends.

You can also use createGlobalTempView to create a global temporary view, which is shared across all sessions and kept alive until the Spark application terminates. Global temporary views are tied to the built-in global_temp database and must be referenced by their qualified name, such as SELECT * FROM global_temp.view1.

// Register as a global temporary view
df.createGlobalTempView("gemp")

// Reference with qualified names
spark.sql("SELECT ename,job FROM global_temp.gemp").show()
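
As a quick illustration of the cross-session behaviour (a minimal sketch reusing the gemp view registered above), the view remains visible from a brand-new session of the same application:

// newSession() creates an isolated session that shares the same SparkContext
spark.newSession().sql("SELECT ename,job FROM global_temp.gemp").show()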

References

Spark SQL, DataFrames and Datasets Guide > Getting Started

For more articles in this big data series, see the GitHub open source project: Getting Started with Big Data