Sessionization pipeline - from Kafka to Kinesis version

Versions: Apache Spark 2.4.2

I'm slowly getting closer to the end of the Spark+AI Summit follow-up post series. But before I finish, I owe you an explanation of how to run the pipeline from my Github on Kinesis.


The post is composed of 2 parts. In the first one, I will focus on the search for a Kinesis connector, which brought a pretty bad surprise for me. In the second part, I will cover the differences between Kafka and Kinesis sources.

Finding a connector

In the DStream-based version, AWS Kinesis was a first-class streaming data source, alongside Apache Kafka (Github's external submodule). That's why I was sure that adapting my code to a Kinesis stream would be just a matter of data source configuration. Unfortunately, I was wrong because the external submodule doesn't contain a Kinesis source for Structured Streaming. Thus, you will need to use one of the available Open Source implementations or run your pipeline directly on the Databricks platform if you have a chance to use it. I explored the first path.

The connector that convinced me the most was the one maintained by Qubole. After my first experiences with DStream and Kinesis, I was very happy to see that one of the most problematic features, resharding (changing the number of shards in a stream, up or down), is handled by the connector. Every kinesis.client.describeShardInterval, the driver checks the topology of the stream by issuing a DescribeStream request to the AWS API. Another nice feature is the possibility to send aggregated records in the sink. Record aggregation packs several records into a single Kinesis record, which reduces the number of records sent and so also the risk of hitting Kinesis limits.

However, among the features you won't find the possibility to read multiple streams at once, as you can with the Apache Kafka source, which can consume data from multiple topics. You are also unable to start reading from an arbitrary sequence number (the Kinesis counterpart of Kafka's offset), so you can only start from the oldest or the newest data.
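Since only two starting points are available, the choice can be modeled as a closed set of values. The snippet below is a minimal sketch: the StartingPosition type and the "oldest"/"newest" argument names are hypothetical, and I assume that the connector accepts the Kinesis iterator type names TRIM_HORIZON and LATEST as values of the startingPosition option.

```scala
// Hypothetical wrapper, not part of the connector: restricts the starting
// position to the two values described above.
sealed trait StartingPosition { def optionValue: String }
// Assumption: the connector accepts the Kinesis iterator type names below.
case object Oldest extends StartingPosition { val optionValue = "TRIM_HORIZON" }
case object Newest extends StartingPosition { val optionValue = "LATEST" }

object StartingPosition {
  // Maps a command-line style argument to one of the two supported positions
  def fromArg(arg: String): StartingPosition = arg.trim.toLowerCase match {
    case "oldest" => Oldest
    case "newest" => Newest
    case _ => throw new IllegalArgumentException(
      s"Unsupported starting position '$arg', expected 'oldest' or 'newest'")
  }
}
```

The resulting value could then be passed to the reader with .option("startingPosition", StartingPosition.fromArg("oldest").optionValue).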

Switching to Kinesis

To use Qubole's connector, you have to download the project and build the dependency locally. If you use a repository manager, you can do it only once and let your colleagues download the dependency from there. But I agree that it's less convenient than finding the JAR in a public repository.

The code reading data from Kinesis was just a little bit different from the code using Kafka:

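// session, inputStream, endpoint and startingPosition are defined elsewhere in the project
val dataFrame = session.readStream.format("kinesis")
  .option("streamName", inputStream)
  .option("endpointUrl", endpoint)
  .option("startingPosition", startingPosition)
  // upper bound of records fetched from one shard per query execution
  .option("kinesis.executor.maxFetchRecordsPerShard", "200")
  // upper bound of records returned by a single API call
  .option("kinesis.executor.maxRecordPerRead", "50")
  .load()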

As you can see, the first difference is the "kinesis" format, which is self-explanatory. The last two options are much more important since they drive the latency of your processing. kinesis.executor.maxFetchRecordsPerShard caps how many records are fetched from every shard in one query execution, whereas kinesis.executor.maxRecordPerRead defines the maximum number of records retrieved by every API call.

Sound similar? Let me explain. To read data from Kinesis, you make API calls with the GetRecordsRequest class, and one of its attributes is called limit. It defines the maximum number of records returned by Kinesis. However, a call can return fewer records if the read throughput of the shard (2MB/second) is not enough to deliver them all. The maxFetchRecordsPerShard attribute specifies the maximum number of records returned in each query execution. It should then be a multiple of maxRecordPerRead, like in my example, where I expect 4 API calls to return 200 records per query execution.
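The relationship between both limits boils down to a ceiling division. The sketch below only illustrates this arithmetic: FetchMath and apiCallsPerShard are hypothetical names, not part of the connector, and the result is a lower bound since a call can return fewer records than the requested limit.

```scala
// Hypothetical helper, not part of the connector: estimates the minimum
// number of API calls needed per shard to reach the fetch quota.
object FetchMath {
  def apiCallsPerShard(maxFetchRecordsPerShard: Int, maxRecordPerRead: Int): Int = {
    require(maxFetchRecordsPerShard > 0 && maxRecordPerRead > 0,
      "both limits must be positive")
    // Ceiling division: an incomplete last page still costs one call
    (maxFetchRecordsPerShard + maxRecordPerRead - 1) / maxRecordPerRead
  }

  def main(args: Array[String]): Unit = {
    // Values taken from the reader configuration shown in this post
    println(apiCallsPerShard(200, 50))
  }
}
```

With a quota that is not a multiple of the per-call limit, for instance 201 and 50, the last call would be issued for a single record, which is why keeping the first value a multiple of the second seems a reasonable choice.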

Playing with Kinesis during my preparation for Spark+AI Summit was a great occasion to recall some memories of DStream-based code. Even though Apache Spark doesn't integrate Kinesis as a first-class data source anymore, I was positively surprised by the features implemented in Qubole's Open Source version. Thanks to them, moving the project from Apache Kafka to AWS Kinesis was quite easy.