I'm slowly approaching the end of the Spark+AI Summit follow-up post series. But before I finish, I owe you an explanation of how to run the pipeline from my Github on Kinesis.
The post is composed of 2 parts. In the first one, I will focus on the Kinesis connector, which was a pretty bad surprise for me. In the second part, I will cover the differences between the Kafka and Kinesis sources.
Finding a connector
In the DStream-based version, AWS Kinesis was a first-class streaming data source, alongside Apache Kafka, living in Github's external submodule. That's why I was sure that adapting my code to the Kinesis stream would be just a matter of data source configuration. Unfortunately, I was wrong, because the external submodule doesn't contain Kinesis anymore. Thus, you will need to use one of the available Open Source implementations or run your pipeline directly on the Databricks platform, if you have a chance to use it. I explored the first path.
The connector that convinced me the most was the one maintained by Qubole. After my first experiences with DStream and Kinesis, I was very happy to see that one of the most problematic features, resharding (changing the number of shards in a stream, up or down), was implemented in the connector. Every kinesis.client.describeShardInterval, the driver checks the topology of the stream by issuing a DescribeStreams query to the AWS API. Another nice feature is the possibility to send aggregated records in the sink. Record aggregation helps to reduce the size of sent messages, and thus the risk of hitting Kinesis limits.
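As a sketch, the resharding check interval is driven by the option mentioned above when declaring the source. The stream name, endpoint and the 1-minute interval below are illustrative assumptions, not recommended values:

```scala
// Hypothetical configuration sketch for Qubole's Kinesis source.
// kinesis.client.describeShardInterval controls how often the driver
// issues DescribeStreams to detect resharding; "1m" is an assumed value.
val reshardAwareStream = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream") // assumed stream name
  .option("endpointUrl", "https://kinesis.eu-west-1.amazonaws.com") // assumed region
  .option("kinesis.client.describeShardInterval", "1m")
  .load()
```

A shorter interval detects new or merged shards faster, at the price of more DescribeStreams calls against the AWS API.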
However, among the features you won't find the possibility to read multiple streams, as you can with the Apache Kafka source, where you can consume data from multiple topics. You are also unable to retrieve records from an arbitrary sequence number (Kinesis' equivalent of Kafka's offset). You can only start reading from the oldest or the newest data.
Switching to Kinesis
To use Qubole's connector, you have to download the project and build the dependency locally. If you use a repository manager, you only need to do it once and let your colleagues download the dependency from there. But I agree that it's less convenient than finding the JAR on public repositories.
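Once published to an internal repository manager, declaring the dependency might look like the build.sbt fragment below. The repository URL, organization, artifact name and version are illustrative assumptions; check the coordinates your local build actually produced:

```scala
// build.sbt fragment -- hypothetical coordinates for the locally built
// Qubole connector; adjust to the output of your own build.
resolvers += "internal-repo" at "https://repo.example.com/releases"

libraryDependencies += "com.qubole.spark" %% "spark-sql-kinesis" % "1.1.0-spark_2.4"
```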
The code reading data from Kinesis was just a little bit different from the code using Kafka:
val dataFrame = session.readStream.format("kinesis")
.option("streamName", inputStream)
.option("endpointUrl", endpoint)
.option("startingPosition", startingPosition)
.option("kinesis.executor.maxFetchRecordsPerShard", "200")
.option("kinesis.executor.maxRecordPerRead", "50")
.load()
As you can see, the first difference is the "kinesis" format, but that one is self-explanatory. The last two options are much more important since they'll drive the latency of your processing. kinesis.executor.maxFetchRecordsPerShard says how many records will be processed on every shard in each query execution, whereas kinesis.executor.maxRecordPerRead defines the number of records retrieved at every API call.
Sound similar? Let me explain. To read data from Kinesis, you make API calls via the GetRecordsRequest class, and one of its attributes is called limit. It defines the number of records returned by Kinesis. However, this number cannot be respected if the provisioned throughput on the shard (2MB/second) is not enough to read them. The maxFetchRecordsPerShard attribute specifies the maximal number of records returned in each query execution. It will then be a multiple of maxRecordPerRead, like in my example where I expect 4 API calls to return 200 records per query execution.
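The call arithmetic from the example above can be sketched in plain Scala. This is only the arithmetic, not the connector's actual implementation:

```scala
// Illustrative arithmetic only: with maxFetchRecordsPerShard = 200 and
// maxRecordPerRead = 50, the connector needs ceil(200 / 50) = 4
// GetRecords calls per shard to fill one query execution.
val maxFetchRecordsPerShard = 200
val maxRecordPerRead = 50

// Integer ceiling division: (a + b - 1) / b
val apiCallsPerShard =
  (maxFetchRecordsPerShard + maxRecordPerRead - 1) / maxRecordPerRead

println(apiCallsPerShard) // 4
```

If the shard's 2MB/second read throughput is exhausted, each call may return fewer records, so 4 is the minimum, not a guaranteed count.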
Playing with Kinesis during my preparation for Spark+AI Summit was a great moment to recall some memories from the DStream-based code. Even though Apache Spark doesn't integrate Kinesis as a first-class data source anymore, I was positively surprised by the features implemented in Qubole's Open Source version, and thanks to that, the exercise of moving the project from Apache Kafka to AWS Kinesis was quite easy.
