What's new in Apache Spark 3.0 - Apache Kafka integration improvements

Versions: Apache Spark 3.0.0

After previous presentations of the new date time and functions features in Apache Spark 3.0 it's time to see what's new on the streaming side in Structured Streaming module, and more precisely, on its Apache Kafka integration.

The post is divided into 5 parts. In the first one, I'll catch up and present the changes for the delegation token support, initially introduced in 2.4. After that, I will present the new field available in the source, namely the headers. In two other sections, I will cover, respectively, producer and consumer cache changes. By the end, I will list the remaining evolutions.

Delegation token support

When I was analyzing the tasks impacting Apache Kafka, the words I saw the most often were "delegation token support". Since I had no idea what is all about, I started by figuring that out. This delegation token support is an Apache Kafka feature introduced with KIP-48 to address security issues of the Kerberos authentication layer, added in Kafka 0.9.0. Kerberos had some drawbacks impacting mainly the performance and security (eg. compromised ticket-granting-ticket can grant access to more things than only Kafka service). To address all of that, the community behind Kafka proposed a solution based on delegation tokens. A delegation token is a shared secret between clients and the broker. An example of the workflow, inspired by gaborgsomogyi's kafka-delegation-token project looks like:

Structured Streaming provides delegation tokens from the previous major release (2.4.0). However, this first version didn't fully support all of the features. That's why the missing ones were added to this brand new 3.0 version. The first of them is the support for multi-cluster Kafka. It means that from now on, you're able to use the delegation token with multiple brokers. Another major change was the clarification for the case when the token is retrieved by a proxy user. A proxy user is a user able to generate tokens for other users. This feature is not currently implemented in Structured Streaming and every time you will try to use it, you'll get the Obtaining delegation token for proxy user is not yet supported error. And the final thing, the documentation. A new chapter about "Security" and "Delegation token" was added to the documentation of the Apache Kafka integration.

Headers support

Another important improvement is the support for Kafka headers. If you specify .option("includeHeaders", "true") on your streaming reader, you'll be able to use Kafka's headers functionality. This feature extends the basic streaming capabilities by providing metadata support at message basis. The headers can be then retrieved with SQL expressions. To illustrate that, I will use MessageWithHeaderExample from my Github's kafka-playground project and Apache Spark source defined like:

  val producer = new KafkaProducer[String, String](configuration)
  (0 to 10).foreach(nr => {
    producer.send(new ProducerRecord[String, String](
      "raw_data", 0,
      s"key${nr}", s"value${nr}", Seq[Header](new RecordHeader("generation_context", "localhost".getBytes)).asJava
    ))
  })
  producer.flush()

  val kafkaReaderWithHeaders = testSparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "raw_data")
    .option("includeHeaders", "true")
    .option("startingOffsets", "earliest")
    .load()

  kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
    .writeStream.format("console").option("truncate", false).start().awaitTermination()


You can see the execution results in the following video:

Producer caching

One other important bug fix implemented in the 3.0 release (SPARK-21869) handles cached producer expiration. Previously Kafka clients were cached with Google Guava's cache where the TTL was set to "last accessed time + timeout configuration". It didn't work correctly for the tasks using the producer for longer than the specified timeout. That's why, after different PR proposals, Apache Spark 3.0 integrates a new producer pool where the criteria for the cache eviction are the references count and last returned time if the references counter goes to 0. The reference counter is incremented or decremented whenever a client acquires or releases the reference to the producer from the cache. Below you can find what happens when the producer is acquired for writing by a task:

// KafkaDataWriter
  def write(row: InternalRow): Unit = {
    checkForErrors()
    if (producer.isEmpty) {
      producer = Some(InternalKafkaProducerPool.acquire(producerParams))
    }
    producer.foreach { p => sendRow(row, p.producer) }
  }

