Data Sources

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be manipulated using relational transformations or used to create a temporary view. Registering a DataFrame as a temporary view allows you to run SQL queries over its data. This section describes the general approach to loading and saving data with Spark data sources, followed by the specific options available for the built-in data sources.

Load and Save Functions

In its simplest form, the default data source (parquet, unless otherwise configured by spark.sql.sources.default) will be used for all operations.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Manually Specifying Options

You can also manually specify the data source that will be used, along with any extra options you would like to pass to it. Data sources are specified by their fully qualified name (for example, org.apache.spark.sql.parquet), but for built-in sources you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text). DataFrames loaded from any data source type can be converted to other types using this syntax.

Please refer to the API documentation of the built-in sources for the available options, for example: org.apache.spark.sql.DataFrameReader and org.apache.spark.sql.DataFrameWriter. The options documented there should also apply to non-Scala Spark APIs (such as PySpark). For other formats, refer to the API documentation of that particular format.

To load a JSON file, you can use:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

To load a CSV file, you can use:

val peopleDFCsv = spark.read.format("csv")
  .option("sep".";")
  .option("inferSchema"."true")
  .option("header"."true")
  .load("examples/src/main/resources/people.csv")

Additional options are also used during write operations. For example, you can control Bloom filters and dictionary encoding for ORC data sources. The following ORC example creates a Bloom filter and uses dictionary encoding only for favorite_color. For Parquet, there is parquet.enable.dictionary, too. For more details on other ORC/Parquet options, visit the official Apache ORC/Parquet websites.

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns"."favorite_color")
  .option("orc.dictionary.key.threshold"."1.0")
  .option("orc.column.encoding.direct"."name")
  .save("users_with_options.orc")

Run SQL on files directly

Instead of using the read API to load a file into a DataFrame and querying it, you can also query the file directly with SQL.

val sqlDF: DataFrame = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Save Modes

Save operations can optionally take a SaveMode, which specifies how to handle existing data if present. It is important to realize that these save modes do not use any locking and are not atomic. In addition, when an overwrite is performed, the data is deleted before the new data is written. A usage example follows the table below.

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
SaveMode.Append "append" When saving a DataFrame to a data source, if data/table already exists, the contents of the DataFrame are expected to be appended to the existing data.
SaveMode.Overwrite "overwrite" Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, the existing data is expected to be overwritten by the contents of the DataFrame.
SaveMode.Ignore "ignore" Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to CREATE TABLE IF NOT EXISTS in SQL.
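
For example, overwriting any existing output explicitly might look like this (a minimal sketch reusing usersDF from the load example above):

import org.apache.spark.sql.SaveMode

// Replace any existing data at the target path instead of failing
usersDF.write.mode(SaveMode.Overwrite).save("namesAndFavColors.parquet")
// The string form is equivalent
usersDF.write.mode("overwrite").save("namesAndFavColors.parquet")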

Saving to Persistent Tables

DataFrames can also be saved as persistent tables in the Hive metastore using the saveAsTable command. Note that an existing Hive deployment is not necessary to use this feature; Spark will create a default local Hive metastore (using Derby) for you. Unlike createOrReplaceTempView, saveAsTable materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore. Persistent tables still exist even after the Spark program has restarted, as long as you maintain a connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.
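
A minimal sketch, reusing the peopleDF from the JSON example above (the table name is arbitrary):

// Materialize the DataFrame as a managed table in the metastore
peopleDF.write.saveAsTable("people_table")

// Later, even from a new Spark application backed by the same metastore,
// obtain a DataFrame for the persistent table by name
val peopleTableDF = spark.table("people_table")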

For file-based data sources such as text, parquet, json, etc., you can specify a custom table path with the path option, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path is not removed and the table data still exists. If no custom table path is specified, Spark writes data to a default table path under the warehouse directory. When the table is dropped, the default table path is removed too.

Starting with Spark 2.1, per-partition metadata of persistent data source tables is stored in the Hive metastore. This brings several benefits:

  • Since MetaStore can only return the necessary partitions for the query, there is no longer a need to discover all partitions of the table in the first query.
  • Hive DDLs such as ALTER TABLE PARTITION ... SET LOCATION are now available for tables created with the data source API.

Bucketing, Sorting and Partitioning

For file-based data sources, the output can also be bucketed and sorted or partitioned. Bucketing and sorting apply only to persistent tables:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Partitioning, in contrast, can be used with both save and saveAsTable when using the Dataset APIs.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

It is possible to use both partitioning and bucketing for a single table:

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

partitionBy creates the directory structure described in the Partition Discovery section, so it has limited applicability to columns with high cardinality. In contrast, bucketBy distributes data across a fixed number of buckets and can be used when the number of unique values is unbounded.

Generic File Source Options

Note that the directory hierarchy used in the following examples is:

dir1/
├── dir2/
│   └── file2.parquet (schema: <file: string>, content: "file2.parquet")
├── file1.parquet (schema: <file: string>, content: "file1.parquet")
└── file3.json (schema: <file: string>, content: "{'file':'corrupt.json'}")

Ignore corrupted files

Spark allows you to ignore corrupt files when reading data from files, using spark.sql.files.ignoreCorruptFiles. When set to true, the Spark job will continue to run when corrupt files are encountered, and the contents that have been read will still be returned.

To ignore corrupt files while reading data files, you can use:

// enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF = spark.read.parquet(
  "examples/src/main/resources/dir1/"."examples/src/main/resources/dir1/dir2/")
testCorruptDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

Ignore missing files

Spark allows you to ignore missing files when reading data from files, using spark.sql.files.ignoreMissingFiles. Here, missing files are files that have been deleted under the directory after the DataFrame was constructed. When set to true, the Spark job will continue to run when missing files are encountered, and the contents that have been read will still be returned.
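
The option is enabled the same way as the corrupt-file setting above; a minimal sketch:

// enable ignore missing files
spark.sql("set spark.sql.files.ignoreMissingFiles=true")
// equivalently, via the runtime configuration API
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")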

Path glob filter

pathGlobFilter is used to include only files whose file names match the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery. To load files whose paths match a given glob pattern while keeping the behavior of partition discovery, you can use:

val testGlobFilterDF = spark.read.format("parquet")
  .option("pathGlobFilter"."*.parquet") // json file should be filtered out
  .load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+

Recursive file lookup

recursiveFileLookup is used to recursively load files, and it disables partition inference. Its default value is false. If the data source explicitly specifies the partitionSpec when recursiveFileLookup is true, an exception will be thrown.

val recursiveLoadedDF = spark.read.format("parquet")
  .option("recursiveFileLookup"."true")
  .load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

Modification Time Path Filters

ModifiedBefore and modifiedAfter are options that can be applied together or separately to achieve greater file loading granularity during Spark batch queries. (Note that structured streaming file sources do not support these options.)

When no timezone option is provided, the timestamps are interpreted according to the Spark session timezone (spark.sql.session.timeZone).

val beforeFilterDF = spark.read.format("parquet")
  // Files modified before 07/01/2020 at 05:30 are allowed
  .option("modifiedBefore"."2020-07-01T05:30:00")
  .load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
   // Files modified after 06/01/2020 at 05:30 are allowed
  .option("modifiedAfter"."2020-06-01T05:30:00")
  .load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+

Parquet file

Parquet is a columnar format supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

Load the data

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

Partition Discovery

Table partitioning is a common optimization approach used in Hive and other systems. In a partitioned table, data is usually stored in different directories, with partitioning column values encoded in the path of each partition directory. All built-in file sources (Text/CSV/JSON/ORC/Parquet) are able to discover and infer partitioning information automatically. For example, we can store all our previously used population data in a partitioned table using the following directory structure, with two extra columns, gender and country, as the partitioning columns:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths. The returned DataFrame now has the following schema:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

Note that the data types of the partitioning columns are automatically inferred. Currently, numeric data types, date, timestamp and string types are supported. Sometimes users may not want to automatically infer the data types of the partitioning columns. For these cases, automatic type inference can be configured by spark.sql.sources.partitionColumnTypeInference.enabled, which defaults to true. When type inference is disabled, string type will be used for the partitioning columns.
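
For example, a sketch of disabling the inference via the runtime configuration API so that partitioning columns come back as strings:

// gender and country will now be read as string columns
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
val stringPartitionsDF = spark.read.parquet("path/to/table")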

Starting with Spark 1.6.0, partition discovery only finds partitions under the given path by default. For the example above, if the user passes path/to/table/gender=male to SparkSession.read.parquet or SparkSession.read.load, gender is not considered a partitioning column. If users need to specify the base path that partition discovery should start from, they can set basePath in the data source options. For example, when path/to/table/gender=male is the path of the data and the user sets basePath to path/to/table/, gender will be a partitioning column.
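
A sketch of the basePath option just described:

// Only the gender=male subtree is loaded, but gender is still recovered as a partitioning column
val malesDF = spark.read
  .option("basePath", "path/to/table/")
  .parquet("path/to/table/gender=male")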

Schema Merging

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema and gradually add more columns to it as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source can now automatically detect this case and merge the schemas of all these files.

Since schema merging is a relatively expensive operation and is not required in most cases, we turned it off by default starting with 1.5.0. You can enable it by:

  1. setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true.
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)

Hive metastore Parquet table conversion

When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore Parquet tables, Spark SQL tries to use its own Parquet support instead of the Hive SerDe for better performance. This behavior is controlled by the spark.sql.hive.convertMetastoreParquet configuration and is turned on by default.

Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from a table schema processing perspective.

1. Hive is case insensitive, while Parquet is not.

2. Hive considers all columns nullable, while nullability in Parquet is significant.

For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are as follows:

1. Fields that have the same name in both schemas must have the same data type regardless of nullability. The reconciled field should have the data type of the Parquet side, so that nullability is respected.

2. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

Any fields that only appear in the Parquet schema are dropped in the reconciled schema.

Any fields that only appear in the Hive metastore schema are added as nullable fields in the reconciled schema.

Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table conversion is enabled, the metadata of those converted tables is also cached. If these tables are updated by Hive or other external tools, you need to refresh them manually to ensure consistent metadata.

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

Configuration

Configuration of Parquet can be done using the setConf method on SparkSession or by running SET key=value commands in SQL; see the example after the table below.

Property Name Default Meaning Since Version
spark.sql.parquet.binaryAsString false Some other Parquet-producing systems, notably Impala, Hive, and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as strings to provide compatibility with these systems. 1.1.1
spark.sql.parquet.int96AsTimestamp true Some Parquet-producing systems, notably Impala and Hive, store timestamps as INT96. This flag tells Spark SQL to interpret INT96 data as timestamps to provide compatibility with these systems. 1.3.0
spark.sql.parquet.compression.codec snappy Sets the compression codec used when writing Parquet files. If compression or parquet.compression is specified in the table-specific options/properties, the precedence order is compression, parquet.compression, spark.sql.parquet.compression.codec. Acceptable values include: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd. Note that zstd requires ZStandardCodec to be installed before Hadoop 2.9.0, and brotli requires BrotliCodec to be installed. 1.1.1
spark.sql.parquet.filterPushdown true When set to true, enables Parquet filter push-down optimization. 1.2.0
spark.sql.hive.convertMetastoreParquet true When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in support. 1.1.1
spark.sql.parquet.mergeSchema false When true, the Parquet data source merges schemas collected from all data files; otherwise the schema is picked from the summary file, or from a random data file if no summary file is available. 1.5.0
spark.sql.parquet.writeLegacyFormat false If true, data will be written in the way of Spark 1.4 and earlier. For example, decimal values will be written in Apache Parquet's fixed-length byte array format, which other systems such as Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format. If Parquet output is intended for systems that do not support this newer format, set this to true. 1.6.0
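
For instance, the compression codec from the table above can be set either way (a minimal sketch):

// via the SparkSession runtime configuration
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
// or via SQL
spark.sql("SET spark.sql.parquet.mergeSchema=true")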

ORC file

Starting with Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To this end, the following configurations were added. The vectorized reader is used for native ORC tables (for example, tables created with the clause USING ORC) when spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true. For Hive ORC serde tables (for example, tables created with the clause USING HIVE OPTIONS (fileFormat 'ORC')), the vectorized reader is used when spark.sql.hive.convertMetastoreOrc is also set to true; see the sketch after the table below.

Property Name Default Meaning Since Version
spark.sql.orc.impl native The name of the ORC implementation. It can be one of native and hive. native means the native ORC support. hive means the ORC library in Hive. 2.3.0
spark.sql.orc.enableVectorizedReader true Enables vectorized ORC decoding in the native implementation. If false, a new non-vectorized ORC reader is used in the native implementation. For the hive implementation, this is ignored. 2.3.0
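
A sketch of the configuration combination described above:

// Vectorized reader for native ORC tables
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
// Additionally required for Hive ORC serde tables
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")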

JSON file

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. This conversion can be done using SparkSession.read.json() on either a Dataset[String] or a JSON file.

Note that the file that is offered as a JSON file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. For more information, see JSON Lines text format, also called newline-delimited JSON.

For regular multi-line JSON files, set the multiLine option to true.
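
For example (a sketch; the multi-line file path here is hypothetical):

// Each JSON object may span multiple lines when multiLine is enabled
val multiLineDF = spark.read
  .option("multiLine", "true")
  .json("examples/src/main/resources/people_multiline.json")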

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""": :Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

Hive Tables

Spark SQL also supports reading and writing data stored in Apache Hive. However, because Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies are found on the classpath, Spark will automatically load them. Note that these Hive dependencies must also be present on all working nodes because they need access to the Hive serialization and deserialization library (SerDes) in order to access data stored in Hive.

Hive configuration is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) files in conf/.

To use Hive, SparkSession must be instantiated with Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions. Users who do not have an existing Hive deployment can still enable Hive support. When not configured by hive-site.xml, the context automatically creates metastore_db in the current working directory and creates a directory configured by spark.sql.warehouse.dir. By default, this is the directory spark-warehouse in the working directory where the Spark application is started. Note that the hive.metastore.warehouse.dir property in hive-site.xml is deprecated since Spark 2.0.0. Instead, use spark.sql.warehouse.dir to specify the default location of databases in the warehouse. You may need to grant write privileges to the user who starts the Spark application.

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition"."true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode"."nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()

Specifying storage format for Hive tables

When creating a Hive table, you need to define how this table should read/write data from/to the file system, i.e. the "input format" and "output format". You also need to define how this table should deserialize the data to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage format ("serde", "input format", "output format"), e.g. CREATE TABLE src(id int) USING hive OPTIONS (fileFormat 'parquet'). By default, we will read the table files as plain text. Note that Hive storage handlers are not yet supported when creating tables; you can create a table using a storage handler on the Hive side and read it with Spark SQL.
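
A minimal sketch of the fileFormat option mentioned above, reusing the sql helper imported in the Hive example (the table name is arbitrary):

// Create a Hive table stored as Parquet by way of the fileFormat option
sql("CREATE TABLE src_parquet(id int) USING hive OPTIONS(fileFormat 'parquet')")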

Property Name Meaning
fileFormat A fileFormat is a package of storage format specifications, including "serde", "input format", and "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
inputFormat, outputFormat These two options specify the names of the corresponding InputFormat and OutputFormat classes as string literals, e.g. org.apache.hadoop.hive.ql.io.orc.OrcInputFormat. These two options must come in pairs and cannot be specified if you have already specified the fileFormat option.
serde This option specifies the name of the serde class. When the fileFormat option is specified, do not specify this option if the given fileFormat already includes the serde information. Currently 'sequencefile', 'textfile' and 'rcfile' do not include serde information, so you can use this option with these 3 fileFormats.
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim These options can only be used with “TextFile” fileFormat. They define how delimited files are read into lines.

JDBC To Other Databases

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD, because the results are returned as DataFrames and can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python because it does not require the user to provide a ClassTag. (Note that this is different from the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL.)

To get started, you need to include the JDBC driver for the specific database in the Spark classpath. For example, to connect to Postgres from Spark Shell, you need to run the following command:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

You can use the Data Sources API to load tables in a remote database as DataFrame or Spark SQL temporary views. The user can specify JDBC connection properties in the data source option. User and password are typically provided as connection properties for logging in to the data source. In addition to connection properties, Spark also supports the following case-insensitive options:

Property Name Meaning
url The JDBC URL to connect to. Source-specific connection properties can be specified in the URL. For example, jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable The JDBC table that should be read from or written into. Note that when using it in the read path, anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. It is not allowed to specify dbtable and query options at the same time.
query Queries used to read data into Spark. The specified query is enclosed in parentheses and used as a subquery in the FROM clause. Spark also assigns an alias to the subquery clause. For example, Spark issues the following form of query to a JDBC Source.

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

Here are some limitations when using this option.

1. It is not allowed to specify dbtable and query options at the same time.

2. It is not allowed to specify query and partitionColumn options at the same time. When specifying the partitionColumn option is required, the subquery can be specified using the dbtable option instead, and partition columns can be qualified using the subquery alias provided as part of dbtable.

spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("query", "select c1, c2 from t1")
  .load()
driver The class name of the JDBC driver used to connect to this URL.
partitionColumn, lowerBound, upperBound If one of these options is specified, all of them must be specified. In addition, numPartitions must be specified. They describe how to partition a table when reading in parallel from multiple workers. The partitionColumn must be a number, date, or timestamp column in the related table. Note that lowerBound and upperBound are only used to determine partition steps, not to filter rows in a table. Therefore, all rows in the table are partitioned and returned. This option is only available for reading.
numPartitions Maximum number of partitions that can be used for table read and write parallelism. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we reduce it to this limit by calling coalesce(numPartitions) before writing.
queryTimeout The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout; for example, the H2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. The default value is 0.
fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help the performance of JDBC drivers, which default to a low fetch size (e.g., 10 rows for Oracle). This option is only available for reading.
batchsize The JDBC batch size, which determines how many rows are inserted per round trip. This helps improve the performance of JDBC drivers. This option applies only to writes. The default value is 1000.
isolationLevel The transaction isolation level applied to the current connection. It can be one of NONE, READ COMMITTED, READ UNCOMMITTED, REPEATABLE READ, or SERIALIZABLE, corresponding to the standard transaction isolation level defined by JDBC’s Connection object, The default value is READ UNCOMMITTED. This option applies only to writes. Refer to the documentation in java.sql.Connection.
sessionInitStatement After each database session is opened to the remote database and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block) to implement session initialization. Example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. The default value is false. This option applies only to writing.
cascadeTruncate This is a JDBC writer related option. If enabled and supported by the JDBC database (currently PostgreSQL and Oracle), TRUNCATE TABLE t CASCADE is executed (in the case of PostgreSQL, TRUNCATE TABLE ONLY t CASCADE is executed to prevent inadvertently truncating descendant tables). This will affect other tables and should therefore be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified in isCascadeTruncate in each JDBCDialect.
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url"."jdbc:postgresql:dbserver")
  .option("dbtable"."schema.tablename")
  .option("user"."username")
  .option("password"."password")
  .load()

// `Properties` comes from java.util
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url"."jdbc:postgresql:dbserver")
  .option("dbtable"."schema.tablename")
  .option("user"."username")
  .option("password"."password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver"."schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes"."name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver"."schema.tablename", connectionProperties)

Apache Avro Data Source Guide

Since the release of Spark 2.4, Spark SQL provides built-in support for reading and writing Apache Avro data.

Deploying

The spark-avro module is external and is not included in spark-submit or spark-shell by default.

As with any Spark application, spark-submit is used to launch the application. spark-avro_2.12 and its dependencies can be added directly to spark-submit using --packages, such as:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.1 ...

For experimenting on spark-shell, you can also use --packages to add org.apache.spark:spark-avro_2.12 and its dependencies directly:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.1.1 ...

For more information about submitting applications with external dependencies, see the Application Submission Guide.

Load and Save Functions

Since the spark-avro module is external, there is no .avro API in DataFrameReader or DataFrameWriter.

To load/save data in Avro format, you need to specify the data source option format as avro (or org.apache.spark.sql.avro).

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

to_avro() and from_avro()

The Avro package provides the function to_avro to encode a column as binary in Avro format, and from_avro() to decode Avro binary data into a column. Both functions transform one column to another column, and the input/output SQL data type can be a complex type or a primitive type.

Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record is augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, and so on.

If the "value" field that contains your data is in Avro, you can use from_avro() to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.

to_avro() can be used to turn structs into Avro records. This method is particularly useful when you want to re-encode multiple columns into a single column when writing data out to Kafka.

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers"."host1:port1,host2:port2")
  .option("subscribe"."topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers"."host1:port1,host2:port2")
  .option("topic"."topic2")
  .start()

Data Source Option

Data source options for Avro can be set via the .option method on DataFrameReader or DataFrameWriter, or via the options parameter in the from_avro function.
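
For example, a reader-side option can be supplied like this (a minimal sketch; avroSchema is one such option, and jsonFormatSchema is reused from the earlier example):

// Supply an optional user-provided Avro schema (as a JSON string) when reading
val usersWithSchemaDF = spark.read
  .format("avro")
  .option("avroSchema", jsonFormatSchema)
  .load("examples/src/main/resources/users.avro")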