Apache Flink and the input data reading

I'm writing this unexpected blog post because I got stuck with watermarks and checkpoints and felt that I was missing some basics. Even though this introduction sounds a bit negative, exploring the data reading part made my other discoveries possible.

Mailbox

The first and most important thing to understand is how a particular task starts fetching the data. The answer comes from the MailboxProcessor class, which executes a continuous loop that either reads new records from the data source or processes other messages, such as checkpoint requests. Overall, the execution flow looks like this:

Mailbox pattern

Mailbox is a pattern coming from the actor model. It's the place where messages are enqueued before they get processed.

The runMailboxLoop is a while loop with the following logic:

public void runMailboxLoop() throws Exception {
  // …
  while (isNextLoopPossible()) {
    // The blocking `processMail` call will not return until default action is available.
    processMail(localMailbox, false);
    if (isNextLoopPossible()) {
      mailboxDefaultAction.runDefaultAction(mailboxController);
    }
  }
}

We can ignore the processMail method here because it doesn't process the data; it handles the other messages from the mailbox. What matters is the default action, which the StreamTask class sets to its processInput method at initialization time:

// StreamTask
protected StreamTask(/* ... */) {
  // ...
  this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
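To make the loop above more concrete, here is a minimal, single-threaded sketch of the mailbox idea. It is not Flink's MailboxProcessor: the MailboxLoopSketch class and its send, stop and runLoop methods are names I made up for the illustration, and unlike the real processMail, nothing blocks here. It only shows the ordering: pending mail (such as a checkpoint request) runs first, then the default action reads one record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of a mailbox loop; not Flink's MailboxProcessor.
final class MailboxLoopSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Runnable defaultAction;
    private boolean running = true;

    MailboxLoopSketch(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    // Enqueues a control message, for example a checkpoint request.
    void send(Runnable mail) {
        mailbox.add(mail);
    }

    void stop() {
        running = false;
    }

    // Each iteration drains the pending mail first, then runs the default action once.
    void runLoop() {
        while (running) {
            while (running && !mailbox.isEmpty()) {
                mailbox.poll().run();
            }
            if (running) {
                defaultAction.run();
            }
        }
    }

    // Small scenario: three "reads", with a checkpoint request sent after the first one.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        int[] reads = {0};
        MailboxLoopSketch[] self = new MailboxLoopSketch[1];
        self[0] = new MailboxLoopSketch(() -> {
            reads[0]++;
            log.add("record-" + reads[0]);
            if (reads[0] == 1) {
                self[0].send(() -> log.add("checkpoint"));
            }
            if (reads[0] == 3) {
                self[0].stop();
            }
        });
        self[0].runLoop();
        return log;
    }
}
```

Calling MailboxLoopSketch.demo() returns the events in the order record-1, checkpoint, record-2, record-3: the checkpoint message is handled between two reads, never in the middle of one.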

It's a great moment to jump to the next section and analyze the data reading flow.

Data reading flow

The processInput method is a part of a longer execution chain:

The flow ends in the processInput method of the StreamInputProcessor, an interface implemented by processors working on either one or multiple inputs. Since the StreamMultipleInputProcessor relies on the StreamOneInputProcessor, let's start by delving into the latter.

One input processor

Bad news. The StreamOneInputProcessor is not the class that implements data reading. Instead, it delegates this responsibility to a StreamTaskInput, which in turn is an interface whose implementations forward the data fetching further down the execution flow. The following diagram summarizes this:

As you can see, there are many implementations possible but for the sake of this blog post, let's focus on the StreamTaskSourceInput. Why this one? Because it's the one involved in the Kafka example from the last section. Its emitNext implementation delegates data retrieval to a SourceOperator's emitNext function:

@Override
public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
    if (isBlockedAvailability.isApproximatelyAvailable()) {
        return operator.emitNext(output);
    }
    return DataInputStatus.NOTHING_AVAILABLE;
}

In turn, the SourceOperator forwards the call - yes, yet again - to the SourceReader and its pollNext function:

    InputStatus status;
    do {
        status = sourceReader.pollNext(currentMainOutput);
    } while (status == InputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check() && !shouldWaitForAlignment());
    return convertToInternalStatus(status);

Again, since the workflow has many hops, a diagram should help you stay focused before discovering the next parts:

Splits and source reader

The next step in the reading logic is the source reader. Apache Flink comes with a base reader class, SourceReaderBase, that relies on a concept called split.

A split is a metadata representation of a portion of the data source. For example, in Apache Kafka it represents a partition within a topic, alongside the processed offsets. A split alone does nothing, but combined with the SplitFetcherManager, it drives the reading from a data store. Well, technically speaking there is also a SplitFetcher that gets the splits assigned and runs a SplitReader. Only this reader class is the final intermediary between the data store and Apache Flink, as it defines a fetch method with the following signature:

public interface SplitReader<E, SplitT extends SourceSplit> extends AutoCloseable {
  RecordsWithSplitIds<E> fetch() throws IOException;
  // ...
}

In summary, the next call sequence looks like this:

As you can see in the diagram, the SplitFetcherManager runs the split readers for the splits it manages. A reader communicates with the data source and retrieves the records to process, which the fetch task then puts into a local queue:

// FetchTask
public boolean run() throws IOException {
  try {
    if (!isWakenUp() && lastRecords == null) {
      lastRecords = splitReader.fetch();
    }
    // ...
    if (elementsQueue.put(fetcherIndex, lastRecords)) {
      if (!lastRecords.finishedSplits().isEmpty()) {
        splitFinishedCallback.accept(lastRecords.finishedSplits());
      }
      lastRecords = null;
    }

The queue is where the records are polled from, but there is yet another class involved in the process: the emitter.
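The hand-off through the queue can be sketched with plain JDK classes. This is only an illustration under my own assumptions: FetchQueueSketch, Batch and the end-of-input marker are hypothetical names, and a standard ArrayBlockingQueue stands in for the queue Flink uses. The point is the split of responsibilities: one thread does the blocking I/O and enqueues whole batches, while the reading side takes a batch and emits its records one by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the fetcher-to-reader hand-off; not Flink code.
final class FetchQueueSketch {
    // A fetched batch: the records read from one split (simplified stand-in).
    static final class Batch {
        final String splitId;
        final List<String> records;

        Batch(String splitId, List<String> records) {
            this.splitId = splitId;
            this.records = records;
        }
    }

    static List<String> demo() {
        BlockingQueue<Batch> elementsQueue = new ArrayBlockingQueue<>(2);
        // Marker batch used only by this sketch to signal the end of the input.
        Batch endOfInput = new Batch("end", List.of());

        // Fetcher thread: simulates the blocking fetch and enqueues whole batches.
        Thread fetcher = new Thread(() -> {
            try {
                elementsQueue.put(new Batch("topic-0", List.of("a", "b")));
                elementsQueue.put(new Batch("topic-1", List.of("c")));
                elementsQueue.put(endOfInput);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        // Reader side: takes one batch at a time and emits its records individually.
        List<String> emitted = new ArrayList<>();
        try {
            Batch batch = elementsQueue.take();
            while (batch != endOfInput) {
                for (String record : batch.records) {
                    emitted.add(batch.splitId + ":" + record);
                }
                batch = elementsQueue.take();
            }
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
        return emitted;
    }
}
```

The bounded queue also gives a simple form of backpressure in this sketch: the fetcher thread blocks on put as long as the reader has not consumed the previous batches.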

Emitters

RecordEmitter is also a member of SourceReaderBase. It's directly involved in handling each record of the fetched batch in the pollNext method:

// SourceReaderBase
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
  RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
  while (true) {
    final E record = recordsWithSplitId.nextRecordFromSplit();
    if (record != null) {
      // ...
      recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);

As you can see, there is another interface, the ReaderOutput. It provides collect methods that are later called with each fetched record. The interaction happens in the SourceOperator:

// SourceOperator
public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
  // ...
  InputStatus status;
  do {
    status = sourceReader.pollNext(currentMainOutput);
  } while (status == InputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check() 
    && !shouldWaitForAlignment());
  return convertToInternalStatus(status);
}
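The interplay between pollNext, the emitter and the operator's loop may be easier to follow in a miniature version. The sketch below is an assumption-heavy simplification: PollNextSketch, its Reader and Emitter types and the Status enum are hypothetical, the output is a plain Consumer instead of a ReaderOutput, and the status logic is deliberately naive compared to the real SourceReaderBase.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of the pollNext/emitRecord interplay; not Flink code.
final class PollNextSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    // Stand-in for a record emitter: converts a raw record and hands it to the output.
    interface Emitter<E, T> {
        void emitRecord(E raw, Consumer<T> output);
    }

    static final class Reader<E, T> {
        private final Deque<E> currentFetch = new ArrayDeque<>();
        private final Emitter<E, T> emitter;

        Reader(Emitter<E, T> emitter) {
            this.emitter = emitter;
        }

        // Simulates a batch handed over by a fetcher.
        void addFetchedBatch(List<E> batch) {
            currentFetch.addAll(batch);
        }

        // Emits at most one raw record per call and reports whether more is buffered.
        Status pollNext(Consumer<T> output) {
            E raw = currentFetch.poll();
            if (raw == null) {
                return Status.NOTHING_AVAILABLE;
            }
            emitter.emitRecord(raw, output);
            return currentFetch.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }
    }

    static List<Integer> demo() {
        // The emitter parses the raw String and collects the resulting Integer.
        Reader<String, Integer> reader =
            new Reader<String, Integer>((raw, out) -> out.accept(Integer.parseInt(raw)));
        reader.addFetchedBatch(List.of("1", "2", "3"));

        List<Integer> collected = new ArrayList<>();
        Status status;
        // Same shape as the operator's loop: keep polling while more is available.
        do {
            status = reader.pollNext(collected::add);
        } while (status == Status.MORE_AVAILABLE);
        return collected;
    }
}
```

In this sketch, PollNextSketch.demo() collects the three parsed integers in their original order. The real loop has extra exit conditions (canEmitBatchOfRecords and shouldWaitForAlignment) that I left out on purpose.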

I won't cover here how the records are exchanged between tasks as it involves a more general topic including local and network exchanges. Stay tuned, I'll cover it later in the Flink series.

Multiple inputs

The StreamMultipleInputProcessor class involves ... StreamOneInputProcessor! Yes, the inputs are a part of this array:

public final class StreamMultipleInputProcessor implements StreamInputProcessor {
  private final StreamOneInputProcessor<?>[] inputProcessors;

Therefore, you already know the data reading flow. But what you don't know is how the processor runs these inputs. In fact, it defines a MultipleInputSelectionHandler field, inputSelectionHandler, that selects the next input processor to read from here:

private int selectNextReadingInputIndex() {
  if (!inputSelectionHandler.isAnyInputAvailable()) {
    fullCheckAndSetAvailable();
  }

  int readingInputIndex = inputSelectionHandler.selectNextInputIndex(lastReadInputIndex);

The selectNextInputIndex method calls InputSelection.fairSelectNextIndex, which picks the next available input after the previously read one.
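The idea of a fair selection can be illustrated with a tiny round-robin over a bitmask of available inputs. This is my own sketch of the technique, not the body of fairSelectNextIndex: the FairSelectionSketch class, the selectNext signature and the NONE_AVAILABLE value are assumptions made for the example.

```java
// Hypothetical round-robin selection over a bitmask of available inputs; not Flink code.
final class FairSelectionSketch {
    static final int NONE_AVAILABLE = -1;

    // Returns the first available input index located after lastReadIndex,
    // wrapping around to index 0, or NONE_AVAILABLE when no bit is set.
    // Bit i of availableMask tells whether input i has data.
    static int selectNext(long availableMask, int inputCount, int lastReadIndex) {
        for (int step = 1; step <= inputCount; step++) {
            int candidate = (lastReadIndex + step) % inputCount;
            if ((availableMask & (1L << candidate)) != 0) {
                return candidate;
            }
        }
        return NONE_AVAILABLE;
    }
}
```

With three inputs where only inputs 0 and 2 are available (mask 0b101), selectNext returns 2 after reading input 0, and 0 after reading input 2. Starting the search right after the last read input is what prevents one busy input from starving the others.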

Apache Kafka example

I'm aware the blog post is long and the number of hops makes it pretty hard to follow. Hopefully this last section, with the Apache Kafka implementation as an example, will shed some light on the remaining gray areas!

Let's start with the classes at the end of the chain. First, the KafkaRecordEmitter. It receives a new record and emits it through the configured KafkaRecordDeserializationSchema:

// KafkaRecordEmitter
public void emitRecord(ConsumerRecord<byte[], byte[]> consumerRecord, SourceOutput<T> output, KafkaPartitionSplitState splitState) throws Exception {
  try {
    sourceOutputWrapper.setSourceOutput(output);
    sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
    deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
    splitState.setCurrentOffset(consumerRecord.offset() + 1);
    // ...

As you can see in the snippet, the emitter also references a dedicated split state class, KafkaPartitionSplitState, which is itself a SourceSplit. The snippet shows that the state tracks the reading position for each partition, that is, the offset right after the last processed record. Besides, it can also return the last offset to process in case the reader is bounded.
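The offset bookkeeping described above fits in a few lines. The class below is a hypothetical stand-in for a per-partition split state, written only for this post: SplitStateSketch, recordProcessed, isFinished and the NO_STOPPING_OFFSET marker are my own names, and treating the stopping offset as exclusive is an assumption of the sketch.

```java
// Hypothetical per-partition split state; not the Kafka connector's class.
final class SplitStateSketch {
    // Marker used by this sketch for an unbounded split.
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    private final String topic;
    private final int partition;
    private final long stoppingOffset;
    private long currentOffset;

    SplitStateSketch(String topic, int partition, long startingOffset, long stoppingOffset) {
        this.topic = topic;
        this.partition = partition;
        this.currentOffset = startingOffset;
        this.stoppingOffset = stoppingOffset;
    }

    // Mirrors the "offset + 1" logic: the state points at the next record to read.
    void recordProcessed(long recordOffset) {
        currentOffset = recordOffset + 1;
    }

    long currentOffset() {
        return currentOffset;
    }

    // A bounded split is finished once the position reaches the (exclusive) stopping offset.
    boolean isFinished() {
        return stoppingOffset != NO_STOPPING_OFFSET && currentOffset >= stoppingOffset;
    }

    String splitId() {
        return topic + "-" + partition;
    }
}
```

For a split created with a starting offset of 5 and a stopping offset of 7, processing the records at offsets 5 and 6 moves the position to 7 and marks the split as finished. An unbounded split never finishes. Storing the next offset instead of the last processed one means a restored reader can resume from the stored value directly.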

One step earlier are the classes interacting with Apache Flink directly: the KafkaSourceReader, the KafkaSourceFetcherManager, and finally the most important one, the KafkaPartitionSplitReader. The last class is where the fetch() actually happens:

public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
    ConsumerRecords<byte[], byte[]> consumerRecords;
    try {
      consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));

An important thing to notice here is the plural form of the records. Even though Apache Flink processes one record at a time, the snippet shows an I/O optimization that greatly reduces the number of network calls by fetching multiple records at once.

I must admit, the journey was not easy because of the many intermediary classes involved in the reading action. However, it was a necessary step for me before going further to learn about checkpoints and watermarks. But wait, I still have one topic to cover before that: the data exchange between tasks! If I don't discover anything surprising, it'll be the next blog post on Apache Flink.