// InternakKafkaProducerPool
  private[producer] def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = {
    val updatedKafkaProducerConfiguration =
      KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
        .setAuthenticationConfigIfNeeded()
        .build()
    val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration)
    synchronized {
      val entry = cache.getOrElseUpdate(paramsSeq, {
        val producer = createKafkaProducer(paramsSeq)
        val cachedProducer = new CachedKafkaProducer(paramsSeq, producer)
        new CachedProducerEntry(cachedProducer,
          TimeUnit.MILLISECONDS.toNanos(cacheExpireTimeoutMillis))
      })
      entry.handleBorrowed()
      entry.producer
    }
  }
  private[producer] class CachedProducerEntry(
      val producer: CachedKafkaProducer,
      cacheExpireTimeoutNs: Long) {
    private var _refCount: Long = 0L
    private var _expireAt: Long = Long.MaxValue

    /** exposed for testing */
    private[producer] def refCount: Long = _refCount
    private[producer] def expireAt: Long = _expireAt

    def handleBorrowed(): Unit = {
      _refCount += 1
      _expireAt = Long.MaxValue
    }
// ...
  private[producer] def release(producer: CachedKafkaProducer): Unit = {
    synchronized {
      cache.get(producer.cacheKey) match {
        case Some(entry) if entry.producer.id == producer.id =>
          entry.handleReturned(clock.nanoTime())
        case _ =>
          logWarning(s"Released producer ${producer.id} is not a member of the cache. Closing.")
          producer.close()
      }
    }
  }

The producers are evicted from a background thread scheduled at spark.kafka.producer.cache.evictorThreadRunInterval milliseconds:

  private[producer] class CachedProducerEntry(
      val producer: CachedKafkaProducer,
      cacheExpireTimeoutNs: Long) {
// ...
def expired(curTimeNs: Long): Boolean = _refCount == 0 && _expireAt < curTimeNs

// InternalKafkaProducerPool
  private def startEvictorThread(): Option[ScheduledFuture[_]] = {
    val evictorThreadRunIntervalMillis = conf.get(PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL)
    if (evictorThreadRunIntervalMillis > 0) {
      val future = executorService.scheduleAtFixedRate(() => {
        Utils.tryLogNonFatalError(evictExpired())
      }, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
      Some(future)
    } else {
      None
    }
  }
  private def evictExpired(): Unit = {
    val curTimeNs = clock.nanoTime()
    val producers = new mutable.ArrayBuffer[CachedProducerEntry]()
    synchronized {
      cache.retain { case (_, v) =>
        if (v.expired(curTimeNs)) {
          producers += v
          false
        } else {
          true
        }
      }
    }
    producers.foreach { _.producer.close() }
  }

Consumer caching

But the producer's part is not the single one changed in the 3.0 release. Also the consumer's one evolved. Starting from 3.0, it leverages Apache Commons Pool library for the cache management for both consumers and fetched data. Thanks to this library, the logic managing consumer caching was simplified since the maintenance tasks (eviction, tracking, JMX metrics) were delegated to the Commons Pool. If you check then the changes, you will notice that the reuseKafkaConsumer parameter from KafkaBatchPartitionReader disappeared (or rather delegated t othe Pool):

private case class KafkaBatchPartitionReader(
    offsetRange: KafkaOffsetRange,
    executorKafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean,
    includeHeaders: Boolean) extends PartitionReader[InternalRow] with Logging { 
private val consumer = KafkaDataConsumer.acquire(offsetRange.topicPartition, executorKafkaParams)
  def acquire(
      topicPartition: TopicPartition,
      kafkaParams: ju.Map[String, Object]): KafkaDataConsumer = {
    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
      val cacheKey = new CacheKey(topicPartition, kafkaParams)

      // If this is reattempt at running the task, then invalidate cached consumer if any.
      consumerPool.invalidateKey(cacheKey)

      // invalidate all fetched data for the key as well
      // sadly we can't pinpoint specific data and invalidate cause we don't have unique id
      fetchedDataPool.invalidate(cacheKey)
    }

    new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool)
  }
// ...

The consumer is later retrieved from the cache or created during the data fetch:

  def get(
      offset: Long,
      untilOffset: Long,
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean):
    ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
    require(offset < untilOffset,
      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")

    val consumer = getOrRetrieveConsumer()
    val fetchedData = getOrRetrieveFetchedData(offset)

// ...
  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
    if (!_consumer.isDefined) {
      retrieveConsumer()
    }
    require(_consumer.isDefined, "Consumer must be defined")
    if (KafkaTokenUtil.needTokenUpdate(SparkEnv.get.conf, _consumer.get.kafkaParamsWithSecurity,
        _consumer.get.clusterConfig)) {
      logDebug("Cached consumer uses an old delegation token, invalidating.")
      releaseConsumer()
      consumerPool.invalidateKey(cacheKey)
      fetchedDataPool.invalidate(cacheKey)
      retrieveConsumer()
    }
    _consumer.get
  }

This change brought also some new configuration entries to control how often the cache eviction process will run (spark.kafka.consumer.cache.evictorThreadRunInterval, spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval) and what cached entries will be eligible for eviction (spark.kafka.consumer.fetchedData.cache.timeout, spark.kafka.consumer.cache.timeout).

An important point to notice in this part is also the presence of FetchedDataPool that stores:

private[consumer] case class FetchedData(
    private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    private var _nextOffsetInFetchedData: Long,
    private var _offsetAfterPoll: Long)

It's the object used when the consumer returns the fetched records to the client's code, more exactly from this method:

  def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    val record = _records.next()
    _nextOffsetInFetchedData = record.offset + 1
    record
  }
 
  def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    assert(_records.hasPrevious, "fetchedData cannot move back")
    val record = _records.previous()
    _nextOffsetInFetchedData = record.offset
    record
  }

