Spark SQL Method for reading MySQL

Spark SQL also includes a data source that can read data from other databases using JDBC. This feature should be used in preference to JdbcRDD. This is because the results are returned as dataframes, which can be easily processed in Spark SQL or connected to other data sources. JDBC data sources are also easier to use in Java or Python because it does not require the user to provide ClassTag.

You can use the Data Sources API to load tables in a remote database as DataFrame or Spark SQL temporary views. The user can specify JDBC connection properties in the data source option. User and password are commonly used as connection properties to log in to the data source. In addition to connection properties, Spark also supports the following case-insensitive options:

The attribute name explain
url The JDBC URL to connect to
dbtable A JDBC table to read or write to
query Specifying a query statement
driver The name of the JDBC driver class used to connect to this URL
partitionColumn, lowerBound, upperBound If these options are specified, they must all be specified. In addition,numPartitionsYou must specify
numPartitions The maximum number of partitions in table reads and writes that can be used for parallel processing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we can passcoalesce(numPartitions)Calling before writing reduces it to this limit
queryTimeout The default is0To query the timeout period
fetchsize The fetch size of JDBC, which determines how many rows to fetch at a time. This can help improve the performance of JDBC drivers
batchsize The default is 1000, the JDBC batch size, which can help improve the performance of JDBC drivers.
isolationLevel Transaction isolation level for the current connection. It could be aNONE.READ_COMMITTED.READ_UNCOMMITTED.REPEATABLE_READOr,SERIALIZABLECorresponding to the connection object defined by JDBC, the default value is the standard transaction isolation levelREAD_UNCOMMITTED. This option is only available for writing.
sessionInitStatement After each database session is opened to the remote database and before data is read, this option executes a custom SQL statement that is used to implement the session initialization code.
truncate This is the JDBC Writer-related option. whenSaveMode.OverwriteWhen enabled, it empties the contents of the target table rather than deleting and rebuilding its existing tables. The default isfalse
pushDownPredicate Options for enabling or disabling predicates to push down to JDBC data sources. The default value is true, in which case Spark pushes the filter down to the JDBC data source as far as possible.

The source code

  • SparkSession
/** * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a * `DataFrame`. * {{{ * sparkSession.read.parquet("/path/to/file.parquet") * sparkSession.read.schema(schema).json("/path/to/file.json") * }}} * * @ since 2.0.0 * /
  def read: DataFrameReader = new DataFrameReader(self)
Copy the code
  • DataFrameReader
  / /... Omit code...
  / * * * all the data by the RDD a partition processing, if the table is very big, you are likely to appear OOM * can use DataFrameDF RDD. Partitions. See * / size method
  def jdbc(url: String, table: String, properties: Properties) :DataFrame = {
    assertNoSpecifiedSchema("jdbc")
    this.extraOptions ++= properties.asScala
    this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
    format("jdbc").load()
  }
/** * @param URL Database URL * @param TABLE table name * @param columnName Partition field name * @param lowerBound 'columnName' minimum value, used for partition step * @param UpperBound 'columnName' maximum value for the partition step * @param numnumber of partitions * @param connectionProperties Other parameters * @since 1.4.0 */
  def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      connectionProperties: Properties) :DataFrame = {
    this.extraOptions ++= Map(
      JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
      JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
      JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
      JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
    jdbc(url, table, connectionProperties)
  }

  /** * @param predicates where conditions * such as "id <= 1000", "score > 1000 and score <= 2000" will be split into two partitions * @since 1.4.0 */
  def jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties) :DataFrame = {
    assertNoSpecifiedSchema("jdbc")
    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
    val options = new JDBCOptions(url, table, params)
    val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
      JDBCPartition(part, i) : Partition
    }
    val relation = JDBCRelation(parts, options)(sparkSession)
    sparkSession.baseRelationToDataFrame(relation)
  }

Copy the code

