Even though the change I will describe in this blog post is still in progress, it's worth your attention, especially since I missed the DataSource V2 evolution in my previous blog posts.
DataSource V2, why?
As you already know, the DataSource V2 API is not a new feature. It started with SPARK-15689, which was resolved in the 2.3 release. The goal was to address the issues of the previous data source implementation (V1), defined by Wenchen Fan in the JIRA's SPIP document:
1. Since its input arguments include DataFrame/SQLContext, the data source API compatibility depends on the upper level API.
2. The physical storage information (e.g., partitioning and sorting) is not propagated from the data sources, and thus, not used in the Spark optimizer.
3. Extensibility is not good and operator push-down capabilities are limited.
4. Lacking columnar read interface for high performance.
5. The write interface is so general without transaction supports.
To overcome these issues, Wenchen Fan proposed a new API based on the following principles:
- Java friendly - to easily keep the compatibility with Java. As you will see, a lot of the interfaces implemented as part of the DataSource V2 rework are in fact Java interfaces!
- Independence - the new API should be independent of the DataFrame, SQLContext, RDD and SparkSession objects
- Extensibility for data read operations - it should be particularly easy to extend a data source with predicate pushdown or column pruning support.
- Better integration with the Apache Spark optimizer - the new API should be able to propagate physical data storage information (e.g., partitioning) and dataset statistics to the query optimizer
- Support for columnar and row reads
- Interface supporting transactions for write operations
- A valid alternative to HadoopFsRelation storing metadata information about the read files
- A valid alternative to Hive-specific table read/write plans
That's a lot of information, isn't it? Let's make it a bit more concrete and see the classes added to solve the problems mentioned above!
DataSource V2 - classes
To see which components solve the problems listed above, let's analyze one V2 data source. Since I will talk about JDBC just after, in this section we'll focus on the JsonDataSourceV2 class which, as you can correctly deduce, is responsible for reading JSON data. First, the dependency on other Apache Spark components is reduced to SparkSession, and mainly to the configuration stored by this class. You can see an example in the getTable(options: CaseInsensitiveStringMap) method:
lazy val sparkSession = SparkSession.active

override def getTable(options: CaseInsensitiveStringMap): Table = {
  val paths = getPaths(options)
  val tableName = getTableName(options, paths)
  val optionsWithoutPaths = getOptionsWithoutPaths(options)
  JsonTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
}
When it comes to the extensibility, the file-based data sources rely on an interface called TableProvider, created for the data sources living outside a data catalog. You will find there the information about the table, as presented in the snippet above, but also about the partitioning scheme, the dataset schema and the physical location of the files. And that's only the definition. Every V2 data source has helper I/O classes to read and write the data. If we take a look at our JSON source, we'll see a JsonPartitionReaderFactory class responsible for creating a partition reader and a JsonScanBuilder class that will generate the scan operation. And guess what? There is no sign of the HadoopFsRelation present in the V1 data source:
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  val relation = (providingInstance(), userSpecifiedSchema) match {
    // ...
    case (format: FileFormat, _) =>
      // ...
      HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)
FileFormat is the class used by file-based V1 data sources and you can see it pretty clearly in the fallbackFileFormat returned by the JsonDataSourceV2:
class JsonDataSourceV2 extends FileDataSourceV2 {

  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[JsonFileFormat]
But I'm digressing now. Let me get back to the list of problems addressed by the DataSource V2. One of them was the support for columnar and row reads, implemented within the partition readers I briefly mentioned previously:
abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    assert(partition.isInstanceOf[FilePartition])
    val filePartition = partition.asInstanceOf[FilePartition]
    val iter = filePartition.files.toIterator.map { file =>
      PartitionedFileReader(file, buildReader(file))
    }
    new FilePartitionReader[InternalRow](iter)
  }

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
    assert(partition.isInstanceOf[FilePartition])
    val filePartition = partition.asInstanceOf[FilePartition]
    val iter = filePartition.files.toIterator.map { file =>
      PartitionedFileReader(file, buildColumnarReader(file))
    }
    new FilePartitionReader[ColumnarBatch](iter)
  }
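To make the partition reader contract a bit more tangible, here is a minimal, row-based sketch of a custom reader factory; the RangePartition and RangeReaderFactory names are mine and you won't find them in Apache Spark, and a columnar source would additionally override supportColumnarReads and createColumnarReader:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

// Hypothetical partition describing a range of numbers to generate
case class RangePartition(from: Int, until: Int) extends InputPartition

class RangeReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val range = partition.asInstanceOf[RangePartition]
    new PartitionReader[InternalRow] {
      private var current = range.from - 1

      override def next(): Boolean = {
        current += 1
        current < range.until
      }

      // Every produced record is an InternalRow, exactly like in FilePartitionReaderFactory
      override def get(): InternalRow = InternalRow(current)

      override def close(): Unit = {}
    }
  }
}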
And how can you extend the behavior of a V2 data source? By using the interfaces from the org.apache.spark.sql.connector.catalog package that define the capabilities of every source. Among them you will find the following (a minimal usage sketch follows the list):
- SupportsRead marking a table readable
- SupportsWrite indicating that the table supports writes
- SupportsDelete characterizing a table which supports in-place deletes
- SupportsPartitionManagement defining the table as partitioned
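To show how these capability interfaces fit together with the reading classes presented before, here is a minimal, read-only table sketch; the RangeTable name is hypothetical, just like the RangePartition and RangeReaderFactory classes reused from the previous snippet:

import java.util
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical read-only table announcing its read support through SupportsRead
class RangeTable(rowsPerPartition: Int, partitions: Int) extends Table with SupportsRead {
  override def name(): String = "range_table"
  override def schema(): StructType = StructType(Seq(StructField("number", IntegerType)))
  // The capability set tells Apache Spark what the table can do
  override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(TableCapability.BATCH_READ)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new Scan {
      override def readSchema(): StructType = schema()
      override def toBatch(): Batch = new Batch {
        override def planInputPartitions(): Array[InputPartition] =
          Array.tabulate[InputPartition](partitions) { i =>
            RangePartition(i * rowsPerPartition, (i + 1) * rowsPerPartition)
          }
        override def createReaderFactory(): PartitionReaderFactory = new RangeReaderFactory()
      }
    }
  }
}

To make such a table reachable from the DataFrame reader, you would still have to expose it through a TableProvider implementation, and you will see just below why this detail matters for the JDBC source.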
Having mentioned all these interfaces, it's a great moment to move to the new V2 DataSource not yet supported in Apache Spark 3.1.1, the JDBC one!
JDBC DataSource V2
Why did I write that? Because the JIRA ticket adapting the JDBC data source to the V2 API is still in progress. Nonetheless, we can already see a few interesting changes. One of them is the support for the aforementioned interfaces!
case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions) extends Table with SupportsRead with SupportsWrite {
As you can deduce, the table can be used in read and write mode:
override def newScanBuilder(options: CaseInsensitiveStringMap): JDBCScanBuilder = {
  val mergedOptions = new JDBCOptions(
    jdbcOptions.parameters.originalMap ++ options.asCaseSensitiveMap().asScala)
  JDBCScanBuilder(SparkSession.active, schema, mergedOptions)
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
  val mergedOptions = new JdbcOptionsInWrite(
    jdbcOptions.parameters.originalMap ++ info.options.asCaseSensitiveMap().asScala)
  JDBCWriteBuilder(schema, mergedOptions)
}
But there is one detail: you can't use it, at least for now. Why? JDBC V2 is neither a TableProvider nor a DataSourceRegister. In other words, the method responsible for resolving the V2 data sources won't find it, because those are its only 2 lookup options:
def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
  val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase(Locale.ROOT)
    .split(",").map(_.trim)
  val cls = lookupDataSource(provider, conf)
  cls.newInstance() match {
    case d: DataSourceRegister if useV1Sources.contains(d.shortName()) =>
      None
    case t: TableProvider
        if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
      Some(t)
    case _ => None
  }
}
But because this blog post is also about the DataSource V2 API I missed in my previous updates, let me show you a short demo of it using JSON files.
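Below is a minimal sketch of such a demo; it assumes the Apache Spark 3.x behavior where the file formats listed in spark.sql.sources.useV1SourceList keep the V1 code path, so emptying that list lets JsonDataSourceV2 handle the read (the local paths and the object name are mine):

import org.apache.spark.sql.SparkSession

object JsonDataSourceV2Demo extends App {
  // File formats listed in spark.sql.sources.useV1SourceList keep the V1 code path,
  // so an empty list lets the V2 implementations, JsonDataSourceV2 included, take over
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("JSON DataSource V2 demo")
    .config("spark.sql.sources.useV1SourceList", "")
    .getOrCreate()
  import spark.implicits._

  // Write a small JSON dataset and read it back through the V2 path
  val inputDir = "/tmp/datasource_v2_demo/json"
  Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "letter")
    .write.mode("overwrite").json(inputDir)

  val numbersAndLetters = spark.read.json(inputDir)
  numbersAndLetters.show()
  // With the V2 path the physical plan should show a BatchScan node instead of the V1 FileScan
  numbersAndLetters.explain()
}

If you leave json in spark.sql.sources.useV1SourceList, which is its default, the same read should go through the V1 JsonFileFormat presented earlier.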
To recall the current status, the JDBC DataSource V2 integration is still in progress. However, Max Gekk, Huaxin Gao and Prashant Sharma have done amazing work so far, and thanks to it, there are only 5 subtasks remaining to implement!