Handling multiple I/O from one thread with NIO Selector

Versions: Java 8 https://gist.github.com/bartosz25/7327f36f256556f54a43e5e67317f6a4

That's the next post I wrote after my unsuccessful analysis of Apache Kafka source. When I was reading the part responsible for sending requests to the broker, I found that it was partially managed by a Java package that I've never seen before. And that will be the topic of this post.

Looking for a better data engineering position and skills?

You have been working as a data engineer but feel stuck? You don't have any new challenges and are still writing the same jobs all over again? You have now different options. You can try to look for a new job, now or later, or learn from the others! "Become a Better Data Engineer" initiative is one of these places where you can find online learning resources where the theory meets the practice. They will help you prepare maybe for the next job, or at least, improve your current skillset without looking for something else.

👉 I'm interested in improving my data engineering skillset

See you there, Bartosz

The post starts by the part explaining some major features and components of Selector. Later we'll see how is performed the selection process. After that, we'll discover how to make the writes. After that, I will shortly present ByteBuffer operations used with selectors and finally, in the last part, a sample client-server code.

Selector

To describe Selector shortly, we can consider it as a class able to manage multiple connections in a single thread. In Javadoc, you can find an even better definition saying that Selector is "a multiplexor of SelectableChannel objects." If it's still mysterious, don't worry, we'll go through some details just now.

There are 3 classes to know, Selector, SelectableChannel and SelectionKey. To setup a simple code we need to create an instance of Selector, initialize channels that will register on the selector instance and return SelectionKey with appropriate registered actions. A simple implementation looks like this:

 
val clientSelector = Selector.open()
val channelSend = SocketChannel.open()
channelSend.configureBlocking(false)
val connectKey = channelSend.register(clientSelector, SelectionKey.OP_CONNECT)
channelSend.connect(serverSocketAddress)

In the above snippet you can see some key elements of the Selector. The first of them is Selector.open(). This factory method uses the OS default selector provider to return the Selector object used later in our code. Selector itself is also an abstract class and its implementations depend on the OS. For example, for Linux it can be an instance of sun.nio.ch.EPollSelectorImpl and for Windows sun.nio.ch.WindowsSelectorImpl.

In the next line I open a connection to the socket channel. The channel you use here must be an instance of SelectableChannel abstract class and SocketChannel is one of these implementations. A SelectableChannel is a channel able to register to a Selector and to listen for some specific operations, like you can see in SelectionKey.OP_CONNECT. There are other keys, OP_ACCEPT, OP_READ and OP_WRITE but in my case I need to start by handling the connection. If you're wondering, the meaning of each operation is straightforward. OP_READ stands for channel reading data from the socket, OP_WRITE for writing data to it, OP_ACCEPT for a server accepting new connection from the client and OP_CONNECT for a client attempting to connect to the server.

Another important point about that channel is configureBlocking(false) call. For SelectableChannels working with Selectors the non-blocking mode is required since it will not block the I/O operation. In other words, for a blocking connection, the channel will block until a new connection is available on an I/O error occurs. In a non-blocking, the channel will return directly, without waiting for one of these events. In consequence, the non-blocking character can result in fewer bytes transferred than requested.

Selection process

If you're wondering why Selector is called so, well, because it's responsible for making selection. The selection consists of querying the OS for the channels ready for any subscribed I/O operations (read, write, ...). Selector performs that action through one of selectNow(), select(timeout) or select() methods. The first one is non blocking and it returns as soon as possible - even if there are no ready channels. The second one blocks during the time specified in the timeout parameter whereas the last one blocks until at least one channel is ready for I/O operations.

The used select method returns the number of keys ready for I/O operations. They're all the instances of SelectionKey class. SelectionKey is created when the channel registers to the Selector. The key stores 2 different sets with operations (read, write, connect, accept). The first one is called interest set and stores all operations whose readiness will be tested at every invocation of the selection method. The other set is ready set and it stores all operations ready to be performed by the channel. Both sets can be retried with appropriate methods, interestOps() for the former and readyOps() for the latter.

As I said before, the selection method returns the number of selected keys. To retrieve the keys themselves, we must use selectedKeys() method. It's important to know that selected keys != registered keys. And since SelectableChannel's register method returns a SelectionKey, the registered keys will reflect all registered channels. Let's see now how to manipulate the keys after registering in the example of a client socket client making the connection to a server:

while (true) {
  clientSelector.selectNow()
  val selectedKeys = clientSelector.selectedKeys().iterator()
  while (selectedKeys.hasNext) {
    val key = selectedKeys.next()
    if (key.isConnectable) {
      val channel = key.channel().asInstanceOf[SocketChannel]          
      val isConnected = channel.finishConnect()
      assert(isConnected, "Client should correctly handle connect")
      val clientId = key.attachment().toString
      println(s"${clientId} was correctly connected to the server") 
      key.interestOps(SelectionKey.OP_READ)
    }
    selectedKeys.remove()
  }
}

