Sessionization pipeline - from Kafka to Kinesis version
The post is composed of 2 parts. In the first one, I will focus on the Kinesis connector which was a pretty bad surprise for me. In the second part, I will cover the differences between Kafka and Kinesis sources.
Finding a connector
In DStream-based version, AWS Kinesis was the first class streaming data source, alongside Apache Kafka (Github's external submodule). That's why I was sure that adapting my code to the Kinesis stream will be just a matter of data source configuration. Unfortunately, I was wrong because the external submodule doesn't contain Kinesis anymore. Thus, you will need to use one of the available Open Source implementations or run your pipeline directly on the Databricks platform if you have a chance to use it. I explored the first path.
The connector that convinced me the most was the one maintained by Qubole. After my first experiences with DStream and Kinesis, I was very happy to see that one of the most problematic features which was the resharding (change of the number of shards in a stream, up or down) was implemented in the connector. Every kinesis.client.describeShardInterval, the driver checks the topology of the stream by issuing the DescribeStreams query to AWS API. Another nice feature is the possibility to send aggregated records in the sink. Records aggregation helps to reduce the size of sent messages, so also the risk of touching Kinesis limits.
However, among the features you won't find the possibility to read multiple streams as for Apache Kafka source where you can use data from multiple topics. You also are unable to retrieve the records from an arbitrary sequence number (Kinesis representation for Kafka's offset). So you will be able to read the oldest or the newest data.
Switching to Kinesis
To use Qubole's connector you have to download the project and build the dependency locally. If you use a repository manager, you can do it only once and let your colleagues download the dependency. But I agree that it's less convenient than finding the JAR on public repositories.
The code reading data from Kinesis was just a little bit different from the code using Kafka:
val dataFrame = session.readStream.format("kinesis") .option("streamName", inputStream) .option("endpointUrl", endpoint) .option("startingPosition", startingPosition) .option("kinesis.executor.maxFetchRecordsPerShard", "200") .option("kinesis.executor.maxRecordPerRead", "50") .load()
As you can see, the first difference is the "kinesis" format but it's quite clear. The last two options are much more important since they'll drive the latency of your processing. kinesis.executor.maxFetchRecordsPerShard says how many records will be processed on every shard whereas kinesis.executor.maxRecordPerRead defines the number of records retrieved at every API call.
Sounds similar? Let me explain. To read data from Kinesis you make API calls via GetRecordsRequest class and one of its attributes is called limit. It defines the number of records returned by Kinesis. However, this number can not be respected if the provisioned throughput on the shard is not enough to read them (2MB/second). The maxFetchRecordsPerShard attribute specifies the maximal size of the records returned in each query execution. It will be then a multiple of maxRecordPerRead, like in my example where I suppose to make 4 API calls to return 200 records per query execution.
Playing with Kinesis during my preparation to Spark+AI Summit was a great moment to recall some memories from DStream-based code. Even though Apache Spark doesn't integrate Kinesis as the first-class data source anymore, I was positively surprised by the features implemented in Qubole's Open Source version and thanks to that, the exercise of moving the project from Apache Kafka to AWS Kinesis was quite easy.
Read also about Sessionization pipeline - from Kafka to Kinesis version here: SPARK-18165 Kinesis support in Structured Streaming , SPARK-18020 Kinesis receiver does not snapshot when shard completes , Developing Consumers Using the Kinesis Data Streams API with the AWS SDK for Java , Kinesis connector .
If you liked it, you should read: Extending state store in Structured Streaming - reprocessing and limits Extending state store in Structured Streaming - reading and writing state Why UnsafeRow.copy() for state persistence in the state store? Extending state store in Structured Streaming - introduction Extending data reprocessing period for arbitrary stateful processing applications