Configuring Apache Kafka

Until now we've used almost only default configuration values. In this article we'll look at some of the configuration possibilities in more detail.

Since consumer and producer configuration was briefly described in the posts about producers and consumers, this time we focus exclusively on topic and broker configuration. The first part presents some of the entries useful for topic configuration. The second part describes configuration options for Kafka brokers.

Topic configuration entries

Topic configuration is closely related to broker configuration. If an option is not defined during topic creation, the topic uses the value defined for the broker. The list of correspondences between topic and broker properties is given in the Kafka documentation and won't be repeated here.
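The fallback described above can be sketched as a simple lookup. This is only an illustration of the idea, not Kafka's own code: the function name is hypothetical, the mapping lists just a few topic-to-broker pairs from the Kafka documentation, and the broker values are made up for the example.

```python
# A few topic-level options and the broker-level option used as their fallback.
TOPIC_TO_BROKER = {
    "cleanup.policy": "log.cleanup.policy",
    "segment.bytes": "log.segment.bytes",
    "max.message.bytes": "message.max.bytes",
    "flush.messages": "log.flush.interval.messages",
    "min.insync.replicas": "min.insync.replicas",
}


def effective_topic_config(topic_overrides, broker_config):
    """Return the value each topic-level option ends up with (hypothetical helper)."""
    resolved = {}
    for topic_key, broker_key in TOPIC_TO_BROKER.items():
        if topic_key in topic_overrides:
            # Explicitly set at topic creation: the topic value wins.
            resolved[topic_key] = topic_overrides[topic_key]
        else:
            # Not set on the topic: fall back to the broker value.
            resolved[topic_key] = broker_config.get(broker_key)
    return resolved


# Illustrative broker values, not recommendations.
broker = {
    "log.cleanup.policy": "delete",
    "log.segment.bytes": 1073741824,
    "message.max.bytes": 1000012,
    "min.insync.replicas": 1,
}
effective = effective_topic_config({"cleanup.policy": "compact"}, broker)
```

Here only cleanup.policy is overridden, so every other entry of `effective` comes from the broker dictionary.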

We can start our analysis with the options related to messages. One of them determines how long messages can still be reconsumed: the time it defines is the delay after which messages marked as 'deleted' are really removed from disk. A similar property tells how long a message must be stored on disk before it's marked as 'deleted'.

But a message may also never be removed from disk. This behavior is handled by the cleanup.policy option. Its default value is 'delete', so old messages are physically deleted from disk. But if we switch it to 'compact', old messages are compacted instead: only the most recent message for each key is kept, so the log takes less space.
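To get an intuition for what 'compact' does, here is a toy model of compaction on an in-memory list. It only shows the "keep the latest message per key" idea; the real log cleaner works on segment files and handles more cases (for example delete markers), which this sketch ignores. The function name and sample data are hypothetical.

```python
def compact(log):
    """Keep only the latest (key, value) record for each key, preserving log order."""
    # Remember the offset of the last record seen for every key.
    last_offset = {key: offset for offset, (key, _value) in enumerate(log)}
    return [
        (key, value)
        for offset, (key, value) in enumerate(log)
        if last_offset[key] == offset
    ]


log = [
    ("user-1", "a"),
    ("user-2", "b"),
    ("user-1", "c"),
    ("user-3", "d"),
    ("user-2", "e"),
]
compacted = compact(log)
```

In this example `compacted` holds three records, one per key, each carrying the newest value written for that key.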

How can we configure message writing? Mostly with flush-like options. An option called flush.messages defines, simply speaking, the size of the batch of messages that is really persisted to disk. For example, if we set it to 3, then as soon as 3 messages are present in memory, an fsync is done to write them to disk. Another option has the same behavior but is based on time: it determines after how many milliseconds messages are flushed to disk. However, the Kafka documentation advises against using these 2 properties and recommends relying on replication for durability and letting the OS handle flushes.
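The counting behavior of flush.messages can be imitated with a few lines of Python. This is a toy writer, not how the broker is implemented: the class and attribute names are hypothetical, and the only point is to show an fsync being forced every N appended messages.

```python
import os
import tempfile


class SegmentWriter:
    """Toy log writer that forces an fsync every `flush_messages` appends."""

    def __init__(self, path, flush_messages):
        self._file = open(path, "ab")
        self._flush_messages = flush_messages
        self._unflushed = 0
        self.fsync_count = 0

    def append(self, payload):
        self._file.write(payload + b"\n")
        self._unflushed += 1
        if self._unflushed >= self._flush_messages:
            self._sync()

    def _sync(self):
        # Push Python's buffer to the OS, then ask the OS to persist it to disk.
        self._file.flush()
        os.fsync(self._file.fileno())
        self._unflushed = 0
        self.fsync_count += 1

    def close(self):
        self._file.close()


with tempfile.TemporaryDirectory() as tmp:
    writer = SegmentWriter(os.path.join(tmp, "demo.log"), flush_messages=3)
    for i in range(7):
        writer.append(f"message-{i}".encode())
    fsyncs_after_7_messages = writer.fsync_count
    writer.close()
```

With a threshold of 3 and 7 appended messages, the writer syncs after the 3rd and the 6th message; the 7th stays unflushed until the next threshold is reached or the OS decides to write it.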

In the article about messages in Kafka we saw some configuration controlling the max size of a .log file on the broker side. On the topic side, the property to use is segment.bytes. A similar option applies to .index files: to set the size of an .index file we can override the segment.index.bytes property, even though, once again, the Kafka documentation doesn't recommend doing that. If we play with these 2 options, it's important to keep max.message.bytes in mind. It represents the max size of a message that can be appended to the topic.
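One way to "take care" of these values together is a small sanity check run before creating a topic. The check below is my own assumption about what is worth verifying (a message allowed to be bigger than a whole segment could never fit in one), not a validation performed by Kafka tooling; the function name is hypothetical.

```python
def check_topic_sizes(config):
    """Return a list of human-readable problems found in size-related options."""
    problems = []
    if config["max.message.bytes"] > config["segment.bytes"]:
        problems.append(
            "max.message.bytes (%d) is larger than segment.bytes (%d)"
            % (config["max.message.bytes"], config["segment.bytes"])
        )
    return problems


# Illustrative values only.
ok_config = {"segment.bytes": 1073741824, "max.message.bytes": 1000012}
bad_config = {"segment.bytes": 1048576, "max.message.bytes": 5242880}

ok_problems = check_topic_sizes(ok_config)
bad_problems = check_topic_sizes(bad_config)
```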

Broker configuration entries

On the broker side we find similar configuration entries. But since the broker controls more things, such as cleaning, incoming requests or synchronization with ZooKeeper, we also find a lot of other important options. The first group concerns log cleaning. Thanks to log.cleaner.enable we can specify whether the process cleaning logs (see cleanup.policy in the previous part) should be enabled. It is by default, and if it's turned off, logs will never be compacted. Another log-related property determines the interval at which the broker checks whether messages can be deleted. Going further, another one determines how long messages marked as 'deleted' are retained. Finally for this part, cleaning can be sped up by increasing the number of cleaning threads with the log.cleaner.threads property.

In addition, broker configuration defines some entries used in the communication with ZooKeeper. The most important one, zookeeper.connect, identifies the ZooKeeper instance the broker connects to. Two other properties are related to the session and connection timeouts; the session one is zookeeper.session.timeout.ms. There is also one ZooKeeper-related security option, zookeeper.set.acl. It tells whether the broker must authenticate before making writes on the ZK instance. If it's set to true, only Kafka brokers are able to write to Kafka znodes. The rest of the world is only able to read them.
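The value of zookeeper.connect is a comma-separated list of host:port pairs, optionally followed by a chroot path. A minimal parser, assuming every host comes with an explicit port, could look like the sketch below. The function name and host names are hypothetical.

```python
def parse_zookeeper_connect(value):
    """Split a zookeeper.connect string into ([(host, port), ...], chroot)."""
    # Everything after the first "/" is the optional chroot path.
    hosts_part, slash, chroot = value.partition("/")
    servers = []
    for entry in hosts_part.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers, ("/" + chroot if slash else "")


servers, chroot = parse_zookeeper_connect("zk1:2181,zk2:2181,zk3:2181/kafka")
single_servers, single_chroot = parse_zookeeper_connect("localhost:2181")
```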

The broker also protects itself against overload by applying quotas. quota.consumer.default and quota.producer.default define how many bytes per second a client can, respectively, consume and produce. Another load-related option is queued.max.requests. It defines how many requests can be queued before being handled by the broker.
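A byte-rate quota can be pictured with a simplified model: if a client moved too many bytes during a measurement window, it has to wait long enough for its average rate to fall back to the quota. The function below is only that arithmetic, under the assumption of a single fixed window; it is not the broker's actual throttling code, and all names are hypothetical.

```python
def throttle_delay_seconds(bytes_in_window, window_seconds, quota_bytes_per_second):
    """Delay needed so that bytes_in_window / (window + delay) equals the quota."""
    observed_rate = bytes_in_window / window_seconds
    if observed_rate <= quota_bytes_per_second:
        return 0.0  # under quota: no throttling
    return bytes_in_window / quota_bytes_per_second - window_seconds


# A producer sent 3,000,000 bytes in a 1 second window with a 1,000,000 B/s quota.
delay_over_quota = throttle_delay_seconds(3_000_000, 1, 1_000_000)
# A producer that stayed under the same quota.
delay_under_quota = throttle_delay_seconds(500_000, 1, 1_000_000)
```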

Request handling can be tuned with a family of properties related to the threads run by the broker. The first of them defines the number of threads used by the broker to handle network requests. Another one tells how many threads are used to actually execute these requests. But the network is not the only thread-related category: another property defines the number of threads used to flush logs at shutdown or to recover them at startup. The last property, num.replica.fetchers, describes how many threads are used to replicate messages from a source broker.

Regarding replication, one important option exists: min.insync.replicas. It defines the minimum number of replicas that must acknowledge a write before it's considered successful (only used if the producer sets the acks property to -1). If this minimum is not met, an exception is thrown on the producer side and the message is not delivered. This property helps to enforce message durability. It can also be defined at the topic level.
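The rule can be modeled in a few lines. This is a simplified simulation of the check for a write sent with acks=-1, not broker code: the exception class, function and replica names are hypothetical stand-ins.

```python
class NotEnoughReplicas(Exception):
    """Stand-in for the error the producer receives when the minimum is not met."""


def append_with_acks_all(message, in_sync_replicas, min_insync_replicas):
    """Accept the write only if enough in-sync replicas can acknowledge it."""
    if len(in_sync_replicas) < min_insync_replicas:
        raise NotEnoughReplicas(
            "%d in-sync replica(s), %d required"
            % (len(in_sync_replicas), min_insync_replicas)
        )
    # Every in-sync replica stores the message before the write is acknowledged.
    return {replica: message for replica in in_sync_replicas}


accepted = append_with_acks_all("order-42", ["broker-1", "broker-2"], 2)

try:
    append_with_acks_all("order-43", ["broker-1"], 2)
    rejected = False
except NotEnoughReplicas:
    rejected = True
```

The second call fails because only one replica is in sync while two are required, which mirrors the "message is not delivered" case described above.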

On the broker side we also find options related to automatic operations made by the server. The first one, auto.create.topics.enable, specifies whether the server can automatically create a nonexistent topic when a consumer or producer asks for it. The second one, auto.leader.rebalance.enable, defines whether a background thread can automatically rebalance partition leadership.

The last, but not the least useful, property is delete.topic.enable. If it's turned off (=false, the default value), we won't be able to delete a topic with the admin tool script. So, if we're just playing with Kafka, it can be useful to activate this entry. In production, topic deletion can be turned off to avoid unpleasant situations, such as removals by mistake.
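The broker entries named in this part could be gathered in a dictionary and rendered as key=value lines, the format used by the broker's properties file. The helper name is hypothetical and the values are purely illustrative, chosen for a local playground rather than production.

```python
def to_properties(settings):
    """Render settings as key=value lines, with booleans written as true/false."""
    lines = []
    for key, value in settings.items():
        if isinstance(value, bool):
            value = "true" if value else "false"
        lines.append("%s=%s" % (key, value))
    return "\n".join(lines) + "\n"


# Illustrative values for a local test broker.
broker_settings = {
    "zookeeper.connect": "localhost:2181",
    "zookeeper.set.acl": False,
    "log.cleaner.enable": True,
    "log.cleaner.threads": 2,
    "queued.max.requests": 500,
    "num.replica.fetchers": 1,
    "min.insync.replicas": 1,
    "auto.create.topics.enable": True,
    "auto.leader.rebalance.enable": True,
    "delete.topic.enable": True,
}
rendered = to_properties(broker_settings)
```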

This post covered some of the important Kafka configuration entries. The first part presented several entries related to topics, mostly about logs and flushing. The second part described broker configuration. It shares some points with topic configuration, and these points weren't covered again. Instead, we saw which options are used to communicate with ZooKeeper or to guarantee a good level of durability.
