Don't sleep when you code... about a sleep issue in KPL

A lesson learned about why it's always worth checking the code implementation to avoid surprises later, even for vendor-supported solutions.

I'm one of those lazy developers who, instead of reinventing the wheel for the pleasure of coding, writes only the minimum code and relies on established third-party solutions for the rest. I hadn't had any issues with that for the last 14 years, but there is always a first time for everything, punishment for the laziness included.

A little bit of context. My first cloud journey was on AWS, where I extensively explored the streaming area (Spark Streaming on EMR, Lambda, Kinesis Data Streams, Kinesis Firehose) before the batch one (Redshift, S3, Batch, Spark SQL on EMR). In the meantime I did other data projects on GCP and Azure, and when the opportunity to refresh my skills on AWS appeared, I didn't hesitate. Even though I knew the tools, I had a feeling that there would be something to discover, like Glue or MSK. Surprise, surprise, I was right about the discovery but wrong about the services!

Hence, when I had to implement a streaming pipeline where each delivery batch should fully complete before sending the next one, I had two keywords in my head: Kinesis Producer Library (KPL) and synchronous flush. KPL is a great library that simplifies a lot of cumbersome operations when dealing with Kinesis Data Streams. It offers a native buffering mechanism, multi-threaded data delivery, and, most importantly, automatic retries with a pretty nice time-based configuration. Just take a look at the beauty of that simplicity:

import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration}
import java.util.concurrent.TimeUnit

val producer = new KinesisProducer(
  new KinesisProducerConfiguration()
    .setRecordMaxBufferedTime(TimeUnit.SECONDS.toMillis(5)) // buffer records for up to 5 s before sending
    .setMaxConnections(2)
    .setRequestTimeout(TimeUnit.SECONDS.toMillis(4))
    .setFailIfThrottled(false) // retry throttled records instead of failing them
    .setThreadingModel("POOLED")
    .setThreadPoolSize(2)
    .setRecordTtl(TimeUnit.SECONDS.toMillis(30)) // give up on records not delivered within 30 s
    .setRegion("eu-west-1")
)

To ensure complete delivery, KPL also exposes a dedicated method, flushSync(). I'm deliberately omitting its implementation details here to explain the reasons behind this blog post first.
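The call site itself is straightforward, though. As a minimal sketch (the stream name, partition key, and payloads below are placeholders, and producer is the one configured above):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// buffer a few records asynchronously...
Seq("event-1", "event-2").foreach { payload =>
  producer.addUserRecord("my-stream", "some-partition-key",
    ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
}
// ...then block until everything buffered has been delivered
producer.flushSync()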

The data producer using the KPL started to suffer from performance issues, and after testing each code block, it turned out the delivery part was the bottleneck. Weird, since the CloudWatch metrics for PutRecords latency were ridiculously small (dozens of milliseconds each minute) and showed no load peaks. Despite that, delivering each batch via KPL took at least 500 ms!
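To illustrate the symptom, the measurement was as simple as timing each delivery. A sketch, where deliverBatch is a hypothetical helper wrapping the addUserRecord calls and the final flushSync():

// hypothetical wrapper around addUserRecord(...) + flushSync()
def deliverBatch(batch: Seq[String]): Unit = ???

val start = System.nanoTime()
deliverBatch(Seq("event-1", "event-2"))
val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
println(s"batch delivered in ${elapsedMs} ms") // consistently >= 500 ms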

It's not so weird once you take a look at the implementation:

public void flushSync() {
    while (getOutstandingRecordsCount() > 0) {
        flush();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) { }
    }
}

In plain English: whenever there are outstanding records in the buffer, we issue them to the Kinesis endpoint and sleep for 500 ms, presumably hoping they're all delivered before the next getOutstandingRecordsCount() check. Therefore, whether you have 1, 10, or 100 records to deliver, it will always take at least approximately 500 ms.
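If that fixed delay hurts your latency budget, nothing stops you from building a finer-grained wait on top of the same public methods. A sketch, assuming a 10 ms polling interval is acceptable for your workload (this is a workaround of mine, not an official KPL API):

// same logic as flushSync(), but polling every 10 ms instead of 500 ms;
// trades a bit of CPU for a much lower per-batch latency floor
def flushSyncFast(producer: KinesisProducer, pollMs: Long = 10L): Unit = {
  while (producer.getOutstandingRecordsCount > 0) {
    producer.flush() // issue everything still buffered
    Thread.sleep(pollMs)
  }
}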

Does it mean I will start reinventing the wheel? Not at all, but I'll certainly pay closer attention to synchronous code. This sleep-based synchronization is probably not the last surprise I'll face.

TAGS: #streaming