Messages in Apache Kafka

on waitingforcode.com

Messages are an intrinsic part of every messaging system. Having previously learned about producing and consuming them, it's a good moment to see what these messages really are.

This post covers the message aspect of Kafka in more detail. The first part shows how Kafka stores data on disk. The second part describes the format of messages before and after they're persisted on disk.

Kafka logs

Before talking precisely about messages, let's start by seeing what happens when a topic is created. Things happen mostly in 2 places: on disk and in ZooKeeper (already covered a little in the article about ZooKeeper's role in Apache Kafka). The disk storage is configured through the log.dirs property, a comma-separated list of directories where logs will be stored. We'll keep its default value (/tmp/kafka-logs). We can now go there, check the content of the directory, create a topic called 'messagetesttopic' and see which files were added to the logs directory.
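For reference, these are the relevant entries in the broker's server.properties; the values shown here are illustrative (the tiny segment size is only used to force frequent log rolls in the experiments below):

```properties
# Comma-separated list of directories where partition logs are stored
log.dirs=/tmp/kafka-logs
# Roll to a new segment file once the current one reaches this size (bytes)
log.segment.bytes=50
```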

A fresh Kafka installation contains this structure:

/tmp/kafka-logs$ tree
.
├── cleaner-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint

0 directories, 4 files

When a new topic is created, some new files appear:

# topic created with:
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic messagetesttopic
/tmp/kafka-logs$ tree
.
├── cleaner-offset-checkpoint
├── messagetesttopic-0
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── messagetesttopic-1
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── messagetesttopic-2
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint

3 directories, 10 files

As you can see, there are 3 new directories and 6 new files. Each directory represents a partition of the created topic; we've created a topic with 3 partitions. Inside them we can see files storing binary data of index and log type. Why do their names start with 00000000000000000000? The naming policy is based on the offset of the first message stored in the file. Because our topic doesn't have any data yet, the names begin with the initial offset, 0. And how does Kafka know when to create new log files? It's configured in the log.segment.bytes property: when the size of the current log file reaches the value defined there, a new log segment file is created. Let's check it by adding 25 messages (with log.segment.bytes=50):

// messages produced with:
for (int i = 0; i < 25; i++) {
  producer.send(new ProducerRecord<>(TOPIC_NAME, "" + i, "The " + i));
}
/tmp/kafka-logs$ tree
.
├── cleaner-offset-checkpoint
├── messagetesttopic-0
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000001.index
│   ├── 00000000000000000001.log
│   ├── 00000000000000000002.index
│   └── 00000000000000000002.log
├── messagetesttopic-1
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000001.index
│   └── 00000000000000000001.log
├── messagetesttopic-2
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000001.index
│   ├── 00000000000000000001.log
│   ├── 00000000000000000002.index
│   └── 00000000000000000002.log
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint

3 directories, 20 files
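The zero-padded file names above can be derived directly from each segment's base offset. A minimal sketch in Java; logFileName is a hypothetical helper, not a Kafka API:

```java
public class SegmentName {
    // A segment file is named after the 20-digit, zero-padded
    // offset of the first message it contains.
    static String logFileName(long baseOffset) {
        return String.format("%020d", baseOffset) + ".log";
    }

    public static void main(String[] args) {
        // The first segment of an empty partition starts at offset 0.
        System.out.println(logFileName(0L)); // 00000000000000000000.log
        // A later segment whose first message has offset 2.
        System.out.println(logFileName(2L)); // 00000000000000000002.log
    }
}
```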

You can also note that each .log file has a companion with the .index extension. These .index files map offsets to file positions. They are preallocated automatically by Kafka with the size defined in the log.index.size.max.bytes property (10 MB by default). When a log segment rolls, the index file of the closed segment is trimmed down to its real size. For example, if we look at partition 0, we can see a 10 MB index file for the active segment (offset 2) and 2 empty files for the previous segments, trimmed after their rolls:

ll messagetesttopic-0/
total 20
drwxrwxr-x 2 bartosz bartosz     4096 mai 15 07:02 ./
drwxrwxr-x 5 bartosz bartosz     4096 mai 15 07:06 ../
-rw-rw-r-- 1 bartosz bartosz        0 mai 15 07:02 00000000000000000000.index
-rw-rw-r-- 1 bartosz bartosz       32 mai 15 07:02 00000000000000000000.log
-rw-rw-r-- 1 bartosz bartosz        0 mai 15 07:02 00000000000000000001.index
-rw-rw-r-- 1 bartosz bartosz       34 mai 15 07:02 00000000000000000001.log
-rw-rw-r-- 1 bartosz bartosz 10485760 mai 15 07:02 00000000000000000002.index
-rw-rw-r-- 1 bartosz bartosz       34 mai 15 07:02 00000000000000000002.log

But to really see the utility of index files, we must move on to the next part, dedicated to the message format.

Kafka message format

To analyze messages, we'll look at what happens in the Java API when a message is sent and received. But first, let's talk about the message format. A message is represented by the Java Record class, which contains a write() method. Inside it we can easily see the message format:

// write crc
compressor.putInt((int) (crc & 0xffffffffL));
// write magic value
compressor.putByte(CURRENT_MAGIC_VALUE);
// write attributes
compressor.putByte(attributes);
// write timestamp
compressor.putLong(timestamp);
// write the key
if (key == null) {
  compressor.putInt(-1);
} else {
  compressor.putInt(key.length);
  compressor.put(key, 0, key.length);
}
// write the value
if (value == null) {
  compressor.putInt(-1);
} else {
  int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
  compressor.putInt(size);
  compressor.put(value, valueOffset, size);
}

As you can see, the written fields are:

  • CRC - CRC32 checksum used to detect possible message corruption
  • "magic number" - 0 or 1; it makes it possible to evolve the message format
  • attributes - defines message attributes, such as compression and timestamp type
  • timestamp - tells when a given message was produced; available since Kafka 0.10.0
  • key - message key
  • value - message value
  • size - two entries, one representing the key length and another one representing the payload length
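Based on this layout, the serialized size of a message can be estimated with a few lines of Java. This is a sketch, not Kafka's own code; the field widths are read off the write() method above:

```java
public class MessageSize {
    // Fixed overhead: CRC (4) + magic (1) + attributes (1) + timestamp (8)
    // + key length field (4) + value length field (4).
    static final int OVERHEAD = 4 + 1 + 1 + 8 + 4 + 4;

    // A null key or value still costs its 4-byte length field (written as -1).
    static int recordSize(byte[] key, byte[] value) {
        int size = OVERHEAD;
        if (key != null) size += key.length;
        if (value != null) size += value.length;
        return size;
    }

    public static void main(String[] args) {
        System.out.println(recordSize(new byte[3], new byte[5])); // 30
        System.out.println(recordSize(null, new byte[5]));        // 27
    }
}
```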

When a message is written to disk, it's appended to a log file. Each log file contains a set of log entries. On reading, they're represented by a class called LogEntry. If we take a look at it, we can see that it's composed of 2 fields: the first representing the offset and the second representing the serialized message with all the fields described previously (CRC, "magic number"...).

The offset associated with each log entry corresponds in reality to the physical position of the message in the file. On the other side, we have the offsets used by consumers, which are plain integers incremented with each consumed message (1 after 1 message, 2 after 2, 3 after 3, and so on). The goal of the .index file is to map the logical offset representation, used among others by consumers, to the physical offset representing the message's position in the data file. Thanks to this mapping, message lookups can be made fast.
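A minimal sketch of that mapping, assuming an in-memory sorted structure; the real index files are memory-mapped and store only sparse entries, but the lookup idea is the same:

```java
import java.util.TreeMap;

public class OffsetIndexSketch {
    // Sparse map from logical offset to physical byte position in the .log file.
    private final TreeMap<Long, Long> offsetToPosition = new TreeMap<>();

    void append(long logicalOffset, long bytePosition) {
        offsetToPosition.put(logicalOffset, bytePosition);
    }

    // Returns the byte position of the closest indexed offset at or before
    // the target; the broker then scans the .log file forward from there.
    long lookup(long targetOffset) {
        return offsetToPosition.floorEntry(targetOffset).getValue();
    }

    public static void main(String[] args) {
        OffsetIndexSketch index = new OffsetIndexSketch();
        index.append(0L, 0L);
        index.append(100L, 4096L);
        // Offset 150 is not indexed; the scan starts at offset 100's position.
        System.out.println(index.lookup(150L)); // 4096
    }
}
```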

Messages are a key part of Apache Kafka. The first part showed how this messaging system deals with the storage of messages on disk. Each partition has its own directory inside one of the configured log directories. Inside partition directories we can find 2 kinds of files: one storing messages in binary format and one storing the mapping between logical and physical offsets. The difference between these 2 kinds of offsets was explained in the second part of the post, together with the message format.
