From Apache Spark connector to Apache Pulsar basic concepts

Versions: Apache Spark 2.4.3, Apache Pulsar 2.4.1

Some time ago I saw an interesting presentation about Apache Pulsar and it intrigued me. Compute separated from storage in a streaming system? Sounds great! In this series of posts, I will try to understand how different challenges were solved, but I will start with an exercise: trying to figure out Apache Pulsar's architecture from its Structured Streaming connector.

Of course, I already have some background on Pulsar. I know that it relies on components like BookKeeper and ZooKeeper, that the storage can be distributed, and that you can write Pulsar functions and execute them very close to the data, but that's only vague knowledge. By analyzing the connector I hope to learn a few more things before moving on to the source code of the server and clients. That's why you shouldn't consider this post as "the" introduction. It's rather an exploratory game to see how many things I can figure out by simply reading the connector's code :-) (spoiler alert: I had to read the doc as well). I will play the game in FAQ mode, which I'm starting to appreciate as a good way to organize knowledge.

Reading from Pulsar

What is the topology?

By "topology" I mean here the elements composing an Apache Pulsar data source. To figure them out, I analyzed how the connector initializes the client used to read data. In the code responsible for that, you can see 3 important Pulsar properties: a topic, a subscription name and a subscription type.

A topic in Pulsar is the same idea as a topic in Kafka, i.e. a logical group of data exposed to the client. From that point, I wanted to know how a topic is composed in Apache Pulsar. At first glance, it seems that a topic is composed of partitions, which is perfectly fine. I'm deducing that from the object used to represent Apache Pulsar's partitions in Apache Spark:

private[pulsar] case class PulsarSourceRDDPartition(index: Int, offsetRange: PulsarOffsetRange)
    extends Partition

And after some quick research, I can confirm that supposition. A topic is composed of partitions, and every partition is composed of multiple segments. That last concept is very interesting and I will highlight it in one of my next posts. An interesting point to notice here is that a topic may not be partitioned at all.

Besides topic and partitions, Apache Pulsar uses the concept of subscription. A subscription is like a proxy between the client and the broker data, and it's composed of 2 items.

The first item is the subscription type. Globally, it defines how many consumers can subscribe to a given subscription and how the messages will be delivered to them. The second item is the subscription name, which identifies the subscription.

In the connector, the subscription name is generated with a random UUID for every executor. The subscription type is hardcoded to exclusive which means that only 1 consumer can subscribe to the subscription.

Why is the exclusive subscription the default?

The exclusive subscription seems to be the closest, in terms of semantics, to Apache Spark's partition-based consumers. To explain why, I will first describe the 3 other subscription types. The first type is called shared, and here multiple consumers read messages in a round-robin manner. The drawback that probably eliminates this mode from the connector is its inability to guarantee message ordering, a guarantee that partition-based processing provides automatically.

The second type is failover, and here multiple consumers can read messages from the topic but only 1 of them receives messages at a given time. In the beginning, I was wondering why this mode wasn't used instead of the exclusive mode, especially since, according to the Javadoc, it could be a great fit for partition-based processing: "The partitions assignments will be split across the available consumers. On each partition, at most one consumer will be active at a given point in time."

But after reading the documentation, I (probably) understood the reason. In failover mode, there is 1 master consumer and multiple slaves. A slave is promoted only when the master fails. After that failure, one of the slaves becomes the new master and receives all new messages. It's an Apache Pulsar fault-tolerance mechanism that reduces the consumer's unavailability as much as possible. However, once again, it's not how Structured Streaming consumers behave in case of errors. If one of the tasks fails, it's simply retriggered.

The 3rd subscription type is key-shared, and here all messages with a specific key go to the same consumer. It doesn't fit either, because Structured Streaming is based on physical partitioning, i.e. how the data is stored in the broker. Logical partitioning can be managed later with groupByKey transformations.

The last mode, the one used in the connector, is exclusive, and here only 1 consumer is able to subscribe to a given subscription. And since the Structured Streaming source is partition-based, having 1 consumer to read all partitions processed in one executor is pretty fine:

def computeInner(
    topic: String,
    startOffset: MessageId,
    endOffset: MessageId,
    context: TaskContext): Iterator[InternalRow] = {
  // ...
  lazy val consumer = CachedPulsarClient
    .getOrCreate(clientConf)
    .newConsumer(schema)
    .topic(topic)
    .subscriptionName(s"$subscriptionNamePrefix-${UUID.randomUUID()}")
    .subscriptionType(SubscriptionType.Exclusive)
    .loadConf(consumerConf)
    .subscribe()

  new NextIterator[InternalRow] {
    try {
      consumer.seek(startOffset)
    } catch {
      case e: Throwable =>
        reportDataLoss(s"Failed to seek to previous $startOffset, data loss occurs")
    }

    // ...
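
To compare the four subscription types described above, here is a minimal sketch of how each of them could assign messages to consumers. It is a simplified, hypothetical model written for this post (SubscriptionSketch, Message and dispatch are my own names), not the broker's real dispatching logic. In particular, the failover case only models the situation where the master is alive.

```scala
// Simplified, hypothetical model of message dispatch; not the Pulsar implementation.
object SubscriptionSketch {
  sealed trait SubscriptionType
  case object Exclusive extends SubscriptionType
  case object Failover extends SubscriptionType
  case object Shared extends SubscriptionType
  case object KeyShared extends SubscriptionType

  final case class Message(key: String, value: String)

  // Returns, for every message, the index of the consumer that receives it.
  def dispatch(
      subscriptionType: SubscriptionType,
      consumers: Int,
      messages: Seq[Message]): Seq[Int] = {
    require(consumers > 0, "at least one consumer is needed")
    subscriptionType match {
      case Exclusive =>
        // Only one consumer may attach to the subscription.
        require(consumers == 1, "exclusive subscription accepts a single consumer")
        messages.map(_ => 0)
      case Failover =>
        // Several consumers may attach, but only the master (index 0 here) receives data.
        messages.map(_ => 0)
      case Shared =>
        // Round-robin: consecutive messages land on different consumers.
        messages.indices.map(_ % consumers)
      case KeyShared =>
        // Same key, same consumer.
        messages.map(m => Math.floorMod(m.key.hashCode, consumers))
    }
  }
}
```

With 2 consumers and 4 messages, the shared mode spreads consecutive messages over both consumers, which is exactly why the ordering seen by a single consumer no longer matches the ordering of the partition.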

How to manipulate offsets?

We can move inside a partition with the seek operation. It can be either timestamp- or message-based. The former helps to retrieve messages if we know their publish time. However, it's not the operation used in the Apache Spark connector since it's reserved to non-partitioned topics.

Why so? To access data in a partitioned topic, which is what Structured Streaming uses, you have to specify an offset with the ledger id, the entry id and, of course, the partition index. A ledger is a kind of append-only, closeable and replicated log. It makes perfect sense then that to retrieve a message, we need to know where it was written (ledger), for what partition (index) and at what place (entry id).

If you don't know the exact location of your data, you can start consuming the topic from the earliest or the latest messages.
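
The 3 coordinates of an offset and the earliest/latest shortcuts can be illustrated with a small model. This is only a sketch under my own assumptions: Position, StartingPoint and resolve are hypothetical names, the partition log is a plain sorted sequence, and an exact position resolves to the first message at or after it, which may differ from the real client's behavior.

```scala
object OffsetSketch {
  // Hypothetical stand-in for a message id: where it was written (ledger),
  // at what place (entry) and for what partition (index).
  final case class Position(ledgerId: Long, entryId: Long, partitionIndex: Int)

  sealed trait StartingPoint
  case object Earliest extends StartingPoint
  case object Latest extends StartingPoint
  final case class Exact(position: Position) extends StartingPoint

  // Within one partition, positions are compared by ledger first, then by entry.
  implicit val positionOrdering: Ordering[Position] =
    Ordering.by((p: Position) => (p.ledgerId, p.entryId))

  // Resolves the first already written message to read from a log sorted by position.
  def resolve(start: StartingPoint, log: Seq[Position]): Option[Position] = start match {
    case Earliest => log.headOption
    case Latest => None // nothing to replay, the reader waits for new messages
    case Exact(position) => log.find(p => positionOrdering.gteq(p, position))
  }
}
```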

Why is a batch source present?

If you analyze the PulsarSourceRDD.scala file, you will see there the RDDs for the streaming (PulsarSourceRDD) and batch (PulsarSourceRDD4Batch) data sources. Both share the same base class, hence most of the operations are the same. The only difference is how the last offset to read is retrieved. Since streaming is a batch query evolving at every execution, we're able to read data indefinitely, so having latest as the closing offset makes sense.

That's not the same for the batch approach, which is executed only once. When it starts, the connector must know explicitly where to stop, and that's the reason why the last offset is resolved like this:

    val end = part.offsetRange.untilOffset match {
      case MessageId.latest =>
        Utils.tryWithResource(PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { admin =>
          PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp))
        }
      case id => id
    }

But I didn't answer the main question: why can the data also be processed in batch? Unlike in Apache Kafka, the data in Apache Pulsar can be stored on long-term, distributed data stores like S3, GCS or HDFS. Thanks to that, and AFAIU for now, batch processing can be easily applied.

How can data be lost between reads?

Like the Apache Kafka connector, the Apache Pulsar one also handles data loss. But what does it mean in the context of this messaging broker? I found the answer in the PulsarOptions class under the INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_(FALSE|TRUE) keys:

  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
    """
      |Some data may have been lost because they are not available in Pulsar any more; either the
      | data was aged out by Pulsar or the topic may have been deleted before all the data in the
      | topic was processed. If you want your streaming query to fail on such cases, set the source
      | option "failOnDataLoss" to "true".
    """.stripMargin

From that text you can discover that Apache Pulsar is also able to manage message retention.
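
The behavior described by this instruction text boils down to a simple switch. Below is a minimal sketch of it, assuming that the query fails with an exception when failOnDataLoss is true and only reports a warning otherwise. DataLossSketch and the warn callback are hypothetical; the connector's real reportDataLoss may be implemented differently.

```scala
object DataLossSketch {
  // Hypothetical helper: fail the query when failOnDataLoss is true,
  // otherwise only pass the message to the warning callback.
  def reportDataLoss(failOnDataLoss: Boolean, message: String)(warn: String => Unit): Unit =
    if (failOnDataLoss) throw new IllegalStateException(message)
    else warn(message)
}
```

For instance, DataLossSketch.reportDataLoss(failOnDataLoss = false, "data aged out")(println) prints the message and lets the processing continue.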

Writing to Pulsar

Does it support batching?

Yes. When the connector creates the producer, it sets 2 properties, max publish delay and max number of messages. The producer will then accumulate new messages and flush them as soon as one of the thresholds is reached.

Currently both values are hardcoded, respectively to 100 ms and 5242880 messages (5 * 1024 * 1024).
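
The "flush as soon as one of the thresholds is reached" rule can be sketched with a small accumulator. It's a simplified model and not the Pulsar producer: Batcher is a hypothetical class, time is passed explicitly to keep the example deterministic, and the delay is checked only when a message is added, whereas a real producer would rely on a timer.

```scala
object BatchingSketch {
  // Hypothetical accumulator: flushes when either threshold is reached.
  final class Batcher[T](maxMessages: Int, maxDelayMs: Long) {
    private var buffer = Vector.empty[T]
    private var firstAddedAtMs = 0L

    // Adds a message at the given time and returns the flushed batch, if any.
    def add(message: T, nowMs: Long): Option[Vector[T]] = {
      if (buffer.isEmpty) firstAddedAtMs = nowMs
      buffer = buffer :+ message
      val full = buffer.size >= maxMessages
      val expired = nowMs - firstAddedAtMs >= maxDelayMs
      if (full || expired) {
        val batch = buffer
        buffer = Vector.empty
        Some(batch)
      } else None
    }
  }
}
```

With maxMessages as high as 5242880, the 100 ms delay will be the threshold reached first in most cases, unless the writer is extremely fast.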

What is the delivery semantic?

The delivery semantic is at least once. How do I know? From that method in PulsarRowWriter:

  protected def checkForErrors(): Unit = {
    if (failedWrite != null) {
      throw failedWrite
    }
  }

But how is it related to at-least-once delivery? Every time the writer sends a new message, it does it asynchronously and registers a callback which, depending on the result, sets the failedWrite variable:

  private val sendCallback = new BiConsumer[MessageId, Throwable]() {
    override def accept(t: MessageId, u: Throwable): Unit = {
      if (failedWrite == null && u != null) {
        failedWrite = u
      }
    }
  }

The checkForErrors method is later called throughout the whole writing process:

  def write(row: InternalRow): Unit = {
    checkForErrors()
    sendRow(row)
  }

  def commit(): WriterCommitMessage = {
    // Send is asynchronous, but we can't commit until all rows are actually in Pulsar.
    // This requires flushing and then checking that no callbacks produced errors.
    // We also check for errors before to fail as soon as possible - the check is cheap.
    checkForErrors()
    producerFlush()
    checkForErrors()
    PulsarWriterCommitMessage
  }

  def abort(): Unit = {}

  def close(): Unit = {
    checkForErrors()
    producerClose()
    checkForErrors()
  }

Thus, the writing process can fail at any moment by throwing the exception stored in failedWrite and checked by checkForErrors. As a result, the topic can contain duplicates if the retried task succeeds, since the messages sent before the failure are not removed.
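
To see where the duplicates come from, here is a small simulation of a failed and retried task. Everything in it is hypothetical (the in-memory Topic, attempt and runWithRetry) and, unlike the connector, the writes are synchronous. The only thing it keeps from the real code is the failedWrite idea: once an error is recorded, the attempt is considered failed, but what was already sent stays in the topic.

```scala
import scala.collection.mutable.ArrayBuffer

object AtLeastOnceSketch {
  // Hypothetical in-memory "topic"; sent messages stay there even if the task fails later.
  final class Topic {
    val messages: ArrayBuffer[String] = ArrayBuffer.empty[String]
  }

  // One task attempt: writes rows one by one and fails at the row with index failAt, if defined.
  // Returns true when the attempt succeeded.
  def attempt(topic: Topic, rows: Seq[String], failAt: Option[Int]): Boolean = {
    var failedWrite: Throwable = null
    rows.zipWithIndex.foreach { case (row, index) =>
      if (failedWrite == null) {
        if (failAt.contains(index)) failedWrite = new RuntimeException(s"cannot send $row")
        else topic.messages += row
      }
    }
    failedWrite == null
  }

  // The whole task is retried once; nothing removes what the failed attempt already sent.
  def runWithRetry(topic: Topic, rows: Seq[String], failFirstAttemptAt: Option[Int]): Unit = {
    val succeeded = attempt(topic, rows, failFirstAttemptAt)
    if (!succeeded) {
      attempt(topic, rows, None)
      ()
    }
  }
}
```

Writing the rows a, b, c with a first attempt failing on c leaves a, b, a, b, c in the topic: every row is delivered at least once, and some of them twice.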

How are retries managed?

At first glance I didn't find any configuration mentioning the number of retries. However, after investigating the documentation of ProducerBuilder used to initialize the client from the sink, I found an interesting property called sendTimeout. The property says how long the client will try to deliver the message. So, the retries are controlled here by time rather than the number of allowed requests. If you set this value to 0, the client will try forever to publish the message.

Although this value is not set explicitly in the builder's method, it seems that you can define it in the sink options with the pulsar.producer. prefix, like pulsar.producer.sendTimeoutMs. All values defined that way are passed as producerConf here:

  def createProducer[T](
      clientConf: ju.Map[String, Object],
      producerConf: ju.Map[String, Object],
      topic: String,
      schema: Schema[T]): Producer[T] = {

    CachedPulsarClient
      .getOrCreate(clientConf)
      .newProducer(schema)
      .topic(topic)
      .loadConf(producerConf)
      .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
      // maximizing the throughput
      .batchingMaxMessages(5 * 1024 * 1024)
      .create()
  }
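
How the prefixed options could end up in producerConf can be sketched as below. It's only an assumption about the mechanism: OptionsSketch and withPrefix are hypothetical names, and the connector's real extraction code may differ. The idea is to keep the options starting with the prefix and to strip the prefix from their keys, so that pulsar.producer.sendTimeoutMs becomes sendTimeoutMs.

```scala
object OptionsSketch {
  val ProducerPrefix = "pulsar.producer."

  // Keeps only the options starting with the prefix and strips it from their keys.
  def withPrefix(options: Map[String, String], prefix: String): Map[String, String] =
    options.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
}
```

For example, OptionsSketch.withPrefix(Map("pulsar.producer.sendTimeoutMs" -> "0", "topic" -> "t1"), OptionsSketch.ProducerPrefix) keeps only the sendTimeoutMs entry.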

How is the data serialized?

The part responsible for converting data from an Apache Spark UnsafeRow to an Apache Pulsar message is PulsarSerializer. When you analyze the source code, you will find places referring to the Apache Avro format. Does it mean that both are closely related?

From the README.md you can learn that:

  
The DataFrame written to Pulsar can have arbitrary schema, since each record in DataFrame is transformed as one message sent to Pulsar, fields of DataFrame are divided into two groups: __key and __eventTime fields are encoded as metadata of Pulsar message; other fields are grouped and encoded using AVRO and put in value()

And that's actually what happens when the data is written to Apache Pulsar. First, the Apache Spark schema is transformed into an Apache Avro one. Later, this transformed schema is used to build a new converter between Apache Spark and Apache Avro in the PulsarSerializer class. This converter is invoked inside PulsarSerializer#serialize(catalystData: Any), which is the method called when a new Spark row is written:

  protected def sendRow(row: InternalRow): Unit = {
    val metaRow = metaProj(row)
    val valueRow = valueProj(row)
    val value = serializer.serialize(valueRow)

    val topic = metaRow.getUTF8String(0)
    val key = metaRow.getBinary(1)
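
The split between the metadata and the value fields described in the README can be illustrated with a simplified model. The field names come from the quote above; everything else (RowSplitSketch, OutgoingMessage and the Map-based row) is hypothetical and much simpler than the projections applied on InternalRow in the connector.

```scala
object RowSplitSketch {
  // Field names taken from the README quote; the Map-based row is a hypothetical simplification.
  val MetadataFields: Set[String] = Set("__key", "__eventTime")

  final case class OutgoingMessage(metadata: Map[String, Any], value: Map[String, Any])

  // Metadata fields go to the message metadata, all the others to the message value.
  def split(row: Map[String, Any]): OutgoingMessage = {
    val (metadata, value) = row.partition { case (name, _) => MetadataFields.contains(name) }
    OutgoingMessage(metadata, value)
  }
}
```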

Although Apache Avro is the output format, it's not the only one accepted in the input. If you check the source code of PulsarDeserializer, you'll see that Apache Spark's schema is matched against Avro and JSON formats:

private val converter: Message[_] => SpecificInternalRow = {

  schemaInfo.getType match {
    case SchemaType.AVRO =>
      val st = rootDataType.asInstanceOf[StructType]
      // ...
    case SchemaType.JSON =>
      val st = rootDataType.asInstanceOf[StructType]
      // ...
    case _ => // AtomicTypes
      val tmpRow = new SpecificInternalRow(Seq(rootDataType) ++ metaDataFields.map(_.dataType))
      // ...

To be honest, I cheated a little. I didn't deduce all of this from the source code. Instead, I had to do some external research to better understand concepts like subscription types, retries, and batch processing, and to avoid writing completely wrong things. It proves to me that discovering the connector before learning the architecture of the data store is hard. And that's why in the future I will always start by analyzing the global architecture before going to its connectors and clients. Nonetheless, this short exercise also had some positive effects. It left me hungry enough to play with Apache Pulsar and discover it more in-depth in 2020.