An important thing to notice here is that I remove the selected key after each iteration. Why is that? It's because the selector doesn't do that. It simply adds new keys and it would lead to reprocessing already processed events. Another new concept is the attachment. When you create a channel you can attach an arbitrary object to it. In my case it's an id of the client, defined on the client-side with attach(Object) method.

Another question is why I started with OP_CONNECT? Simply because we need to connect to the server before making any read :) In other words, there is nothing to read if we are not connected. In the key handling connection, I also check if the connection is fully established by asserting on finishConnect result. I prefer to fail-fast here just to keep the code simple. Otherwise, I could allow some extra period to complete, for instance with a while loop and some number of retries.

At the end of the above snippet I change the interest operators of the client to read. Why only read? Let's see that in the next section.

Write strategies

The reason why I didn't put OP_WRITE operation in the previous snippet is the advised write strategy. To explain the "why" I will use the explanation provided by user207421 on StackOverflow (links in Read more section). In fact, and as for my understanding, you can write to the socket whenever you want. So, why OP_WRITE does ever exist? Unfortunately, the writes in non-blocking channels can be a little bit tricky. When you write some bytes, you'll use write() method that will return the number of bytes written to the socket. If this number is 0 or lower than the total number of bytes in the input, it means that something wrong happened and at this moment you should subscribe to OP_WRITE operation to complete the writing as soon as possible.

The exact answer of user207421 is the following:


"I mean - in the above example, a socket is registered for write every time after a message is read." It's a very poor example. Incorrect technique. You should just write. If write() returns zero, then you should (a) register the channel for OP_WRITE; (b) retry the write when the channel becomes writable; and (c) deregister the channel for OP_WRITE once the write has completed.

Another writing strategy, highlighting the quoted steps, consists of using a queue to keep in-flight messages that have to be delivered to the socket. Once we detect an in-flight message to send, we register the interest in OP_WRITE and handle the case in the selection process. It sounds a little bit contradictory but currently, that's how Apache Kafka deals with selectors. Once a request is ready to be sent, the channel sets the interest on OP_WRITE and takes the message during the selection process. I won't cover more details here but you should find them in my article about Kafka and selectors.

ByteBuffer and flip

Channels will use a ByteBuffer to read and write data from the sockets. And since the Selector is single-threaded, you can use a single buffer for all channels - there won't be concurrency issues. On the other hand, you must be aware of required 2 operations with ByteBuffer. A common point of them is that's all about the position, limit and capacity, 3 properties of very buffer. The position returns the position of the buffer. For instance, if you write 3 bytes, the position will be 3:

val byteBuffer = ByteBuffer.allocate(10)
byteBuffer.put("abc".getBytes())
assert(byteBuffer.position == 3)

From the previous snippet you can also deduce the capacity. The capacity is the total size of the buffer, defined here as the parameter of allocate method. The last property is the buffer's limit. During the reading operation, this attribute says how many bytes can be retrieved from the buffer.

So let's back now to our 2 key methods. The first of them is clear() and it simply resets everything, so sets the position to 0 and the limit to the capacity. The second method is called flip() and it sets the limit to the current position and the position to 0.

Why flip() call is then required? Thanks to it the consumer, like channel, will read/write only the existent bytes from the buffer:

val byteBuffer = ByteBuffer.allocate(10)
byteBuffer.put("abc".getBytes())
var readBytes = 0
byteBuffer.position(0) // set position to 0 - after writing "abc", the position will be 3
while (byteBuffer.hasRemaining) {
  byteBuffer.get()
  readBytes += 1
}
assert(readBytes == 10)

// Check now flipped buffer
val byteBufferForFlip = ByteBuffer.allocate(10)
byteBufferForFlip.put("abc".getBytes())
byteBufferForFlip.flip()
readBytes = 0
while (byteBufferForFlip.hasRemaining) {
  byteBufferForFlip.get()
  readBytes += 1
}
assert(readBytes == 3)

Ping-pong client-server example

Let's terminate this pretty long blog post by showing the code of my ping-pong client-server code. I will start by showing you the code responsible for sending or reading the data that I wrapped around 2 functions:

object ByteBufferIOOps {
  def writeToChannel(channel: SocketChannel, textToWrite: String, buffer: ByteBuffer) = {
    buffer.clear()
    buffer.put(textToWrite.getBytes())
    buffer.flip()
    while (buffer.hasRemaining) {
      val bytesWritten = channel.write(buffer)
      assert(bytesWritten > 0)
    }
    buffer.clear()
  }
  def read(channel: SocketChannel, buffer: ByteBuffer): String = {
    buffer.clear()
    val readBytes = channel.read(buffer)
    assert(readBytes > -1, "Channel has been closed") // fail-fast just for easier testing
    buffer.flip()
    val text = new String(buffer.array(), 0, readBytes, StandardCharsets.UTF_8)
    buffer.clear()
    text
  }
}

