Streams, streams, streams everywhere - JVM side this time

on waitingforcode.com

Maybe you didn't like my clickbait title, but I wanted to test my creativity with it ;) More seriously, in this post I will cover streams, but the ones you use in your Scala/Java code rather than the distributed ones provided by Apache Kafka. I decided to write it because, while analyzing a lot of Spark I/O methods, I met streams everywhere and I wanted to shed some light on them.

The post has three parts. In the first one, I will present the basic types of streams in the JDK. In the second, I will show some more specialized implementations, and in the last one I will analyze how the AWS S3 client uses streams to send data to your bucket.

Some basics

In simple terms, a stream is a data flow either from a source or into a destination. Depending on the implementation, that flow can be represented as a sequence of bytes or objects and can be backed by memory, a file or a socket. That's the simplest definition and it should be sufficient to understand the rest of the article.

And that "rest" starts with the different types of streams. I will mostly discuss the types provided with JDK 8. First, we have byte array-based streams, which can be used either as the source (ByteArrayInputStream) or as the destination (ByteArrayOutputStream). Their use is quite straightforward, so I will move directly to the wrappers. Because yes, some streams can wrap other ones, mostly the ones with a very limited API, like the ByteArray...Stream just quoted. One of these wrappers is BufferedInputStream/BufferedOutputStream, which puts an intermediate buffer between your application and the wrapped stream (otherwise, for a file or a socket, every write can cause a call to the underlying OS!):

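  import java.io.{BufferedInputStream, BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream}

  behavior of "byte array with buffered streams"

  it should "read stream from a string" in {
    val byteArrayInput = new ByteArrayInputStream("abcde".getBytes("utf-8"))
    val bufferSizeBytes = 2
    // the 2-byte buffer never holds the whole input; it's refilled from
    // the wrapped stream as the reads progress
    val input = new BufferedInputStream(byteArrayInput, bufferSizeBytes)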

    input.read().toChar shouldEqual 'a'
    input.read().toChar shouldEqual 'b'
    input.read().toChar shouldEqual 'c'
    input.read().toChar shouldEqual 'd'
    input.read().toChar shouldEqual 'e'
    input.read() shouldEqual -1
  }

  it should "write stream to an array" in {
    val arraySizeBytes = 2
    val byteArrayOutput = new ByteArrayOutputStream(arraySizeBytes)
    val output = new BufferedOutputStream(byteArrayOutput, arraySizeBytes)

    output.write(1)
    byteArrayOutput.toByteArray shouldBe empty
    output.write(2)
    byteArrayOutput.toByteArray shouldBe empty
    output.write(3)
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2"
    // Now I'm flushing the small buffer just to materialize the buffered data
    output.flush()
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2, 3"
    output.write(4)
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2, 3"
    output.write(5)
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2, 3"
    output.write(6)
    // As you can see, after the flush, the buffer is reset
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2, 3, 4, 5"
    output.write(7)
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2, 3, 4, 5"
    output.close()
    byteArrayOutput.toByteArray.mkString(", ") shouldEqual "1, 2, 3, 4, 5, 6, 7"
  }

As you can see here, the output is written in chunks defined by the buffer size. The same technique works for file-based streams such as FileOutputStream:

  "buffered output stream" should "write file by chunks" in {
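    import java.io.{BufferedOutputStream, File, FileOutputStream}
    import org.apache.commons.io.FileUtils

    val filePath = "/tmp/test_file_buffer.txt"
    // deleteQuietly, unlike forceDelete, doesn't fail when the file doesn't exist yet
    FileUtils.deleteQuietly(new File(filePath))
    def readFileContent = FileUtils.readFileToString(new File(filePath), "UTF-8")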
    val fileOutput = new FileOutputStream(filePath)
    val bytesSize = 2
    val bufferedOutput = new BufferedOutputStream(fileOutput, bytesSize)

    bufferedOutput.write('a'.toByte)
    bufferedOutput.write('b'.toByte)
    readFileContent shouldBe empty
    bufferedOutput.flush()
    readFileContent shouldEqual "ab"

    // It worked with the explicit flush but here you can see that
    // it also works for the full buffer
    bufferedOutput.write('c'.toByte)
    readFileContent shouldEqual "ab"
    bufferedOutput.write('d'.toByte)
    readFileContent shouldEqual "ab"
    // Add new element to already full buffer
    bufferedOutput.write('e'.toByte)
    readFileContent shouldEqual "abcd"
    bufferedOutput.close()
    readFileContent shouldEqual "abcde"
  }

As you can see, our file is eventually consistent because of the buffer. In other words, reads made at different moments of its generation can return different results, depending on when the buffer is flushed. The 3rd type I'd like to share with you in this post is DataOutputStream. If you check the previous examples again, you can see that write is quite primitive since it takes a single byte, passed as an integer. With data streams, you can write other primitive types and strings:

  "data output stream" should "write string bytes" in {
    val arraySizeBytes = 2
    val byteArrayOutput = new ByteArrayOutputStream(arraySizeBytes)
    val dataOutputStream = new DataOutputStream(byteArrayOutput)

    dataOutputStream.writeBytes("a")
    dataOutputStream.writeBytes("b")
    dataOutputStream.writeBytes("c")
    dataOutputStream.close()

    byteArrayOutput.toByteArray.map(letterByte => letterByte.toChar).mkString(", ") shouldEqual "a, b, c"
  }
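The test above only uses writeBytes, so here is a minimal sketch of the "other primitive types" part. It is written in plain Java rather than as a ScalaTest case, and the class and method names (DataStreamRoundTrip, write, read) are mine, made up for the illustration. The only real API involved is DataOutputStream/DataInputStream from java.io.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class DataStreamRoundTrip {

    // Writes an int, a double and a string, and returns the raw bytes.
    static byte[] write(int id, double score, String name) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);       // 4 bytes
            out.writeDouble(score); // 8 bytes
            out.writeUTF(name);     // 2-byte length prefix + modified UTF-8 bytes
        }
        return bytes.toByteArray();
    }

    // Reads the values back, in exactly the order they were written.
    static String read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readInt() + "|" + in.readDouble() + "|" + in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = write(7, 1.5, "abc");
        System.out.println(data.length); // 17 = 4 + 8 + 2 + 3
        System.out.println(read(data));  // 7|1.5|abc
    }
}
```

The reader has to know the write order, because nothing in the byte array says where the int ends and the double starts.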

Of course, that's a very simplified view of streams. Streams in Java are a wide topic that is hard to cover in a single blog post. Below the article, I'll give you some references that should help you to learn more.

My principal concern when I started was the question "does my stream buffer everything until the write?". The answer is in the previous part: a buffered stream keeps some data in its buffer before materializing it on the underlying stream, like a file or memory, and it does so whenever the buffer is full or explicitly flushed.
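To make that answer a bit more tangible, here is a small sketch that counts how many write calls actually reach the underlying stream, with and without a BufferedOutputStream in front of it. CountingSink and the two helper methods are hypothetical names I introduced for the demonstration; only BufferedOutputStream and OutputStream come from the JDK.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class WriteCallCounter {

    // Hypothetical sink that drops the data and only counts the write calls it receives.
    static final class CountingSink extends OutputStream {
        int calls = 0;

        @Override
        public void write(int b) {
            calls++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            calls++;
        }
    }

    // Writes `total` single bytes straight into the sink.
    static int callsWithoutBuffer(int total) throws IOException {
        CountingSink sink = new CountingSink();
        for (int i = 0; i < total; i++) {
            sink.write(i);
        }
        return sink.calls;
    }

    // Writes `total` single bytes through a buffer of `bufferSize` bytes.
    static int callsWithBuffer(int total, int bufferSize) throws IOException {
        CountingSink sink = new CountingSink();
        try (OutputStream out = new BufferedOutputStream(sink, bufferSize)) {
            for (int i = 0; i < total; i++) {
                out.write(i);
            }
        } // close() flushes whatever is still in the buffer
        return sink.calls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(callsWithoutBuffer(10)); // 10
        System.out.println(callsWithBuffer(10, 4)); // 3 (4 bytes + 4 bytes + 2 bytes on close)
    }
}
```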

Specialized streams

In the previous part I mostly covered the streams available in the Java API. Let's now see how we can do more with more specialized versions of input and output streams:

  • process multiple byte buffer chunks instead of one big byte array - eg. org.apache.spark.util.io.ChunkedByteBufferInputStream
  • always read at most a specific number of bytes - eg. com.google.common.io.ByteStreams.LimitedInputStream
  • concurrently read data from a byte buffer - eg. org.apache.spark.io.ReadAheadInputStream, which uses 2 buffers: one is filled asynchronously whereas the other one is exposed to the client. The exposed one contains only the data that should be returned for the given read() call.
  • log everything that happens inside the stream - eg. org.apache.http.impl.conn.LoggingOutputStream
  • write a specific number of bytes per unit of time - eg. Spark's org.apache.spark.streaming.util.RateLimitedOutputStream, used mainly in the tests of the older DStream-based streaming module
  • wrap a stream to compress it - it's quite a common use case in data projects; you can find it in Kafka (eg. KafkaLZ4BlockOutputStream), Hadoop (eg. GzipOutputStream) or Spark (eg. SnappyOutputStreamWrapper)
  • ignore all bytes, exactly like the /dev/null destination in Unix systems - eg. org.apache.commons.io.output.NullOutputStream
  • retrieve only a subset of bytes - in other words, we can seek inside the given input stream and start reading it from an arbitrary point, eg. org.apache.hadoop.fs.FSDataInputStream
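To illustrate the wrapping idea from the list with something that ships with the JDK, here is a minimal sketch of the compression case. It uses java.util.zip.GZIPOutputStream and GZIPInputStream rather than the Kafka, Hadoop or Spark classes quoted above, and the GzipWrapping class with its two methods is a name I made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipWrapping {

    // The caller writes plain bytes; the wrapper compresses them before they reach the sink.
    static byte[] compress(String text) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(sink)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } // closing the wrapper writes the GZIP trailer
        return sink.toByteArray();
    }

    // The caller reads plain bytes; the wrapper decompresses what comes from the source.
    static String decompress(byte[] compressed) throws IOException {
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        try (InputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[1024];
            int read;
            while ((read = gzip.read(chunk)) != -1) {
                plain.write(chunk, 0, read);
            }
        }
        return new String(plain.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String text = "streams, streams, streams everywhere";
        System.out.println(decompress(compress(text)).equals(text)); // true
    }
}
```

The code calling write or read doesn't need to know about the compression at all, which is what makes these wrappers easy to stack.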

These are only a few examples of the different input and output streams you may encounter in Open Source libraries. Let's now analyze one possible stream use case.

S3 PUT object request

To get a better understanding of streams in real life, we'll analyze AmazonS3Client's putObject method. It takes an instance of PutObjectRequest as a parameter, and that object carries the data of the S3 object to write. That data is represented as an InputStream:

InputStream input = putObjectRequest.getInputStream();

After that, a few interesting things happen. If the input is file-based, the client uses the file to set the content length and to build the MD5 checksum, which will be used later to ensure a successful data delivery. After that, the initial input stream is wrapped in a ResettableInputStream. This wrapper ensures that the stream can be read multiple times from an arbitrary point. We say that the stream is a mark-and-resettable stream because its mark(int) method creates a kind of checkpoint in the stream, which becomes the starting point for all reads made after a reset() call. To put it even simpler, we can mark a position in the stream and later do as many read-reset cycles as we want, and we will always start from the last marked position.
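The mark-and-reset behavior is not specific to the AWS SDK; the JDK's BufferedInputStream supports it too. The sketch below uses it, and not ResettableInputStream, to show the checkpoint idea. MarkResetDemo and readTwiceFromMark are hypothetical names, and the method assumes an input of at least 3 ASCII characters.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class MarkResetDemo {

    // Reads one byte, marks the position, then replays the bytes after the mark.
    static String readTwiceFromMark(String text) throws IOException {
        InputStream in = new BufferedInputStream(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.US_ASCII)));
        StringBuilder seen = new StringBuilder();

        seen.append((char) in.read()); // consumed before the mark, never replayed
        in.mark(16);                   // checkpoint, valid as long as we read at most 16 bytes past it

        seen.append((char) in.read()).append((char) in.read());
        in.reset();                    // back to the checkpoint
        seen.append((char) in.read()).append((char) in.read());
        in.reset();                    // the same mark can be reused
        seen.append((char) in.read());

        return seen.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readTwiceFromMark("abcde")); // abcbcb
    }
}
```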

On the other hand, if the input is not file-based and it doesn't support the mark and reset methods like ResettableInputStream does, the client wraps the original stream with ReleasableInputStream. That wrapping can fall back to ResettableInputStream if the input stream associated with the put object request is an instance of FileInputStream. But let's suppose that it's not the case. In such a scenario, the original stream ends up wrapped with a ReleasableInputStream, which can have its close() method disabled. By the way, ResettableInputStream is a child of ReleasableInputStream, so it can also have the close method disabled.
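To show what "having the close() method disabled" can look like, here is a simplified sketch built on the JDK's FilterInputStream. It is not the SDK's ReleasableInputStream; NonClosingInputStream, TrackingStream and the helper method are hypothetical names I wrote for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

final class NonClosingDemo {

    // Hypothetical wrapper whose close() can be turned into a no-op.
    static final class NonClosingInputStream extends FilterInputStream {
        private boolean closeDisabled = false;

        NonClosingInputStream(InputStream in) {
            super(in);
        }

        NonClosingInputStream disableClose() {
            closeDisabled = true;
            return this;
        }

        @Override
        public void close() throws IOException {
            if (!closeDisabled) {
                super.close(); // propagates the close to the wrapped stream
            }
        }
    }

    // Test helper remembering whether close() reached it.
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Closes the wrapper and tells whether the wrapped stream was closed as well.
    static boolean underlyingClosedAfterClose(boolean disableClose) throws IOException {
        TrackingStream source = new TrackingStream(new byte[] {1, 2, 3});
        NonClosingInputStream wrapper = new NonClosingInputStream(source);
        if (disableClose) {
            wrapper.disableClose();
        }
        wrapper.close();
        return source.closed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(underlyingClosedAfterClose(false)); // true
        System.out.println(underlyingClosedAfterClose(true));  // false
    }
}
```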

Later, if the client knows the length of the input to transfer, it initializes an instance of LengthCheckInputStream, which becomes the input stream that will be uploaded to S3. The particularity of this implementation is the length check made while the bytes are processed, and especially after the last one:

    // Excerpt from LengthCheckInputStream; the preceding Javadoc is omitted
    private void checkLength(boolean eof) {
        if (eof) {
            if (dataLength != expectedLength) {
                throw new SdkClientException(
                        "Data read has a different length than the expected: "
                                + diagnosticInfo());
            }
        } else if (dataLength > expectedLength) {
            throw new SdkClientException("More data read than expected: "
                    + diagnosticInfo());
        }
    }
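To see where such a check plugs into the read path, here is a simplified sketch of the same idea as a FilterInputStream. It is my own reduced variant, not the SDK class: it throws a plain IOException instead of SdkClientException, ignores skip(), and every name in it (LengthCheckDemo, LengthCheckingStream, lengthMatches) is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

final class LengthCheckDemo {

    // Hypothetical stream counting the bytes it returns and validating the total.
    static final class LengthCheckingStream extends FilterInputStream {
        private final long expectedLength;
        private long dataLength = 0;

        LengthCheckingStream(InputStream in, long expectedLength) {
            super(in);
            this.expectedLength = expectedLength;
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) {
                dataLength++;
            }
            checkLength(b == -1);
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) {
                dataLength += n;
            }
            checkLength(n == -1);
            return n;
        }

        // At the end of the stream the count must match; before it, it must not exceed.
        private void checkLength(boolean eof) throws IOException {
            boolean invalid = eof ? dataLength != expectedLength : dataLength > expectedLength;
            if (invalid) {
                throw new IOException("Read " + dataLength + " bytes, expected " + expectedLength);
            }
        }
    }

    // Drains the stream and reports whether the length check passed.
    static boolean lengthMatches(byte[] data, long expectedLength) {
        try (InputStream in = new LengthCheckingStream(new ByteArrayInputStream(data), expectedLength)) {
            while (in.read() != -1) {
                // the consumer would process the byte here
            }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(lengthMatches(data, 3)); // true
        System.out.println(lengthMatches(data, 5)); // false, the stream ended too early
        System.out.println(lengthMatches(data, 2)); // false, more data than expected
    }
}
```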

Finally, the last place manipulating the input stream is the MD5 checksum computation. If the data source is a file, this step shouldn't happen because the client already knows the checksum. Otherwise, namely when the MD5 is unknown and the upload validation is enabled (= com.amazonaws.services.s3.disablePutObjectMD5Validation set to false), the client will use a class called MD5DigestCalculatingInputStream to generate the checksum for the input data on the fly:

    /** The MD5 message digest being calculated by this input stream */
    private MessageDigest digest;


    public int read() throws IOException {
        int ch = in.read();
        if (ch != -1) {
            digest.update((byte)ch);
        }
        return ch;
    }
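The JDK has a very similar building block, java.security.DigestInputStream, which updates a MessageDigest with every byte that passes through it. The sketch below uses it, and not the SDK class, to compute a Base64-encoded MD5 while the data is being read. Md5OnTheFly and base64Md5 are names I made up.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class Md5OnTheFly {

    // Reads the data in chunks; the digest is updated as a side effect of every read.
    static String base64Md5(byte[] data) throws IOException, NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        try (DigestInputStream in = new DigestInputStream(new ByteArrayInputStream(data), md5)) {
            byte[] chunk = new byte[4096];
            while (in.read(chunk) != -1) {
                // a real consumer would send the chunk over the network here
            }
        }
        // Only once the stream is fully consumed does the digest cover the whole content.
        return Base64.getEncoder().encodeToString(md5.digest());
    }

    public static void main(String[] args) throws Exception {
        System.out.println(base64Md5(new byte[0])); // 1B2M2Y8AsgTpgAmY7PhCfg==
        System.out.println(base64Md5("abcde".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The checksum is a by-product of the read, so no second pass over the data is needed.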

At the end, the client compares the checksum returned by the server with the one it computed from the input stream, in order to ensure that all bytes were transferred correctly:

        String contentMd5 = metadata.getContentMD5();
        if (md5DigestStream != null) {
            contentMd5 = Base64.encodeAsString(md5DigestStream.getMd5Digest());
        }

        final String etag = returnedMetadata.getETag();
        if (contentMd5 != null && !skipMd5CheckStrategy.skipClientSideValidationPerPutResponse(returnedMetadata)) {
            byte[] clientSideHash = BinaryUtils.fromBase64(contentMd5);
            byte[] serverSideHash = BinaryUtils.fromHex(etag);

            if (!Arrays.equals(clientSideHash, serverSideHash)) {
                publishProgress(listener, ProgressEventType.TRANSFER_FAILED_EVENT);
                throw new SdkClientException(
                    "Unable to verify integrity of data upload. Client calculated content hash (contentMD5: " + contentMd5
                    + " in base 64) didn't match hash (etag: " + etag + " in hex) calculated by Amazon S3.  "
                    + "You may need to delete the data stored in Amazon S3. (metadata.contentMD5: " + metadata.getContentMD5()
                    + ", md5DigestStream: " + md5DigestStream
                    + uploadStrategy.md5ValidationErrorSuffix()
                    + ")");
            }
        }
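The interesting detail in this snippet is that the two hashes arrive in different encodings: Base64 on the client side and hexadecimal in the ETag. Below is a sketch of that comparison with JDK classes only. ChecksumComparison, fromHex and sameDigest are hypothetical helpers that stand in for the SDK's BinaryUtils, and they assume a well-formed, unquoted hex string.

```java
import java.util.Arrays;
import java.util.Base64;

final class ChecksumComparison {

    // Decodes a hexadecimal string, two characters per byte.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Both sides are decoded to raw bytes first; comparing the strings would always fail.
    static boolean sameDigest(String base64ClientMd5, String hexEtag) {
        byte[] clientSideHash = Base64.getDecoder().decode(base64ClientMd5);
        byte[] serverSideHash = fromHex(hexEtag);
        return Arrays.equals(clientSideHash, serverSideHash);
    }

    public static void main(String[] args) {
        // MD5 of an empty payload, in both encodings
        System.out.println(sameDigest("1B2M2Y8AsgTpgAmY7PhCfg==",
                "d41d8cd98f00b204e9800998ecf8427e")); // true
    }
}
```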

One thing that confused me was the content length. At first glance, I mixed it up with the MD5 checksum and wondered why this if branch exists:

            if (contentLength == null) {
                /*
                 * There's nothing we can do except for let the HTTP client buffer
                 * the input stream contents if the caller doesn't tell us how much
                 * data to expect in a stream since we have to explicitly tell
                 * Amazon S3 how much we're sending before we start sending any of
                 * it.
                 */
                log.warn("No content length specified for stream data.  " +
                         "Stream contents will be buffered in memory and could result in " +
                         "out of memory errors.");
            }

But in fact, the content length is not the checksum. The content length indicates the size of the payload, whereas the checksum is a value returned by the server and compared against the one computed on the client side to ensure data integrity. The checksum can then be computed after the upload, but not the content length, which is an attribute of the request sent to the S3 endpoint. Therefore, it must be known beforehand. And you can read that in the HTTP specification:


The presence of a message-body in a request is signaled by the inclusion of a Content-Length or Transfer-Encoding header field in the request's message-headers. A message-body MUST NOT be included in a request if the specification of the request method (section 5.1.1) does not allow sending an entity-body in requests. A server SHOULD read and forward a message-body on any request; if the request method does not include defined semantics for an entity-body, then the message-body SHOULD be ignored when handling the request.

The client will, therefore, buffer the input stream before sending it to S3 just to know its length. And as stated in the warning, it can lead to OOM problems.
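To picture what "buffering the stream just to know its length" means, here is a minimal sketch. It is not the code of the SDK or of its HTTP client, only the general technique; BufferToLearnLength and drain are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class BufferToLearnLength {

    // Copies the whole stream into memory; the length is only known once it's fully read.
    static byte[] drain(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int read;
        while ((read = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        InputStream unknownLength = new ByteArrayInputStream(new byte[20000]);
        byte[] buffered = drain(unknownLength);
        // The value that could now be sent as Content-Length, at the price of
        // holding the whole payload on the heap.
        System.out.println(buffered.length); // 20000
    }
}
```

The memory used grows with the payload, which is exactly why the warning mentions out of memory errors.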

As you can see, streams, both for input and output, have a lot of different implementations. Some of them are quite simple since they are backed by a plain array, but others are more specific, like the ones working on 2 buffers at the same time or allowing the same bytes to be read again after resetting the position. I know that I didn't cover everything in this post. I hope it gives you a small overview though, and encourages you to dig deeper into the topic :)

Read also about streams here: Streams in Java - concepts and usage.