The sample

	private def runJdbcDatasetExample(spark: SparkSession) :Unit = {
    
    // Load data from JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url"."JDBC: mysql: / / 127.0.0.1:3306 / test")
      .option("dbtable"."mytable")
      .option("user"."root")
      .option("password"."root")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user"."root")
    connectionProperties.put("password"."root")
    val jdbcDF2 = spark.read
      .jdbc("JDBC: mysql: / / 127.0.0.1:3306 / test"."mytable", connectionProperties)
    // Specify the data type to read from the schema
    connectionProperties.put("customSchema"."id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("JDBC: mysql: / / 127.0.0.1:3306 / test"."mytable", connectionProperties)

  }
Copy the code

Note that if no partition is specified in the preceding method, Spark uses one partition to read data by default. In this case, OOM will appear when the data volume is very large. After read the data, call DataFrameDF. RDD. Partitions. The size method can view the partition number.

Spark SQL writes data to MySQL in batches

A code example is as follows:

object BatchInsertMySQL {
  case class Person(name: String, age: Int)
  def main(args: Array[String) :Unit = {

    // Create a sparkSession object
    val conf = new SparkConf()
      .setAppName("BatchInsertMySQL")
    val spark: SparkSession =  SparkSession.builder()
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    // MySQL connection parameters
    val url = JDBCUtils.url
    val user = JDBCUtils.user
    val pwd = JDBCUtils.password

    // Create the Properties object and set the user name and password for connecting to mysql
    val properties: Properties = new Properties()

    properties.setProperty("user", user) / / user name
    properties.setProperty("password", pwd) / / password
    properties.setProperty("driver"."com.mysql.jdbc.Driver")
    properties.setProperty("numPartitions"."10")

    // Read the table data in mysql
    val testDF: DataFrame = spark.read.jdbc(url, "test", properties)
     println("TestDF partition number:" + testDF.rdd.partitions.size)
   testDF.createOrReplaceTempView("test")
   testDF.persist(StorageLevel.MEMORY_AND_DISK)
   testDF.printSchema()

    val result =
      s"""-- SQL code""".stripMargin

    val resultBatch = spark.sql(result).as[Person]
    println("ResultBatch Specifies the number of partitions:" + resultBatch.rdd.partitions.size)

    // Write to MySQL in batches
    // It is best to repartition the result of the processing here
    // There is a large amount of data in each partition
    resultBatch.repartition(500).foreachPartition(record => {

      val list = new ListBuffer[Person]
      record.foreach(person => {
        val name = Person.name
        val age = Person.age
        list.append(Person(name,age))
      })
      upsertDateMatch(list) // Perform batch data insertion
    })
    // Insert MySQL in batches
    def upsertPerson(list: ListBuffer[Person) :Unit = {

      var connect: Connection = null
      var pstmt: PreparedStatement = null

      try {
        connect = JDBCUtils.getConnection()
        // Disable automatic submission
        connect.setAutoCommit(false)

        val sql = "REPLACE INTO `person`(name, age)" +
          " VALUES(? ,?) "

        pstmt = connect.prepareStatement(sql)

        var batchIndex = 0
        for (person <- list) {
          pstmt.setString(1, person.name)
          pstmt.setString(2, person.age)
          // Add batch
          pstmt.addBatch()
          batchIndex +=1
          // Control the number of submissions,
          // Limit the amount of data submitted in batches, otherwise MySQL will be written down!!
          if(batchIndex % 1000= =0&& batchIndex ! =0){
            pstmt.executeBatch()
            pstmt.clearBatch()
          }

        }
        // Submit the batch
        pstmt.executeBatch()
        connect.commit()
      } catch {
        case e: Exception =>
          e.printStackTrace()
      } finally {
        JDBCUtils.closeConnection(connect, pstmt)
      }
    }

    spark.close()
  }
}

Copy the code

JDBC connection utility class:

object JDBCUtils {
  val user = "root"
  val password = "root"
  val url = "jdbc:mysql://localhost:3306/mydb"
  Class.forName("com.mysql.jdbc.Driver")
  // Get the connection
  def getConnection() = {
    DriverManager.getConnection(url,user,password)
  }
// Release the connection
  def closeConnection(connection: Connection, pstmt: PreparedStatement) :Unit = {
    try {
      if(pstmt ! =null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if(connection ! =null) {
        connection.close()
      }
    }
  }
}

Copy the code

conclusion

When Spark writes a large amount of data to MySQL, repartition the WRITTEN DF to avoid excessive data in the partition. When writing, use foreachPartition to obtain a connection foreachPartition and set the batch commit within the partition. The batch commit cannot be too large to avoid database write hang.

The official number “Big data Technology and data warehouse”, reply “data” to get the big data data package