Let's move now to the server part:

    val serverAddress = new InetSocketAddress("localhost", 4444)
    val serverSelector = Selector.open()
    val server = ServerSocketChannel.open()
    server.bind(serverAddress)
    server.configureBlocking(false)
    server.register(serverSelector, SelectionKey.OP_ACCEPT)

    new Thread(new SocketClient(serverAddress)).start()

    // we can use a single buffer because we've a single thread here
    val buffer = ByteBuffer.allocate(256)
    def logServer(textToLog: String) = println(Console.BLUE + s"Server: ${textToLog}" + Console.RESET)
    while (true) {
      serverSelector.selectNow()
      val selectedKeys = serverSelector.selectedKeys().iterator()
      while (selectedKeys.hasNext) {
        val key = selectedKeys.next
        if (key.isAcceptable) {
          val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
          val socketChannel = serverSocketChannel.accept()
          logServer(s"accepted connection from ${socketChannel.getRemoteAddress}")
          socketChannel.configureBlocking(false)
          socketChannel.register(serverSelector, SelectionKey.OP_WRITE)

          // If I don't write, the keys from the client are always key 1 vs 8 (so READ is not ready!)
          val textToWrite = "Hello world"
          logServer(s"Writing '${textToWrite}'")
          ByteBufferIOOps.writeToChannel(socketChannel, textToWrite, buffer)
        } else if (key.isReadable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val inputText = ByteBufferIOOps.read(channel, buffer)
          logServer(s"Reading '${inputText}'")
          key.interestOps(SelectionKey.OP_WRITE)
        } else if (key.isWritable) {
          val socketChannel = key.channel().asInstanceOf[SocketChannel]
          val textToWrite = "Hello from server"
          logServer(s"Writing '${textToWrite}'")
          ByteBufferIOOps.writeToChannel(socketChannel, textToWrite, buffer)
          key.interestOps(SelectionKey.OP_READ)
        }
        selectedKeys.remove()
      }
      Thread.sleep(1000L)
    }

Pretty the same thing like in previous sections. An interesting point to notice here is the fact of using a single buffer for all I/O operations. I found 2 reasons for that. First, the code is not multi-threaded, so using 1 buffer is fine. Another reason concerns the question about 1 global buffer vs 1 buffer per I/O operation. During my research I found that creating a new ByteBuffer for every operation would be wasteful. And that's true because every buffer would be built on its own new array whereas with a single buffer, we're manipulating only one array. But it has few drawbacks too. If your buffer is too small, you will need to deal with buffer overflow illustrated by a BufferOverflowException. For my sample it's enough and that's why do not consider it as a prod-ready code.

Regarding the client part, is quite similar to the server's one:

    val clientSelector = Selector.open()
    // could also work with a queue like suggested here https://stackoverflow.com/questions/14249353/nio-blocking-write-not-working
    (0 to 2).foreach(id => {
      val channelSend = SocketChannel.open()
      channelSend.configureBlocking(false)
      val connectKey = channelSend.register(clientSelector, SelectionKey.OP_CONNECT)
      channelSend.connect(serverSocketAddress)
      connectKey.attach(s"client#${id}") // Kafka uses attach(...) to set a KafkaChannel to the key
      // Kafka's mute/unmute consists on adding/removing SelectionKey.OP_READ!
    })
    val buffer = ByteBuffer.allocate(256)
    while (true) {
      clientSelector.selectNow()
      val selectedKeys = clientSelector.selectedKeys().iterator()
      while (selectedKeys.hasNext) {
        val key = selectedKeys.next()
        if (key.isConnectable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val isConnected = channel.finishConnect()
          assert(isConnected, "Client should correctly handle connect")
          val clientId = key.attachment().toString
          println(s"${clientId} was correctly connected to the server")
          // subscribe to reads - other way to do, if you want to keep already existent
          // operations, is to call (key.interestOps(key.interestOps() | SelectionKey.OP_READ))
          key.interestOps(SelectionKey.OP_READ)
        } else if (key.isWritable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val clientId = key.attachment().toString
          val textToWrite = s"Hello from the ${clientId}"
          logClient(s"${clientId} Writing ${textToWrite}")
          ByteBufferIOOps.writeToChannel(channel, textToWrite, buffer)
          key.interestOps(SelectionKey.OP_READ)
        } else if (key.isReadable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          val inputText = ByteBufferIOOps.read(channel, buffer)
          logClient(s"${key.attachment().toString} Got message from the server: '${inputText}'")
          key.interestOps(SelectionKey.OP_WRITE)
        }
        selectedKeys.remove()
      }
    }

I prefered to keep the code simple and write data directly. However, as stated in the inline comment, we can also use a queue to store in-flight messages and turn on the OP_WRITE mode only when the queue is not empty. But since I'm doing here a direct ping-pong server-client interaction, it's much easier to implement with the immediate switch between OP_READ and OP_WRITE operations for every channel.

And here we are. The post was finally quite long, so I will try to sum-up everything in few words before you leave. NIO Selector is a way to interact with I/O events like reading or writing data in non blocking manner, in a single-threading environment. It means that the code responsible for reading or writing will be invoked only when there is some read or write work to do. Selector is the key element allow to select events identified by SelectionKeys. Everything is coupled to the channels that let you communicate in non blocking way, for instance, through sockets.