Records writing in Apache Kafka

Versions: Apache Kafka 2.4.0

In my journey to understand transaction internals in Apache Kafka, I discovered another intriguing class, which in turn led me to a few others ;) This class is RecordBatch, but in this blog post you will also meet MemoryRecords and FileRecords.

To discover all these classes, I will start by zooming in on what happens on the client side when records are produced. After that, I will look at what happens on the broker side.

Client side

The workflow of the classes involved in record production (i.e. when your producer calls the send method) is quite straightforward and looks like this:

Everything starts when the record, with its key and value, is passed to the send method, which hands it over to the doSend method. Why not a direct call instead? Inside send you will find the interceptors, which can mutate the record created by the user. The list of interceptors is empty by default, but you can implement your own by creating a class that implements the ProducerInterceptor interface and adding it to the interceptor.classes configuration entry.
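
To make the send/doSend split more concrete, here is a minimal JDK-only sketch of the idea. It does not use Kafka's classes: SimpleRecord, SimpleInterceptor and SketchProducer are hypothetical names that only mimic the roles of ProducerRecord, the onSend hook of ProducerInterceptor, and KafkaProducer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProducerRecord: just a key and a value.
final class SimpleRecord {
    final String key;
    final String value;

    SimpleRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for the onSend hook of ProducerInterceptor.
interface SimpleInterceptor {
    SimpleRecord onSend(SimpleRecord record);
}

// Simplified model: send() runs the interceptors, doSend() does the real work.
final class SketchProducer {
    private final List<SimpleInterceptor> interceptors;
    // Records that reached doSend(), kept only so the sketch can be inspected.
    final List<SimpleRecord> delivered = new ArrayList<>();

    SketchProducer(List<SimpleInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    void send(SimpleRecord record) {
        SimpleRecord current = record;
        // Each interceptor may return a modified copy of the record.
        for (SimpleInterceptor interceptor : interceptors) {
            current = interceptor.onSend(current);
        }
        doSend(current);
    }

    private void doSend(SimpleRecord record) {
        // In the real producer, serialization, partitioning and accumulation happen here.
        delivered.add(record);
    }
}
```

With an empty interceptor list, send simply forwards the untouched record to doSend, which matches the default behavior described above.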

Once the record has possibly been mutated by the interceptors, it can be physically delivered to the broker. But before that happens, the client first retrieves the last batch of records, represented by the ProducerBatch class. Internally, every partition has its own deque of ProducerBatch instances, and the producer always accesses it exclusively, i.e. only one thread can modify it at a time.

If a ProducerBatch for the given partition doesn't exist yet, it's first created on top of a MemoryRecordsBuilder, which stores all the records to deliver in a field called appendStream, an instance of DataOutputStream. The accumulation continues as long as there is still free space in the ProducerBatch. The capacity of the batch is configured by the batch.size property, and the remaining free space shrinks every time the producer appends a new record.
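
The accumulation logic can be sketched with plain JDK classes. This is a simplified model under my own assumptions, not Kafka's code: SketchBatch and SketchAccumulator are hypothetical names, the batch only counts bytes instead of writing to a stream, and the batchSize field plays the role of batch.size.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified batch: tracks only how many bytes it holds.
final class SketchBatch {
    private final int capacityBytes;
    private int usedBytes = 0;

    SketchBatch(int capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // Returns false when the record does not fit in the remaining free space.
    boolean tryAppend(byte[] record) {
        if (usedBytes + record.length > capacityBytes) {
            return false;
        }
        usedBytes += record.length;
        return true;
    }
}

// Hypothetical accumulator: one deque of batches per partition.
final class SketchAccumulator {
    private final int batchSize; // plays the role of batch.size
    private final Map<Integer, Deque<SketchBatch>> batches = new ConcurrentHashMap<>();

    SketchAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    void append(int partition, byte[] record) {
        Deque<SketchBatch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        // Only one thread at a time may touch the deque of a given partition.
        synchronized (deque) {
            SketchBatch last = deque.peekLast();
            if (last == null || !last.tryAppend(record)) {
                // No batch yet, or the last one is full: start a new one.
                // An oversized record gets a batch big enough to hold it (assumption of this sketch).
                SketchBatch fresh = new SketchBatch(Math.max(batchSize, record.length));
                fresh.tryAppend(record);
                deque.addLast(fresh);
            }
        }
    }

    int batchCount(int partition) {
        Deque<SketchBatch> deque = batches.get(partition);
        if (deque == null) {
            return 0;
        }
        synchronized (deque) {
            return deque.size();
        }
    }
}
```

For example, with a batch size of 10 bytes, two 4-byte records share one batch and a third one opens a second batch for the same partition.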

When the batch is full, the physical delivery is triggered by waking up the sender thread. Since I already covered this aspect in the post "Apache Kafka and max.in.flight.requests.per.connection", I won't repeat it here and will go directly to the broker part.

Broker side

On the broker's side, the workflow looks like this (no transactional aspects included):

First, the client's request is received by the broker and deserialized into an instance of ProduceRequest, which holds a map of the MemoryRecords to append for every partition. The same class is exposed by the ProducerBatch presented in the previous section when the client builds the request to send.

Before the records are passed to the ReplicaManager, they're validated. The validation step checks the supported versions and compression codecs. Afterwards, the records enter the append chain. What happens inside depends on the type of the write. If the write occurs on a replica, the *ToFollower/*AsFollower path is used, and the assignOffsets flag is set to false. This boolean defines whether offsets will be assigned to every record before the physical append to the log segment. Offsets can only be generated on the leader's node. It's also during this physical persistence that a new log segment can be created if the current one doesn't have enough space or is too old.
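
The role of the assignOffsets flag can be illustrated with a small JDK-only model. Everything here is an assumption made for illustration: SketchLog is a hypothetical class, records are reduced to their offsets, and the segment roll condition is simplified to a record count instead of the real size and age checks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified log: each segment is just a list of stored offsets.
final class SketchLog {
    private final int maxSegmentRecords; // stand-in for the real size/age roll conditions
    private final List<List<Long>> segments = new ArrayList<>();
    private long nextOffset = 0L;

    SketchLog(int maxSegmentRecords) {
        this.maxSegmentRecords = maxSegmentRecords;
        segments.add(new ArrayList<>());
    }

    // incomingOffsets: offsets carried by the records; ignored when assignOffsets is true.
    void append(long[] incomingOffsets, boolean assignOffsets) {
        for (long incoming : incomingOffsets) {
            // Leader path: generate the offset. Follower path: keep the leader's offset.
            long offset = assignOffsets ? nextOffset : incoming;
            List<Long> active = segments.get(segments.size() - 1);
            if (active.size() >= maxSegmentRecords) {
                // The active segment is full: roll a new one before writing.
                active = new ArrayList<>();
                segments.add(active);
            }
            active.add(offset);
            nextOffset = offset + 1;
        }
    }

    long nextOffset() {
        return nextOffset;
    }

    int segmentCount() {
        return segments.size();
    }

    // All stored offsets, in append order, across every segment.
    List<Long> offsets() {
        List<Long> all = new ArrayList<>();
        for (List<Long> segment : segments) {
            all.addAll(segment);
        }
        return all;
    }
}
```

In this model a leader calls append with assignOffsets set to true and ignores the incoming offsets, while a follower passes false and stores the offsets exactly as the leader generated them.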

In the end, at the lowest level of the append chain, the FileRecords class gets the MemoryRecords and writes the records into the associated FileChannel:

// FileRecords
FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);

// MemoryRecords
private final ByteBuffer buffer;

public int writeFullyTo(GatheringByteChannel channel) throws IOException {
    buffer.mark();
    int written = 0;
    while (written < sizeInBytes())
        written += channel.write(buffer);
    buffer.reset();
    return written;
}
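
The same write loop can be tried outside Kafka with nothing but java.nio. The sketch below is my own illustration, not Kafka code: WriteFullyDemo, writeFully and writeToTempFile are hypothetical names, and a plain ByteBuffer stands in for the buffer held by MemoryRecords.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class WriteFullyDemo {
    // Same loop shape as writeFullyTo, applied to a plain ByteBuffer.
    static int writeFully(ByteBuffer buffer, GatheringByteChannel channel) throws IOException {
        int size = buffer.remaining();
        buffer.mark(); // remember the current position
        int written = 0;
        // A single write() call may be partial, hence the loop.
        while (written < size) {
            written += channel.write(buffer);
        }
        buffer.reset(); // restore the position so the buffer can be read again
        return written;
    }

    // Writes the payload to a temporary file and returns the resulting file size.
    static long writeToTempFile(String payload) throws IOException {
        Path file = Files.createTempFile("segment-sketch", ".log");
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
            writeFully(buffer, channel);
            return channel.size();
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The mark/reset pair is the interesting design choice: writing advances the buffer's position, and resetting it afterwards leaves the in-memory records readable for any later consumer of the same buffer.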

And here we are. Before writing all these posts about Apache Kafka, I simply wanted to learn and understand transaction internals. However, that's hard to achieve when you face something new in every function you analyze. But I feel that I'm getting closer and will write the final part of my exploration soon!
