Apache Kafka idempotent producer

on waitingforcode.com

I wrote all of this year's Apache Kafka posts (NIO, max in-flight requests) to better understand the idempotent producer. In this post I'll finally focus on it, before going further and analyzing the transactions support.

The post is composed of 3 main sections. In the first one, I will cover the basics of the idempotent producer in a non-transactional context. Then I will ask myself some questions and try to answer them to help you understand this feature better. Finally, I will play a little with the idempotent producer in code.

Some theory first

Before I try to answer my questions, some theory first. I won't reinvent the wheel here; instead, I will summarize what I found in KIP-98, the KIP introducing the idempotent producer. To put it simply, every producer has an assigned producer id (PID). The broker assigns it transparently during the producer's initialization. After that, the producer sends messages alongside a sequence number. This number is maintained per topic-partition, starts at 0 and is incremented with every produced message. The broker stores the sequence numbers sent by every producer for a given topic-partition and, from them, it's able to say whether the message is:

  • new - the message's sequence number is exactly one greater than the broker's last sequence number for the given producer
  • duplicated - the received sequence number is lower than or equal to the broker's last sequence number for the producer
  • out-of-sequence - the sequence number is greater than the last one by more than 1, which means that some messages were lost in between
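The three cases above can be sketched as a single comparison. This is a simplified model, not the broker's code: it assumes one producer and one partition, and ignores producer epochs and sequence number wrap-around, which the real broker also has to handle. `SequenceCheck`, `Outcome` and `classify` are hypothetical names.

```java
// Simplified model of the broker-side sequence check described above.
// Assumptions: one producer, one partition, no producer epoch and no
// sequence wrap-around (all of them handled by the real broker).
final class SequenceCheck {

    public enum Outcome { NEW, DUPLICATE, OUT_OF_SEQUENCE }

    // lastSeq: last sequence number accepted for the producer (-1 when nothing
    // has been written yet); incomingSeq: the sequence number just received.
    public static Outcome classify(int lastSeq, int incomingSeq) {
        if (incomingSeq == lastSeq + 1) {
            return Outcome.NEW;             // exactly the next expected record
        } else if (incomingSeq <= lastSeq) {
            return Outcome.DUPLICATE;       // already written, e.g. a retried record
        } else {
            return Outcome.OUT_OF_SEQUENCE; // gap: some records went missing
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(-1, 0));  // NEW: first record of a new producer
        System.out.println(classify(4, 5));   // NEW
        System.out.println(classify(4, 4));   // DUPLICATE
        System.out.println(classify(1, 111)); // OUT_OF_SEQUENCE
    }
}
```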

To enable the idempotent producer, you must set enable.idempotence to true. In addition, max.in.flight.requests.per.connection must be at most 5, retries greater than 0 and acks set to all. The idempotent producer relies on retries as well: if they aren't configured properly and some records aren't delivered with the first request, the sequence numbers on the broker and producer sides will get out of sync. If you don't set the acks and retries properties, Kafka uses the idempotence defaults, respectively all and Integer.MAX_VALUE. And since the producer is idempotent, retrying won't introduce duplicates.
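The settings above can be sketched with a plain java.util.Properties object, so the example runs without the Kafka client on the classpath. The keys are the real producer configuration names; `isValidForIdempotence` is a hypothetical helper that only encodes the constraints listed in this section, not Kafka's own validation logic.

```java
import java.util.Properties;

final class IdempotentProducerConfig {

    // Builds the idempotence-related producer settings discussed above.
    public static Properties idempotentProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    // Hypothetical check of the constraints from the text: acks=all,
    // retries > 0 and at most 5 in-flight requests per connection.
    // Missing acks/retries fall back to the idempotence defaults
    // (all and Integer.MAX_VALUE), as described above.
    public static boolean isValidForIdempotence(Properties props) {
        if (!Boolean.parseBoolean(props.getProperty("enable.idempotence"))) {
            return false;
        }
        String acks = props.getProperty("acks", "all");
        int retries = Integer.parseInt(
                props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        int inFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        boolean acksAll = "all".equals(acks) || "-1".equals(acks); // -1 is an alias of all
        return acksAll && retries > 0 && inFlight >= 1 && inFlight <= 5;
    }

    public static void main(String[] args) {
        Properties props = idempotentProps("localhost:9092");
        System.out.println(isValidForIdempotence(props)); // true
        props.setProperty("max.in.flight.requests.per.connection", "6");
        System.out.println(isValidForIdempotence(props)); // false
    }
}
```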

Questioning idempotency

That was the theory and the global picture of the implementation. Let's now delve deeper into some details. I will do that by asking myself some questions, a technique that helped me a lot when preparing my Spark+AI Summit talk and, thus, to understand arbitrary stateful processing much better.

What happens if I produce the same message once again?

The point of this question is to know whether the idempotent producer can replace a message store to guarantee uniqueness. The answer is no, because they're 2 completely different things. The idempotent producer only guarantees that a retried message isn't written more than once. It doesn't guarantee that a given message is delivered only once during the whole lifecycle of the topic.

That's because of the semantics of the idempotent producer. The sequence number is incremented with every record, so sending the same message at 2 different moments means sending 2 different records, each with its own sequence number.
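A toy model makes the difference visible. It assumes a single producer and partition, and `SamePayloadTwice`, `Partition` and `append` are hypothetical names: the broker side only compares sequence numbers, never the payloads, so the same business message sent twice lands twice in the log, while a retry reusing a sequence number is rejected.

```java
import java.util.ArrayList;
import java.util.List;

final class SamePayloadTwice {

    // Hypothetical partition that only accepts the next expected sequence.
    static final class Partition {
        private int lastSeq = -1;
        final List<String> log = new ArrayList<>();

        boolean append(int seq, String value) {
            if (seq != lastSeq + 1) {
                return false; // duplicate or out-of-sequence: not appended
            }
            lastSeq = seq;
            log.add(value);
            return true;
        }
    }

    public static void main(String[] args) {
        Partition partition = new Partition();
        int nextSeq = 0;
        // The application sends the same payload at two different moments:
        // each send gets a new sequence number, so both are accepted
        partition.append(nextSeq++, "order-42");
        partition.append(nextSeq++, "order-42");
        // A retry of the second send reuses sequence 1 and is rejected
        partition.append(1, "order-42");
        System.out.println(partition.log); // [order-42, order-42]
    }
}
```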

What is the impact on the broker side?

The broker stores the state of the producers writing to a given topic-partition in the ProducerStateManager class:

private val producers = mutable.Map.empty[Long, ProducerStateEntry]

The key of the map is the producer's id whereas the value is its state, composed of:

private[log] class ProducerStateEntry(val producerId: Long,
                                      val batchMetadata: mutable.Queue[BatchMetadata],
                                      var producerEpoch: Short,
                                      var coordinatorEpoch: Int,
                                      var currentTxnFirstOffset: Option[Long]) {
  // ...
}

The property that interests us is the batchMetadata queue. Every time a new batch is about to be appended to the partition's log, a new BatchMetadata entry is added to the queue:

  def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
    maybeUpdateEpoch(producerEpoch)
    addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
  }

However, it doesn't mean that you will find all written batches there. Instead, the broker limits the number of retained batches with this constant:

private[log] object ProducerStateEntry {
  private[log] val NumBatchesToRetain = 5
  // ...
}

  private def addBatchMetadata(batch: BatchMetadata): Unit = {
    if (batchMetadata.size == ProducerStateEntry.NumBatchesToRetain)
      batchMetadata.dequeue()
    batchMetadata.enqueue(batch)
  }
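The eviction rule above can be transcribed to Java with an ArrayDeque. `BatchMeta` is a simplified stand-in for BatchMetadata, and `offsetOfRetainedBatch` is a hypothetical lookup added to show a consequence of the limit: once a batch's entry is evicted, the broker no longer has the metadata needed to return that batch's original offset, which matches the comment in the Sender excerpt about duplicates later in this post.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class RetainedBatches {
    static final int NUM_BATCHES_TO_RETAIN = 5;

    // Simplified stand-in for the broker's BatchMetadata
    static final class BatchMeta {
        final int lastSeq;
        final long lastOffset;

        BatchMeta(int lastSeq, long lastOffset) {
            this.lastSeq = lastSeq;
            this.lastOffset = lastOffset;
        }
    }

    private final Deque<BatchMeta> batchMetadata = new ArrayDeque<>();

    void addBatchMetadata(BatchMeta batch) {
        if (batchMetadata.size() == NUM_BATCHES_TO_RETAIN) {
            batchMetadata.removeFirst(); // evict the oldest entry, like dequeue()
        }
        batchMetadata.addLast(batch);    // like enqueue(batch)
    }

    // Hypothetical lookup: the offset of a retained batch with this last
    // sequence, or null if its entry has already been evicted.
    Long offsetOfRetainedBatch(int lastSeq) {
        for (BatchMeta meta : batchMetadata) {
            if (meta.lastSeq == lastSeq) {
                return meta.lastOffset;
            }
        }
        return null;
    }

    int retainedCount() {
        return batchMetadata.size();
    }

    public static void main(String[] args) {
        RetainedBatches entry = new RetainedBatches();
        for (int seq = 0; seq < 7; seq++) {
            entry.addBatchMetadata(new BatchMeta(seq, seq * 10L));
        }
        System.out.println(entry.retainedCount());          // 5
        System.out.println(entry.offsetOfRetainedBatch(0)); // null: evicted
        System.out.println(entry.offsetOfRetainedBatch(6)); // 60
    }
}
```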

What if my producer fails?

In case of failure, the new producer instance won't reuse the id of the failed one: it gets a new id. The id is generated by the generateProducerId method of the ProducerIdManager class, and it's simply an incremented number:

  def generateProducerId(): Long = {
    this synchronized {
      // grab a new block of producerIds if this block has been exhausted
      if (nextProducerId > currentProducerIdBlock.blockEndId) {
        getNewProducerIdBlock()
        nextProducerId = currentProducerIdBlock.blockStartId + 1
      } else {
        nextProducerId += 1
      }

      nextProducerId - 1
    }
  }

A small detail before we go to the next question: the producer id block is the range of producer ids a broker is allowed to assign. It's backed up in ZooKeeper and read whenever the ProducerIdManager needs a new block, i.e. when it starts and when the current block is exhausted. It prevents overlapping ranges because every new block starts right after the end of the last allocated block (last end id + 1):

      val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)

      // generate the new producerId block
      currentProducerIdBlock = dataOpt match {
        case Some(data) =>
          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")

          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
            // we have exhausted all producerIds (wow!), treat it as a fatal error
            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
            throw new KafkaException("Have exhausted all producerIds.")
          }

          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
        case None =>
          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
      }
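The block logic above can be simulated to show why two brokers never hand out the same id. In this sketch the ZooKeeper node is replaced by a shared in-memory object and the conditional ZooKeeper update by a `synchronized` block; `BLOCK_SIZE` is deliberately tiny (Kafka's PidBlockSize is much larger) so the block switch is visible. All class names are hypothetical.

```java
final class ProducerIdBlocks {
    static final long BLOCK_SIZE = 3; // tiny on purpose, for the demo

    // Stand-in for the ZooKeeper node storing the last allocated block end
    static final class SharedStore {
        Long lastBlockEnd = null; // null = no block allocated yet
    }

    static final class IdManager {
        private final SharedStore store;
        private long blockStart;
        private long blockEnd;
        private long nextProducerId;

        IdManager(SharedStore store) {
            this.store = store;
            getNewBlock();
            nextProducerId = blockStart;
        }

        // Every new block starts right after the last allocated block end
        private void getNewBlock() {
            synchronized (store) {
                long start = store.lastBlockEnd == null ? 0L : store.lastBlockEnd + 1;
                blockStart = start;
                blockEnd = start + BLOCK_SIZE - 1;
                store.lastBlockEnd = blockEnd;
            }
        }

        // Same shape as the Scala generateProducerId shown above
        synchronized long generateProducerId() {
            if (nextProducerId > blockEnd) {
                getNewBlock();
                nextProducerId = blockStart + 1;
            } else {
                nextProducerId += 1;
            }
            return nextProducerId - 1;
        }
    }

    public static void main(String[] args) {
        SharedStore zk = new SharedStore();
        IdManager brokerA = new IdManager(zk); // block [0, 2]
        IdManager brokerB = new IdManager(zk); // block [3, 5]
        for (int i = 0; i < 4; i++) {
            System.out.println("A=" + brokerA.generateProducerId()
                    + " B=" + brokerB.generateProducerId());
        }
    }
}
```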

What happens to duplicated messages?

The answer is hidden in Sender's completeBatch method, called from the callback invoked after the producer receives the broker's response to a produce request.

If the broker reports the records as duplicated, the producer considers the write complete:

            } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
                // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
                // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
                // the correct offset and timestamp.
                //
                // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
                completeBatch(batch, response);
            } else {
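The branch above can be modeled as a mapping from the broker's error to what the user's callback sees. This is a simplified sketch, not the Sender code: the `ProduceError` enum, the `Result` class and the `UNKNOWN_OFFSET` value of -1 are assumptions of this example, and every error other than the duplicate case is treated as a plain failure, whereas the real producer may retry or reset its state.

```java
final class DuplicateHandling {
    enum ProduceError { NONE, DUPLICATE_SEQUENCE_NUMBER, OUT_OF_ORDER_SEQUENCE_NUMBER }

    static final long UNKNOWN_OFFSET = -1L; // assumption of this sketch

    static final class Result {
        final boolean success;
        final long offset;

        Result(boolean success, long offset) {
            this.success = success;
            this.offset = offset;
        }
    }

    static Result complete(ProduceError error, long offsetFromBroker) {
        switch (error) {
            case NONE:
                return new Result(true, offsetFromBroker);
            case DUPLICATE_SEQUENCE_NUMBER:
                // The records are already in the log: report success, but the
                // broker no longer has the metadata to return their offset
                return new Result(true, UNKNOWN_OFFSET);
            default:
                // Simplification: the real producer may retry or reset state here
                return new Result(false, UNKNOWN_OFFSET);
        }
    }

    public static void main(String[] args) {
        Result dup = complete(ProduceError.DUPLICATE_SEQUENCE_NUMBER, 42L);
        System.out.println(dup.success + " " + dup.offset); // true -1
    }
}
```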

Is the sequence number bound to the number of records or to the number of batches?

At first I thought that the sequence number was incremented by 1 with every batch. However, if you check the logic in RecordAccumulator, you will see that it's incremented by the number of records:

    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
        // ...
        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
        // ...
    }

The batch variable is an instance of ProducerBatch, whose recordCount field is incremented with every appended record:

    /**
     * Append the record to the current record set and return the relative offset within that record set
     *
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {

        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
            // ...
            this.recordCount++;
            return future;
        }
    }
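Counting records rather than batches means that each batch's base sequence is the partition's counter at drain time, and the counter then moves forward by the batch's record count. The sketch below assumes batches are drained in order from a single partition; `BatchSequences` and `baseSequences` are hypothetical names.

```java
import java.util.Arrays;

final class BatchSequences {

    // Base sequence of every batch, given the partition's next sequence
    // and the record count of each batch drained in order.
    static int[] baseSequences(int nextSequence, int[] recordCounts) {
        int[] bases = new int[recordCounts.length];
        for (int i = 0; i < recordCounts.length; i++) {
            bases[i] = nextSequence;         // the batch starts at the current counter
            nextSequence += recordCounts[i]; // then the counter moves by recordCount
        }
        return bases;
    }

    public static void main(String[] args) {
        int[] counts = {3, 1, 2};
        int[] bases = baseSequences(0, counts);
        System.out.println(Arrays.toString(bases)); // [0, 3, 4]
    }
}
```

With batches of 3, 1 and 2 records, the second batch starts at 3 and not at 1; a batch's last sequence is then base + recordCount - 1, which corresponds to the lastSeq field kept by the broker in BatchMetadata.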

How are sequence numbers tracked?

The class responsible for tracking sequence numbers is TransactionManager. Its name may be a bit misleading, but it's also created when you use the idempotent producer without transactions.

Every time a batch of records is drained by RecordAccumulator, the partition's sequence number is incremented by the number of records in the batch (transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount)).
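Per-partition tracking can be sketched with a map keyed by partition. This is a minimal model in the spirit of TransactionManager's bookkeeping, not its implementation: partitions are plain strings instead of TopicPartition objects, and `SequenceTracker` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceTracker {
    // Next sequence number to use, per topic-partition
    private final Map<String, Integer> nextSequence = new HashMap<>();

    int sequenceNumber(String topicPartition) {
        return nextSequence.getOrDefault(topicPartition, 0); // unknown partition starts at 0
    }

    void incrementSequenceNumber(String topicPartition, int increment) {
        nextSequence.merge(topicPartition, increment, Integer::sum);
    }

    public static void main(String[] args) {
        SequenceTracker tracker = new SequenceTracker();
        tracker.incrementSequenceNumber("my-topic-0", 3); // batch of 3 records
        tracker.incrementSequenceNumber("my-topic-1", 2); // other partition, own counter
        tracker.incrementSequenceNumber("my-topic-0", 1);
        System.out.println(tracker.sequenceNumber("my-topic-0")); // 4
        System.out.println(tracker.sequenceNumber("my-topic-1")); // 2
    }
}
```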

Does the sequence only increase?

Under normal circumstances, yes. However, if something bad happens, for instance a non-retriable error, the sequence numbers are adjusted in TransactionManager's adjustSequencesDueToFailedBatch(ProducerBatch batch) method. What does the adjustment consist of?

When a batch is considered failed, it's first removed from the in-flight batches of its topic-partition in TransactionManager. Then the adjustment happens: the sequence numbers of the in-flight batches following the failed one are decreased by the number of records of the failed, not retried batch.

You can observe that action in the following logs:

Resetting sequence number of batch with current sequence 1 for partition my-topic-repartition-24 to 0 
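The adjustment can be sketched as a simplified model of the steps described above, not TransactionManager's code. Assumptions: `InFlightBatch` is a hypothetical stand-in for ProducerBatch, only batches whose base sequence is not lower than the failed one are shifted, and the partition's next sequence moves back by the same amount. The printed message imitates the log line above.

```java
import java.util.ArrayList;
import java.util.List;

final class SequenceAdjustment {

    static final class InFlightBatch {
        int baseSequence;
        final int recordCount;

        InFlightBatch(int baseSequence, int recordCount) {
            this.baseSequence = baseSequence;
            this.recordCount = recordCount;
        }
    }

    int nextSequence; // next sequence number for the partition
    final List<InFlightBatch> inFlight = new ArrayList<>();

    void adjustSequencesDueToFailedBatch(InFlightBatch failed) {
        inFlight.remove(failed);            // 1. forget the failed batch
        nextSequence -= failed.recordCount; // 2. give its sequence numbers back
        for (InFlightBatch batch : inFlight) {
            if (batch.baseSequence < failed.baseSequence) {
                continue;                   // earlier batches keep their numbers
            }
            int newSequence = batch.baseSequence - failed.recordCount; // 3. shift later ones
            System.out.println("Resetting sequence number of batch with current sequence "
                    + batch.baseSequence + " to " + newSequence);
            batch.baseSequence = newSequence;
        }
    }

    public static void main(String[] args) {
        SequenceAdjustment partition = new SequenceAdjustment();
        InFlightBatch first = new InFlightBatch(0, 1);
        partition.inFlight.add(first);
        partition.inFlight.add(new InFlightBatch(1, 1));
        partition.inFlight.add(new InFlightBatch(2, 1));
        partition.nextSequence = 3;
        partition.adjustSequencesDueToFailedBatch(first); // batches 1 and 2 become 0 and 1
    }
}
```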

Why don't 5 in-flight requests break idempotency?

In the very first version of the idempotent producer, the maximum number of in-flight requests (max.in.flight.requests.per.connection parameter) was limited to 1. This changed with KAFKA-5494, and since then up to 5 in-flight requests are allowed. What does it mean concretely, and how does the producer ensure that batches are sent in order in case of retries?

To recall, the broker processes all requests in order and, consequently, returns their results to the client in order as well. Without a TransactionManager (i.e. when neither idempotence nor transactions are enabled), Kafka's retry mechanism consists of re-enqueuing the failed batch at the beginning of the deque storing all batches to send; I covered that in the Apache Kafka and max.in.flight.requests.per.connection post. When a TransactionManager exists, which is the case for the idempotent producer, the logic is a bit different. Everything happens in RecordAccumulator's insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch), where the failed batch is reinserted in sequence order.

The reordering algorithm handles 2 cases. The simplest one is when the failed batch's sequence number is smaller than the sequence number of the first batch in the partition's deque. In that case, the failed batch is simply added to the front of the deque and the order is preserved. Otherwise, slightly more complicated logic is invoked.

In this logic, the producer first polls from the deque all batches whose sequence number is smaller than the failed batch's one and stores them, in order, in an ArrayList. Then it adds the failed batch to the front of the deque, followed by the removed batches, starting with the one having the biggest sequence number:

            deque.addFirst(batch);

            // Now we have to re insert the previously queued batches in the right order.
            for (int i = orderedBatches.size() - 1; i >= 0; --i) {
                deque.addFirst(orderedBatches.get(i));
            }
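The whole reordering can be run end to end with batches reduced to their base sequence numbers. This is a simplified transcription under that assumption: the real method works on ProducerBatch objects and also deals with batches that have no sequence assigned yet, which this sketch ignores. `InsertInSequenceOrder` is a hypothetical class name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class InsertInSequenceOrder {

    static void insertInSequenceOrder(Deque<Integer> deque, int failedBatchSeq) {
        Integer first = deque.peekFirst();
        if (first == null || first > failedBatchSeq) {
            // Case 1: the failed batch goes first, the order is preserved
            deque.addFirst(failedBatchSeq);
            return;
        }
        // Case 2: remove the batches with smaller sequence numbers, in order
        List<Integer> orderedBatches = new ArrayList<>();
        while (deque.peekFirst() != null && deque.peekFirst() < failedBatchSeq) {
            orderedBatches.add(deque.pollFirst());
        }
        deque.addFirst(failedBatchSeq);
        // Put them back in front, starting with the biggest sequence number
        for (int i = orderedBatches.size() - 1; i >= 0; --i) {
            deque.addFirst(orderedBatches.get(i));
        }
    }

    public static void main(String[] args) {
        Deque<Integer> deque = new ArrayDeque<>(Arrays.asList(0, 1, 3, 4));
        insertInSequenceOrder(deque, 2); // the retried batch with sequence 2
        System.out.println(deque);       // [0, 1, 2, 3, 4]
    }
}
```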


Idempotent producer demo

To show you how the idempotent producer works, I need to play a small trick with my IDE. I will execute the test code in debug mode and, after the 1st batch, change the sequence number from 1 to 111. I expect to receive an out-of-sequence error from the broker and a final failure on the producer side:

kafka_1      | org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 7005: 111 (incoming seq. number), 1 (current end sequence number)

It's also a good occasion to introduce my new GitHub repo, Kafka-playground, where I will publish Apache Kafka code snippets. It's also where you will find the Docker image to run the examples. My sample code simulating the idempotent producer looks like this:

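import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Topics and ConfigurationMaker are helper objects from the Kafka-playground repo
object IdempotentProducerExample {

  def main(args: Array[String]): Unit = {
    val topicName = "idempotent_producer_topic"
    Topics.recreateTopic(topicName)

    val idempotentProducer = new KafkaProducer[String, String](ConfigurationMaker.configureProducer(
      extras = Map("enable.idempotence" -> "true", "max.in.flight.requests.per.connection" -> "5",
        ProducerConfig.RETRIES_CONFIG -> "3")
    ))

    (1 to 3).foreach(number => {
      idempotentProducer.send(new ProducerRecord[String, String](topicName, s"value${number}"),
        new Callback() {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
            // metadata is null when the send failed; exception is null when it succeeded
            println(s"${number} ===> Got=${metadata} and ${exception}")
          }
        })
      // flush after every send so that each record goes out in its own batch
      idempotentProducer.flush()
    })
    idempotentProducer.close()
  }

}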

Nothing fancy: I simply configured the producer to be idempotent and to retry only a few times, to get feedback about the manually altered sequence number sooner. You can see the effect of this in the video accompanying the post.

The video also shows another interesting point: once the retries were exhausted, the client reset the producer id and, therefore, its sequence numbers, so that it could keep sending new messages without interruption.

The idempotent producer is an interesting feature to avoid duplicates and keep messages in order, even with multiple in-flight requests. Thanks to the sequence numbers, the broker can reject any message whose number differs from the last number + 1. The producer also has some interesting logic: as you saw in the post, it's able to reorder retried batches to keep the sequence numbers logically increasing.

After this and the previous posts about NIO and in-flight requests, I'm now ready to dive deep into another Kafka feature: transactions. Stay tuned, I'll publish a post about them soon!