Of course, the records are not persisted in memory indefinitely. In normal circumstances, they will be removed after consuming all offsets planned in the given micro-batch execution for the task. And the unused cache entries will be removed from the interval configuration mentioned before, or explicitly by the end of reading the data in the task (KafkaDataConsumer#release:

  def release(): Unit = {
    releaseConsumer()
    releaseFetchedData()
  }

// Called from KafkaSourceRDD
  override def compute(
      thePart: Partition,
      context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
// ...
     val underlying = new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
// ...
        override protected def close(): Unit = {
          consumer.release()
        }
      }

Bug fixes

This new release also brings some fixes for the client part, like a more intelligent way to select the least loaded node by taking throttling into account or seekToEnd position resolution fix.

Among other bug fixes, you can find configuration defaults. Apache Spark 3.0 sets the default value for spark.kafka.sasl.kerberos.service.name property to kafka.

Apart from that, the integration solves the problem of properties. KafkaContinuousStream class has a property called pollTimeoutMs. This parameter defines how long the client will wait for the data arriving from the broker, ie. how long it will block with a poll() call. In previous versions this value was always set to default 512 ms because it was retrieved from lower cased properties defined in KafkaProvider (kafkaconsumer.polltimeoutms) and it was expected to be defined in camelCase-style (kafkaConsumer.pollTimeoutMs). To fix this issue, and at the same time avoid similar ones in the future, Apache Spark 3.0 uses now org.apache.spark.sql.util.CaseInsensitiveStringMap as the input passed to KafkaContinuousStream:

class KafkaContinuousStream(
    private[kafka010] val offsetReader: KafkaOffsetReader,
    kafkaParams: ju.Map[String, Object],
    options: CaseInsensitiveStringMap,
    metadataPath: String,
    initialOffsets: KafkaOffsetRangeLimit,
    failOnDataLoss: Boolean)

The final change from that category is the support of Apache Kafka 2.4.1 (not really a bug fix but didn't know where to put it :P).

That's all I found for Apache Kafka integration improvements. Personally, I missed out on the delegation token support in 2.4, so that's why I covered it pretty widely at the beginning of the post. But apart from that, there are some other interesting changes in the source code, like the metadata support through headers reading and the last bug fix introducing a new way to manage producer's pool to handle their expiration time correctly.