RDBMS options in Apache Spark SQL

Versions: Apache Spark 2.3.1

Some recent posts covered important Spark SQL options for RDBMS: partitioning and write modes. However, they are not the only options available for this kind of data store.

This post explains other configuration options that can be used with a JDBC data source or data sink. Each section describes one option: isolation level, table creation options, schema customization, batch size and session customization. Each part, after a short definition, has a test case illustrating the option.

But before talking about the options, here are the steps used to set up the execution environment:

  1. Create the init.sql file defining the database used in tests, with the following content:
        CREATE DATABASE spark_jdbc_options_test;

  2. Start MySQL server from the directory containing init.sql:
        docker run -p 3306:3306 --name spark_jdbc_options_test -e MYSQL_ROOT_PASSWORD=root -d \
        --mount type=bind,src="$(pwd)/init.sql",dst=/docker-entrypoint-initdb.d/init.sql mysql:5.7


Please note that other options are described in separate posts: Partitioning RDBMS data in Spark SQL, Fetchsize in Spark SQL and SaveMode.Overwrite trap with RDBMS in Apache Spark SQL.

Isolation level

The isolationLevel property defines the transaction isolation level used when writing. It accepts one of the standard SQL levels: NONE (the write is not transactional), READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ or SERIALIZABLE. The default value is READ_UNCOMMITTED. These levels were covered in detail in the post about Transaction isolation in Hibernate.
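The textual values of the option correspond to the constants defined in java.sql.Connection. The snippet below is only a minimal sketch of such a translation, not Spark's own code; the IsolationLevelSketch object and the toJdbcLevel helper are hypothetical names introduced for the illustration:

```scala
import java.sql.Connection

object IsolationLevelSketch {
  // Hypothetical helper: translates the value of the isolationLevel option
  // into the corresponding JDBC constant. A missing option falls back to
  // READ_UNCOMMITTED, the default described above.
  def toJdbcLevel(optionValue: Option[String]): Int =
    optionValue.getOrElse("READ_UNCOMMITTED") match {
      case "NONE" => Connection.TRANSACTION_NONE
      case "READ_UNCOMMITTED" => Connection.TRANSACTION_READ_UNCOMMITTED
      case "READ_COMMITTED" => Connection.TRANSACTION_READ_COMMITTED
      case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
      case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
      case other =>
        throw new IllegalArgumentException(s"Unsupported isolation level: $other")
    }
}
```

In this sketch any value outside of the 5 listed levels is rejected with an exception instead of being silently replaced by the default.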

Since it's difficult to show all of these levels, let's illustrate what happens with NONE and READ_COMMITTED when a duplicated row is inserted with a SaveMode that doesn't overwrite the already existing table:

private val TransactionIsolationNoneTable = "transaction_isolation_test"
private val TransactionIsolationReadCommittedTable = "transaction_isolation_read_committed_test"

"None transaction level" should "insert rows without the transaction" in {
  MysqlConnector.executedSideEffectQuery(s"CREATE TABLE IF NOT EXISTS ${TransactionIsolationNoneTable} " +
    s"(id INT(11) NOT NULL PRIMARY KEY, login TEXT)")
  val usersToInsert = Seq(User(4, s"User#4"))
  MysqlConnector.populateTable(s"INSERT INTO ${TransactionIsolationNoneTable} (id, login) VALUES (?, ?)", usersToInsert)
  val users = (1 to 10).map(nr => User(nr, s"User#${nr}")).toDF("id", "login")
    .repartition(1)
    .orderBy($"id".asc)

  val insertException = intercept[SparkException] {
    users.write
      .mode(SaveMode.Append)
      .option("isolationLevel", "NONE")
      .option("batchsize", 1)
      .jdbc(Connection, TransactionIsolationNoneTable, ConnectionProperties)
  }

  insertException.getMessage should include("Duplicate entry '4' for key 'PRIMARY'")
  val MysqlConnector2 = new MysqlConnector(Connection, DbUser, DbPassword)
  val rows = MysqlConnector2.getRows(s"SELECT * FROM ${TransactionIsolationNoneTable}", (resultSet) => {
    (resultSet.getInt("id"), resultSet.getString("login"))
  })
  rows should have size 4
  rows should contain allOf((1, "User#1"), (2, "User#2"), (3, "User#3"), (4, "User#4"))
}

"READ_COMMITTED isolation level" should "insert all or none rows within a transaction" in {
  // Here we test exactly the same code as above but with different transaction level
  MysqlConnector.executedSideEffectQuery(s"CREATE TABLE IF NOT EXISTS ${TransactionIsolationReadCommittedTable} " +
    s"(id INT(11) NOT NULL PRIMARY KEY, login TEXT)")
  val usersToInsert = Seq(User(4, s"User#4"))
  MysqlConnector.populateTable(s"INSERT INTO ${TransactionIsolationReadCommittedTable} (id, login) VALUES (?, ?)", usersToInsert)

  val users = (1 to 10).map(nr => User(nr, s"User#${nr}")).toDF("id", "login")
    .orderBy($"id".asc)
    .repartition(1)

  val insertException = intercept[SparkException] {
    users.write
      .mode(SaveMode.Append)
      .option("isolationLevel", "READ_COMMITTED")
      .option("batchsize", 1)
      .jdbc(Connection, TransactionIsolationReadCommittedTable, ConnectionProperties)
  }

  insertException.getMessage should include("Duplicate entry '4' for key 'PRIMARY'")
  val rows = MysqlConnector.getRows(s"SELECT * FROM ${TransactionIsolationReadCommittedTable}", (resultSet) => {
    (resultSet.getInt("id"), resultSet.getString("login"))
  })
  rows should have size 1
  rows(0) shouldEqual (4, "User#4")
}

Creation options

This group of options is related to table creation. It contains 2 options. The first one is createTableOptions and it lets us append additional options to the table creation statement. For instance, in the following test we overwrite an already created table with an InnoDB table encoded in UTF-8:

private val CreateTableOptionsTable = "create_table_options_test"

"createTableOptions" should "create table with custom ENGINE and CHARSET options" in {
  val users = (1 to 10).map(nr => User(nr, s"User#${nr}")).toDF("id", "login")

  // createTableOptions is appended to the end of the creation query:
  //val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
  users.write
    .mode(SaveMode.Overwrite)
    .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Spark SQL test table'")
    .jdbc(Connection, CreateTableOptionsTable, ConnectionProperties)

  val tableProps = MysqlConnector.getRows(s"SHOW TABLE STATUS WHERE Name = '${CreateTableOptionsTable}'", (resultSet) => {
    (resultSet.getString("Engine"), resultSet.getString("Collation"), resultSet.getString("Comment"))
  })
  tableProps should have size 1
  val (engine, collation, comment) = tableProps(0)
  engine shouldEqual "InnoDB"
  collation shouldEqual "utf8_general_ci"
  comment shouldEqual "Spark SQL test table"
}
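The template quoted in the comment of the test can be reproduced with plain string interpolation. The following is a minimal sketch under that assumption; CreateTableSketch and createTableSql are hypothetical names and the real statement is built by Spark from the DataFrame schema and the JDBC dialect:

```scala
object CreateTableSketch {
  // Hypothetical helper following the template
  // "CREATE TABLE $table ($strSchema) $createTableOptions".
  // Columns are given as (name, SQL type) pairs.
  def createTableSql(table: String, columns: Seq[(String, String)],
                     createTableOptions: String = ""): String = {
    val strSchema = columns
      .map { case (name, sqlType) => s"$name $sqlType" }
      .mkString(", ")
    // trim removes the trailing space left when no options are defined
    s"CREATE TABLE $table ($strSchema) $createTableOptions".trim
  }
}
```

Since the options are simply appended at the end of the statement, their content must be valid for the target database. In this sketch nothing is validated before building the query.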

Another option is createTableColumnTypes, which lets us define custom column types for the table creation step. For instance, we can map a string column to a VARCHAR(...) type or reduce an integer column to SMALLINT. However, not all customizations are possible. In MySQL we can create an integer column and specify its display width (INT(1), INT(2), INT(3), ...). Spark doesn't support this because it treats INT as a type without parameters. The types accepting parameters can be found in org.apache.spark.sql.catalyst.parser.AstBuilder#visitPrimitiveDataType(ctx: PrimitiveDataTypeContext):

  (dataType, ctx.INTEGER_VALUE().asScala.toList) match {
    case ("boolean", Nil) => BooleanType
    case ("tinyint" | "byte", Nil) => ByteType
    case ("smallint" | "short", Nil) => ShortType
    case ("int" | "integer", Nil) => IntegerType
    case ("bigint" | "long", Nil) => LongType
    case ("float", Nil) => FloatType
    case ("double", Nil) => DoubleType
    case ("date", Nil) => DateType
    case ("timestamp", Nil) => TimestampType
    case ("string", Nil) => StringType
    case ("char", length :: Nil) => CharType(length.getText.toInt)
    case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
    case ("binary", Nil) => BinaryType
    case ("decimal", Nil) => DecimalType.USER_DEFAULT
    case ("decimal", precision :: Nil) => DecimalType(precision.getText.toInt, 0)
    case ("decimal", precision :: scale :: Nil) =>
      DecimalType(precision.getText.toInt, scale.getText.toInt)
    case (dt, params) =>
      val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
      throw new ParseException(s"DataType $dtStr is not supported.", ctx)
  }

An example of the createTableColumnTypes option is provided in the test below:

private val CreateTableColumnTypesTable = "create_table_column_types_test"

"createTableColumnTypes" should "add custom VARCHAR length to created field" in {
  val users = (1 to 10).map(nr => User(nr, s"User#${nr}")).toDF("id", "login")

  users.write
    // We can't do more, e.g. specify `id INT(11) NOT NULL PRIMARY KEY`
    .option("createTableColumnTypes", "id SMALLINT, login VARCHAR(7)")
    .mode(SaveMode.Overwrite)
    .jdbc(Connection, CreateTableColumnTypesTable, ConnectionProperties)

  val tableProps = MysqlConnector.getRows(s"DESC ${CreateTableColumnTypesTable}", (resultSet) => {
    (resultSet.getString("Field"), resultSet.getString("Type"))
  })
  tableProps should have size 2
  tableProps should contain allOf(("id", "smallint(6)"), ("login", "varchar(7)"))
}

Schema customization

In the section above we've seen how to customize the schema for writing. We can also define our own schema for reading with the customSchema option. It maps column names to Spark SQL data types and overrides the types that Spark would otherwise infer from the JDBC metadata. The column names must be identical to the ones of the JDBC table, and the columns missing in the option keep the default type mapping. It's useful when a column should be read as another type, better suited for further processing in Apache Spark. An example of such use is defined below:

private val CustomSchemaTable = "custom_schema_test"

"customSchema" should "convert number type to string at reading" in {
  MysqlConnector.executedSideEffectQuery(s"CREATE TABLE ${CustomSchemaTable} (id INT(11) NOT NULL, login TEXT)")
  val usersToInsert = (1 to 3).map(nr => User(nr, s"User#${nr}"))
  MysqlConnector.populateTable(s"INSERT INTO ${CustomSchemaTable} (id, login) VALUES (?, ?)", usersToInsert)
  val users = sparkSession.read.format("jdbc").option("url", Connection)
    .option("dbtable", CustomSchemaTable)
    .option("user", DbUser)
    .option("password", DbPassword)
    .option("customSchema", "id STRING, login STRING")
    .load()

  users.schema.fields(0).dataType shouldBe a [StringType]
  users.schema.fields(1).dataType shouldBe a [StringType]
  val userIds = users.map(row => row.getAs[String]("id")).collectAsList()
  userIds should contain allOf("1", "2", "3")
}
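The name-based override can be modeled without any database. The snippet below is a simplified sketch and not Spark's implementation: CustomSchemaSketch, Column and applyCustomSchema are hypothetical names, the types are kept as plain strings and the case-insensitive name comparison is an assumption made for the example:

```scala
object CustomSchemaSketch {
  // Simplified column representation: the data type is kept as plain text
  final case class Column(name: String, dataType: String)

  // Hypothetical helper: replaces the type of every table column that has
  // an entry in the overrides. Names are compared case-insensitively
  // (an assumption of this sketch). Columns without an entry keep their type.
  def applyCustomSchema(tableSchema: Seq[Column],
                        overrides: Map[String, String]): Seq[Column] =
    tableSchema.map { column =>
      overrides
        .collectFirst {
          case (name, dataType) if name.equalsIgnoreCase(column.name) => dataType
        }
        .map(dataType => column.copy(dataType = dataType))
        .getOrElse(column)
    }
}
```

In this model an override whose name doesn't match any column of the table has no effect, which is why the names passed in the option have to be the same as in the table.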

Batch size

Another JDBC option is related to the writing part and defines the number of rows sent in a single batch of an insert operation. This number is specified with the batchsize option and defaults to 1000. Spark groups the inserted rows of each partition into batches, each with at most the number of rows defined by the option. We can see this in the org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils#savePartition(getConnection: () => Connection, table: String, iterator: Iterator[Row], rddSchema: StructType, insertStmt: String, batchSize: Int, dialect: JdbcDialect, isolationLevel: Int) method:

var rowCount = 0
while (iterator.hasNext) {
  val row = iterator.next()
  var i = 0
  while (i < numFields) {
    if (row.isNullAt(i)) {
      stmt.setNull(i + 1, nullTypes(i))
    } else {
      setters(i).apply(stmt, row, i)
    }
    i = i + 1
  }
  stmt.addBatch()
  rowCount += 1
  if (rowCount % batchSize == 0) {
    stmt.executeBatch()
    rowCount = 0
  }
}
if (rowCount > 0) {
  stmt.executeBatch()
}

The following test case verifies this behavior:

private val ExecuteBatchMethodName = "executeBatch"
before {
  MethodInvocationCounter.methodInvocations.remove(ExecuteBatchMethodName)
}

"batchsize of 3" should "insert 10 rows in 4 batches" in {
  val users = (1 to 10).map(nr => User(nr, s"User#${nr}")).toDF("id", "login")
    // We test batchsize so having 1 partition is more than useful
    .repartition(1)

  MethodInvocationDecorator.decorateClass("com.mysql.cj.jdbc.StatementImpl", "executeBatch").toClass
  users.write
    .option("batchsize", 3)
    .mode(SaveMode.Overwrite)
    .jdbc(Connection, BatchSizeTable, ConnectionProperties)

  MethodInvocationCounter.methodInvocations(ExecuteBatchMethodName) shouldEqual 4
  val rows = MysqlConnector.getRows(s"SELECT * FROM ${BatchSizeTable}", (resultSet) => {
    (resultSet.getInt("id"), resultSet.getString("login"))
  })
  rows should have size 10
  rows should contain allOf((1, "User#1"), (2, "User#2"), (3, "User#3"), (4, "User#4"),
    (5, "User#5"), (6, "User#6"), (7, "User#7"), (8, "User#8"), (9, "User#9"), (10, "User#10"))
}
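The number of executeBatch calls expected by the test can also be derived without a database. The sketch below replays only the counting logic of the savePartition loop for a single partition; BatchSizeSketch and countExecuteBatchCalls are hypothetical names and no JDBC statement is involved:

```scala
object BatchSizeSketch {
  // Hypothetical helper: replays the counting logic of the insert loop
  // and returns how many times executeBatch() would be called
  // for a single partition containing `rows` rows.
  def countExecuteBatchCalls(rows: Int, batchSize: Int): Int = {
    require(batchSize >= 1, "batchSize must be at least 1")
    var executeBatchCalls = 0
    var rowCount = 0
    (1 to rows).foreach { _ =>
      rowCount += 1 // corresponds to stmt.addBatch()
      if (rowCount % batchSize == 0) {
        executeBatchCalls += 1 // corresponds to stmt.executeBatch()
        rowCount = 0
      }
    }
    // the remaining rows are flushed in a last, incomplete batch
    if (rowCount > 0) executeBatchCalls += 1
    executeBatchCalls
  }
}
```

For 10 rows and a batch size of 3 the function returns 4: 3 full batches and a last batch with the single remaining row. With several partitions the same logic applies to every partition separately, which is why the test repartitions the data into 1 partition.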

Session customization

The last described option belongs to the reading-related group. It's called sessionInitStatement and lets us define custom SQL code that is executed after opening each session to the remote database and just before starting to read the data from it. The following test shows how it can be used to get rid of invalid rows, but it can also be used for other purposes (changing a session property, the timezone and so on):

private val SessionInitTable = "session_init_test"

"sessionInitStatement" should "remove one row before reading the rows into DataFrame" in {
  MysqlConnector.executedSideEffectQuery(s"CREATE TABLE IF NOT EXISTS ${SessionInitTable} " +
    s"(id INT(11) NOT NULL, login TEXT)")
  val usersToInsert = (1 to 2).map(nr => User(nr, s"User#${nr}"))
  MysqlConnector.populateTable(s"INSERT INTO ${SessionInitTable} (id, login) VALUES (?, ?)", usersToInsert)

  val users = sparkSession.read.format("jdbc").option("url", Connection)
    .option("dbtable", SessionInitTable)
    .option("user", DbUser)
    .option("password", DbPassword)
    .option("sessionInitStatement", s"DELETE FROM ${SessionInitTable} WHERE id < 2")
    .load()
    .select("id")

  val usersIds = users.collect().map(row => row.getAs[Int]("id"))
  usersIds should have size 1
  usersIds(0) shouldEqual 2
  val rows = MysqlConnector.getRows(s"SELECT * FROM ${SessionInitTable}", (resultSet) => {
    (resultSet.getInt("id"), resultSet.getString("login"))
  })
  rows should have size 1
  rows(0) shouldEqual (2, "User#2")
}

Spark SQL has a wide range of options that can be used with JDBC sources and sinks. Since the framework can sometimes create output tables automatically, it's useful to know the options related to table creation: createTableOptions and createTableColumnTypes. As shown, they can be used to customize, respectively, the table and its columns. Other writing options control the number of rows inserted in one batch (batchsize) and the transaction isolation level. The remaining options concern the reading part and can be used to slightly change the input schema or to call custom SQL code before starting to read the data.