Loading data from RDBMS

Versions: 2.1.0

Structured data processing takes a more and more important place in the Apache Spark project. Structured Streaming is one of the proofs. But how does Spark SQL work - and particularly, how does it load data from structured data sources such as a RDBMS?

This post tries to answer the latter question about data loading from a RDBMS. Its first section shows the steps performed by Spark to do that. The second part focuses on the query execution. The last section illustrates the elements described in the previous sections through learning tests.

Data preparation

Loading data from relational databases with Spark SQL starts with creating an instance of org.apache.spark.sql.DataFrameReader. This object is able to read data from a lot of different external sources: RDBMS, file systems, NoSQL databases. For the case described in this post (RDBMS), the DataFrameReader returned by SparkSession's read method is configured through a call to the format(source: String) method with the jdbc parameter.

After defining the format of the data source, it's time to specify the options. In the case of a RDBMS, we need to define: the connection url (url), the driver (driver), the database user (user) with its password (password) and, obviously, the read table (dbtable). The last entry is very important since it determines the final schema of the DataFrame. There are also other options, such as the ones related to partitioning or to the size of the JDBC fetch, but they'll be detailed in next posts.
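Put together, the configuration could look like the following sketch, assuming an already created SparkSession called sparkSession; the connection url, driver, credentials and table name are placeholders to adapt to the real database:

val jdbcReader = sparkSession.read
  .format("jdbc")
  // placeholder connection details - to replace by the real ones
  .option("url", "jdbc:postgresql://localhost:5432/shop")
  .option("driver", "org.postgresql.Driver")
  .option("user", "shop_user")
  .option("password", "shop_password")
  // the read table - it determines the schema of the DataFrame
  .option("dbtable", "orders")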

The call to the load() method starts the DataFrame creation. The first step is the BaseRelation resolution - in the case of a RDBMS, more specifically an instance of the JDBCRelation class. This instance contains all the information required to execute the defined queries efficiently: the schema and the partitioning.

At this moment, no data is moved from the database to the Spark SQL application. The load() call creates the Dataset instance lazily. The physical creation (i.e. data reading) is done only once an action is called on the given Dataset (e.g. foreach).
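This laziness can be illustrated with a short sketch reusing the jdbcReader configured above (the connection details remain placeholders):

// load() only resolves the JDBCRelation (schema and partitioning) - no row is read yet
val orders = jdbcReader.load()

// the physical reading from the database happens only when an action is called
orders.foreach(row => println(row))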

Query execution

Once the lazy Dataset instance is created, queries can be executed against it. When it happens, Spark SQL creates the corresponding RDD, called JDBCRDD. Under the hood, this specific kind of RDD reads the data with the classical approach of calling the compute() method.

The query is executed with the help of the JDBC connector that generates a prepared statement. After the execution, the query returns the results as an instance of the ResultSet class. They're later wrapped in an iterator (NextIterator) used to build the finally read object, an RDD[InternalRow] instance (= JDBCRDD, because it extends the RDD[InternalRow] class). The following snippet, coming from JDBCRDD, shows the query execution:

val myWhereClause = getWhereClause(part)

val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

The driver and the executors don't read the same things. The driver needs to know the schema, so it executes only a dummy query to get the columns of the table (check the post about Schema projection to get more details). On the other side, the data is fetched by the executors and eventually brought back to the driver if one of the toLocalIterator or collect methods is called. It's important to note that, if the partitioning is missing or incorrectly configured, the whole data can be read by a single executor (= as if it was read only by the driver). But the partitioning in Spark SQL will be explained in another post, with a quick preview just below.
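Even though partitioning and fetch size deserve their own posts, the following sketch gives a hedged preview of the options involved; the column name, the bounds and the values are purely illustrative:

val partitionedOrders = sparkSession.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/shop")
  .option("driver", "org.postgresql.Driver")
  .option("user", "shop_user")
  .option("password", "shop_password")
  .option("dbtable", "orders")
  // splits the reading into 4 ranges of the id column, handled by different tasks
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "1000")
  .option("numPartitions", "4")
  // number of rows fetched per database round trip (passed to setFetchSize shown above)
  .option("fetchsize", "100")
  .load()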

Loading data example

The following test case gives an idea of how data is read from a RDBMS with the appropriately configured DataFrameReader:

// InMemoryDatabase and DataOperation are in-memory database test helpers not shown here
import java.sql.PreparedStatement
import java.util.concurrent.ThreadLocalRandom

import scala.collection.mutable

import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL automatic schema resolution test").master("local[*]").getOrCreate()

before {
  case class OrderDataOperation(id: Int, amount: Double) extends DataOperation {
    override def populatePreparedStatement(preparedStatement: PreparedStatement): Unit = {
      preparedStatement.setInt(1, id)
      preparedStatement.setDouble(2, amount)
    }
  }
  val ordersToInsert = mutable.ListBuffer[OrderDataOperation]()
  for (id <- 1 to 1000) {
    ordersToInsert.append(OrderDataOperation(id, ThreadLocalRandom.current().nextDouble(2000d)))
  }

  InMemoryDatabase.createTable("CREATE TABLE orders(id integer primary key, amount decimal(6,2))")
  InMemoryDatabase.populateTable("INSERT INTO orders (id, amount) VALUES (?, ?)", ordersToInsert)
}

after {
  sparkSession.stop()
  InMemoryDatabase.cleanDatabase()
}

"all orders" should "brought to the driver" in {
  val ordersReader = sparkSession.read.format("jdbc")
    .option("url", InMemoryDatabase.DbConnection)
    .option("driver", InMemoryDatabase.DbDriver)
    .option("dbtable", "orders")
    .option("user", InMemoryDatabase.DbUser)
    .option("password", InMemoryDatabase.DbPassword)
    .load()

  import sparkSession.implicits._
  val ordersIds: Array[Int] = ordersReader.select("id")
    .map(row => row.getInt(0))
    .collect()

  ordersIds.size shouldEqual(1000)
  for (id <- 1 to 1000) {
    ordersIds.contains(id) shouldBe true
  }
}

This post shows some key facts about working with a RDBMS through Spark SQL. The first part explained the DataFrameReader used to build a lazily loaded DataFrame. The second part showed the technology employed to execute SQL queries. The last part presented a simple query execution with automatic schema resolution.