It's rare that I need to cover more than 3 other topics before being able to write a blog post. But that's what happens with the post about the Apache Kafka idempotent producer that I will publish soon. Before that, I need to understand and explain the NIO Selector, its role in Apache Kafka, and finally the in-flight requests. Since the first topic is already covered, I will move on to the second one.
To understand the NIO Selector integration in Apache Kafka, I will start by introducing Kafka's version of Java's Selector. Later, I will focus on the channel state and the SocketChannel's role in network communication. To follow this post easily, it's important to know Java's NIO Selector; you can become familiar with it by reading my Handling multiple I/O from one thread with NIO Selector post.
Kafka and Selector
The key element of NIO selection is the selector. It's responsible for handling I/O requests and it plays the same role in Apache Kafka. However, it's not exposed directly to the clients. Instead, it's wrapped in org.apache.kafka.common.network.Selector, which creates the NIO Selector with the usual open() method: this.nioSelector = java.nio.channels.Selector.open();
The NIO Selector is responsible for 2 things. The first one is connection management. The Selector registers the operations of interest on the SocketChannel. At that moment an instance of KafkaChannel is created and associated with the manipulated SocketChannel:
protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
    SelectionKey key = socketChannel.register(nioSelector, interestedOps);
    KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
    this.channels.put(id, channel);
    if (idleExpiryManager != null)
        idleExpiryManager.update(channel.id(), time.nanoseconds());
    return key;
}
KafkaChannel is the connection abstraction between the client and the server. It's also quite important since it's the object communicating directly with the TransportLayer instance that I will cover later in this post.
The second purpose of the NIO Selector is the selection itself. It happens through Kafka's Selector select(long timeoutMs) method. If you notice the timeout parameter and you read my post about Java's Selector, you certainly know that under the hood the selection will be either non-blocking (delegation to NIO's selectNow() method) or blocking (delegation to NIO's select(timeout)).
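A simplified sketch of that dispatch on the timeout value could look like this (illustrative helper, not Kafka's exact code):

import java.io.IOException;
import java.nio.channels.Selector;

static int doSelect(Selector nioSelector, long timeoutMs) throws IOException {
    if (timeoutMs == 0L) {
        // non-blocking: returns immediately, even if no key is ready
        return nioSelector.selectNow();
    }
    // blocking: waits up to timeoutMs for at least one key to become ready
    return nioSelector.select(timeoutMs);
}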
Kafka's Selector iterates over the selected keys in the pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) function. Inside the loop, Kafka first retrieves the KafkaChannel associated with the SocketChannel introduced before. Later, depending on the channel's state (fully connected or not) and the SelectionKey, Kafka may start by ensuring that the connection is ready to be processed.
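The skeleton of that loop roughly looks like the following sketch (heavily simplified from pollSelectionKeys; error handling and metrics are omitted):

// for each selected key, recover the KafkaChannel attached at registration time
for (SelectionKey key : selectionKeys) {
    KafkaChannel channel = (KafkaChannel) key.attachment();

    // a key selected with OP_CONNECT belongs to a channel that is not fully
    // connected yet: finish the TCP handshake before doing any I/O on it
    if (key.isConnectable() && channel.finishConnect()) {
        // the connection is now established and ready for reads and writes
    }
    // ... reads and writes follow
}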
After ensuring the connection availability, Kafka attempts to read data from the channel if the SelectionKey is readable or if the channel still has some bytes to read from the previous call. The following schema shows the call flow for reading:
After attempting to read data, Kafka checks whether the SelectionKey is writable and does not require reauthentication. If so, it proceeds to send data. The write flow looks like this:
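Put together, the read and write dispatch inside the selection loop boils down to something like this sketch (the attemptRead/attemptWrite helpers are illustrative names standing for the calls that end up in KafkaChannel.read() and the pending Send):

// continuation of the per-key loop shown above (simplified)
if (channel.ready() && key.isReadable()) {
    // drain what the socket currently holds into a NetworkReceive
    attemptRead(channel);
}
if (channel.ready() && key.isWritable()) {
    // push the Send currently queued on this channel
    attemptWrite(channel);
}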
Muted and unmuted channel
So far we have seen things similar to my previous post about the NIO Selector in Java. But the process covered in the previous part also has some Kafka-specific behaviors. One of them is the concept of muted and unmuted KafkaChannels. A muted channel means that a given connection is not read anymore. When can it happen? Muting a channel happens when bytes are read from the network and there is not enough memory to handle them. The amount of available memory is controlled by one of the MemoryPool implementations. This feature was added in KIP-72 to avoid OOM problems caused by a sudden spike of large message batches, and it applies only at the broker level.
Muting a channel consists of removing the OP_READ key from the watched keys. Unmuting a channel does the opposite, i.e. it adds OP_READ back to the listened operations.
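At the NIO level, this is nothing more than flipping the OP_READ bit in the key's interest set, as in this minimal sketch (not Kafka's code, which goes through the TransportLayer's addInterestOps/removeInterestOps methods):

import java.nio.channels.SelectionKey;

void mute(SelectionKey key) {
    // stop being notified about incoming data; the socket buffer fills up
    // and TCP back-pressure eventually slows the sender down
    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
}

void unmute(SelectionKey key) {
    // memory is available again, resume reading from this connection
    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
}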
SocketChannel
Another class that you certainly remember from my previous post about the NIO Selector in Java is SocketChannel. Just to recall, this object is a channel connected to a TCP network socket, so essentially the place your I/O operations pass through. The whole flow of initializing a SocketChannel in Kafka is not very different from the usual NIO Selector flow, as you can see in the following image:
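In plain NIO terms, that flow roughly corresponds to the steps below (a simplified sketch; the host and port are placeholders, and Kafka's Selector.connect() additionally configures socket options and handles errors):

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

Selector nioSelector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// non-blocking connect: returns immediately, finishConnect() completes it later
socketChannel.connect(new InetSocketAddress("broker-host", 9092));
// register with the selector and wait for OP_CONNECT to fire
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);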
As you can see, we find here methods like SocketChannel.open, attach, or register. To understand how this SocketChannel is used in network communication, we must first introduce another object, the TransportLayer. TransportLayer is one of the fields of the KafkaChannel attached to the SelectionKey, and it's created this way:
// SSL version has some extra SSL configuration
// but key and socketChannel are still stored as internal fields
public PlaintextTransportLayer(SelectionKey key) throws IOException {
    this.key = key;
    this.socketChannel = (SocketChannel) key.channel();
}
Later, when KafkaChannel is asked to perform some I/O action, it delegates the real execution to its TransportLayer:
public boolean finishConnect() throws IOException {
    //we need to grab remoteAddr before finishConnect() is called otherwise
    //it becomes inaccessible if the connection was refused.
    SocketChannel socketChannel = transportLayer.socketChannel();
    if (socketChannel != null) {
        remoteAddress = socketChannel.getRemoteAddress();
    }
    boolean connected = transportLayer.finishConnect();
    // ...
}

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }
    receive(receive);
    // ...
}

private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}

private boolean send(Send send) throws IOException {
    midWrite = true;
    send.writeTo(transportLayer);
    if (send.completed()) {
        midWrite = false;
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    }
    return send.completed();
}
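And the TransportLayer itself is a thin wrapper around the SocketChannel. In the plaintext case, reads and writes end up as direct calls on the channel, roughly like this (a simplified sketch of the relevant PlaintextTransportLayer methods, not the complete class):

import java.io.IOException;
import java.nio.ByteBuffer;

// the plaintext transport layer simply forwards I/O calls
// to the underlying SocketChannel stored at construction time
public int read(ByteBuffer dst) throws IOException {
    return socketChannel.read(dst);
}

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}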
And here we are. As you can see, Apache Kafka uses Java's NIO Selector to read, write and manage connections. It does so with the 3 main classes introduced in my previous post: Selector, SelectionKey, and SocketChannel. But that's just the beginning. In the next post, I will show how the selector integrates with in-flight requests.