Logs compaction in Apache Kafka - delete and cleanup policy

Versions: Apache Kafka 2.4.0

Since my very first experiences with Apache Kafka, I have always been amazed by the features of this tool. One of them, which I haven't had a chance to explore yet, is log compaction. I will shed some light on it in this article and in next week's one.

In the first part of the blog post, I will introduce the cleanup policy that will be the main topic of the next articles. After that, I will go through the internal classes involved in the compaction to understand better what happens inside and what delete strategies can apply.

Cleanup policy

Kafka's behavior for compaction is defined in the cleanup.policy property. It applies at the topic level and defines the cleanup behavior, which can be delete, compact or both. This means that a given topic will be compacted (only the most recent record for every key is kept) and/or cleaned (segments that are too old are deleted).
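To make the possible values more concrete, here is a minimal sketch of how a comma-separated cleanup.policy value could be interpreted. CleanupPolicy, Delete, Compact and CleanupPolicies are names made up for this illustration; they are not Kafka's own classes.

```scala
// Hypothetical model of the cleanup.policy values; not Kafka's own classes.
sealed trait CleanupPolicy
case object Delete extends CleanupPolicy
case object Compact extends CleanupPolicy

object CleanupPolicies {
  // Maps a single policy name to its model object.
  private def fromString(name: String): CleanupPolicy = name match {
    case "delete"  => Delete
    case "compact" => Compact
    case other     => throw new IllegalArgumentException(s"Unknown cleanup policy: $other")
  }

  // Parses a comma-separated value such as "compact,delete".
  def parse(raw: String): Set[CleanupPolicy] =
    raw.split(",").toList.map(_.trim.toLowerCase).filter(_.nonEmpty).map(fromString).toSet
}
```

With this model, a topic configured with both policies is simply a topic whose parsed set contains Delete and Compact at the same time.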

In this article I will focus only on the delete flag because, as you will see later, the topic is quite complex, and explaining both of them at the same time would be overkill.

Background process

Kafka performs log compaction in a background process defined in the CleanerThread class. The number of CleanerThreads depends on the log.cleaner.threads property, and the object responsible for their initialization is LogCleaner. Log cleaning, which by the way can be disabled with the log.cleaner.enable property, is managed by the LogManager class.

CleanerThread is a long-running process that communicates with LogCleanerManager to retrieve logs to clean. The instance of LogCleanerManager is shared by all cleaning threads to guarantee exclusive access to the log segments. To achieve that, it uses exclusive locks that are acquired every time the segments to clean are resolved.
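The idea of a single shared manager that hands out work under a lock can be sketched as follows. This is only a simplified illustration: InProgressRegistry, claim and release are hypothetical names, and partitions are represented by plain strings.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical, simplified stand-in for the coordination role of LogCleanerManager.
final class InProgressRegistry {
  private val lock = new ReentrantLock()
  private val inProgress = mutable.Set.empty[String]

  // Returns the candidates not claimed by anyone else and marks them as in progress.
  def claim(candidates: Seq[String]): Seq[String] = {
    lock.lock()
    try {
      val free = candidates.distinct.filterNot(p => inProgress.contains(p))
      inProgress ++= free
      free
    } finally lock.unlock()
  }

  // Makes the given partitions available again once the cleaning is done.
  def release(partitions: Seq[String]): Unit = {
    lock.lock()
    try inProgress --= partitions
    finally lock.unlock()
  }
}
```

Because every thread goes through the same instance and the same lock, two threads can never receive the same partition at the same time.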

The last important object is the one involved in the physical cleaning. It's an instance of the Cleaner class initialized inside every CleanerThread. A simplified dependencies diagram for all these classes looks like this:

Logs deletion

Let's begin with the easiest cleanup policy, log removal. To understand what happens, we have to start with the "when". How often will the logs be deleted? The cleaning thread (CleanerThread) runs indefinitely but doesn't work the whole time. Instead, it does the cleaning and then sleeps for the time specified in the log.cleaner.backoff.ms property.
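The "clean, then sleep" rhythm described above can be sketched like this. CleanerLoop and its parameters are hypothetical names, and the sleep function is injectable only to keep the example easy to try out; it is not how Kafka's thread is written.

```scala
// Hypothetical sketch of a long-running "clean, then back off" loop.
final class CleanerLoop(
    backoffMs: Long,
    cleanOnce: () => Unit,
    sleep: Long => Unit = ms => Thread.sleep(ms)) {

  @volatile private var running = true

  // Asks the loop to stop after the current pass.
  def shutdown(): Unit = running = false

  // Runs cleaning passes until shutdown() is called, sleeping between passes.
  def run(): Unit =
    while (running) {
      cleanOnce()
      if (running) sleep(backoffMs)
    }
}
```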

Once awake, the CleanerThread instance goes to its tryCleanFilthiestLog method, where the cleaning happens. The name of this method can seem strange at first glance, but "filthiest" simply means "dirtiest". The method is responsible for both compaction and deletion, but let's focus only on the latter in this article. By the way, this fact shows that a CleanerThread is not exclusively linked to any topic, because we can use a different cleanup policy per topic.

At the beginning of the deletion process, the LogCleanerManager returns a list of logs to delete from its deletableLogs() method. A partition's segment can be deleted if:

Once all eligible partitions with logs are retrieved, they're put into the in-progress list and returned to the CleanerThread. The cleaner iterates over all logs and deletes the segments that are too old. Just to recall, a log is a sequence of segments, and that's why there is no need for an extra fetch to get them.
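Since a log is an ordered sequence of segments, "deleting the segments that are too old" can be pictured as walking the log from its oldest segment. The sketch below is a simplified model and not Kafka's code: Segment, Retention and the retentionMs threshold are assumptions made for the illustration, as is the rule that the newest segment is never selected.

```scala
// Hypothetical model: a segment is identified by its base offset and carries
// the timestamp of its newest record.
final case class Segment(baseOffset: Long, largestTimestampMs: Long)

object Retention {
  // Selects the oldest segments for as long as they are past the retention time.
  // The last segment of the log is never selected in this sketch.
  def deletableSegments(segments: Seq[Segment], retentionMs: Long, nowMs: Long): Seq[Segment] =
    segments
      .sortBy(_.baseOffset)
      .dropRight(1)
      .takeWhile(s => nowMs - s.largestTimestampMs > retentionMs)
}
```

The takeWhile call stops at the first segment that is still fresh enough, so a young segment is never removed while an older one is kept.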

Deletion modes

The deletion process is driven by the cleanup policy configuration. If it's set to delete, the following things happen:

If the cleanup policy is not set to delete, only the last deletion is performed. So regarding the conditions from the previous step, only the last one, deleteLogStartOffsetBreachedSegments, will be performed by CleanerThread.

So when are all 3 methods executed? They're executed every log.retention.check.interval.ms by the LogManager. This operation looks more like garbage collection, whereas the deletion made by the CleanerThread applies more to the compacted segments.

Asynchronous deletion

The deletion process doesn't remove the segment files immediately. The segments are removed right away from all the in-memory mappings of the log, which makes them invisible for the process, but the files themselves are only marked for deletion. How does it work? The delete method takes an asyncDelete flag as a parameter and, depending on its value, deletes the files immediately or simply schedules the physical delete.

In the case of the cleanup policy, the delete is scheduled and will execute after file.delete.delay.ms milliseconds. How does the physical deletion process know which files should be deleted? They're in the scope of the function:

  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
    segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))

    def deleteSegments(): Unit = {
      info(s"Deleting segments $segments")
      maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
        segments.foreach(_.deleteIfExists())
      }
    }

    if (asyncDelete) {
      info(s"Scheduling segments for deletion $segments")
      scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
    } else {
      deleteSegments()
    }
  }

As you can see, the cleanup process also marks all files as "to be deleted" by adding the .deleted suffix to them. In my understanding, the goal of this operation is to hide these files from any process until the physical delete happens. This behavior is visible in Log methods like:

  private def isIndexFile(file: File): Boolean = {
    val filename = file.getName
    filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix)
  }

  private def isLogFile(file: File): Boolean =
    file.getPath.endsWith(LogFileSuffix)
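The rename-then-delete technique used by deleteSegmentFiles can also be sketched outside Kafka with plain JDK classes. DelayedDelete, markDeleted and delete are hypothetical names, and the scheduler is a standard ScheduledExecutorService rather than Kafka's own scheduler, so take it as an illustration under those assumptions.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Hypothetical sketch of "mark with a suffix now, delete physically later".
object DelayedDelete {
  val DeletedSuffix = ".deleted"

  // Renames the file so that code matching on the original suffix no longer sees it.
  def markDeleted(file: Path): Path =
    Files.move(file, file.resolveSibling(file.getFileName.toString + DeletedSuffix))

  // Marks the file first, then removes it right away or after delayMs milliseconds.
  def delete(file: Path, async: Boolean, delayMs: Long, scheduler: ScheduledExecutorService): Path = {
    val marked = markDeleted(file)
    val task: Runnable = () => { Files.deleteIfExists(marked); () }
    if (async) scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
    else task.run()
    marked
  }
}
```

After markDeleted, a check such as isLogFile no longer matches the file because its name ends with .log.deleted instead of .log, even though the bytes are still on disk until the scheduled task runs.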

Log deletes

So far I described segment deletion, but in the code of the presented classes you can also see another delete process, triggered by LogManager, that concerns whole logs, so all segments at once! This process is not related to the cleanup policy and is more involved in the replication process.

This feature was implemented as a part of KIP-113 which introduced the possibility to change the log directory of a given replica. To handle the log removal in that case, Apache Kafka also uses the suffix technique, except here the whole log directory is marked for deletion by adding the -delete suffix to it. The logs are also added to LogManager's logsToBeDeleted queue.

LogManager uses the queue to get the old logs to delete. Under the hood, one of the log deletion components is the previously mentioned deleteSegmentFiles method which, this time, is invoked synchronously.
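The directory-level variant of the suffix technique, combined with a queue of pending removals, could look like the sketch below. LogDirDeletion and its methods are hypothetical, the renamed directory name is simplified to the original name plus the -delete suffix, and the sketch only handles flat directories.

```scala
import java.io.{File, IOException}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical sketch of whole-log deletion driven by a queue.
object LogDirDeletion {
  val DeleteDirSuffix = "-delete"

  // Stand-in for the queue of logs waiting for their physical removal.
  private val toBeDeleted = new LinkedBlockingQueue[File]()

  // Renames the log directory with the -delete suffix and enqueues it.
  def markForDeletion(logDir: File): File = {
    val renamed = new File(logDir.getParentFile, logDir.getName + DeleteDirSuffix)
    if (!logDir.renameTo(renamed)) throw new IOException(s"Cannot rename $logDir")
    toBeDeleted.add(renamed)
    renamed
  }

  // Drains the queue and synchronously deletes every queued directory with its files.
  // Returns the number of directories that were removed.
  def deletePending(): Int = {
    var deleted = 0
    var next = toBeDeleted.poll()
    while (next != null) {
      Option(next.listFiles()).getOrElse(Array.empty[File]).foreach(_.delete())
      if (next.delete()) deleted += 1
      next = toBeDeleted.poll()
    }
    deleted
  }
}
```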

Before I started writing this article, I wanted to present both the compaction and the delete policies. However, as the exploration moved forward, I saw that the topic was not as easy as I thought. Even though the architecture of the internal classes is quite straightforward, there are a lot of interesting aspects to cover, like async vs sync deletes or log vs log segment deletes. That's why you will discover the log compaction process only in the next article of the series.