If you have already worked on AWS and tried to implement streaming applications, you have certainly noticed one thing: there is no single way to do it! If you haven't noticed it yet, I hope this blog post will convince you and, along the way, help you better understand the available solutions.
The blog post is organized in 5 parts. Each part presents one serverless streaming solution: AWS Lambda, Kinesis Data Analytics (Flink and SQL), Kinesis Firehose and AWS Glue. I omitted the solutions requiring more coding and ops effort, like Apache Flink and Apache Spark on EMR, or KCL-based consumers running on EC2 or in containers. Also, to keep things consistent and make the comparison easier, every part tries to answer these 4 questions:
- What are the use cases? To keep things simple, I consider 2 broad categories in this article: business and technical use cases.
- How does the service manage errors? Is there a retry policy, a dead-letter storage, ...?
- How to scale the application? Is it automatic, with or without downtime, ...?
- How to reprocess the data? Or rather, is it possible to reprocess past data, for example to fix a regression introduced in the code?
AWS Lambda
To use AWS Lambda with Kinesis Data Streams, you have to configure one of your Kinesis streams as the trigger of your function. When some data is available for processing, the service invokes the handler function of your application and passes it a batch of records to process. At first glance, it looks similar to the micro-batch processing you may know from Apache Spark Structured Streaming. In reality, it's a bit more complicated than that.
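A minimal sketch of such a handler, assuming the producers write JSON payloads (an assumption, the stream itself doesn't enforce any format). Lambda delivers each Kinesis record base64-encoded under `Records[i].kinesis.data`; the processing step below is only a placeholder counting the decoded events.

```python
import base64
import json
from typing import Any, Dict, List


def handler(event: Dict[str, Any], context: Any) -> Dict[str, int]:
    """Stateless Kinesis handler: each invocation receives one batch of records from one shard."""
    decoded: List[Dict[str, Any]] = []
    for record in event["Records"]:
        # Kinesis payloads arrive base64-encoded inside the "kinesis" envelope.
        payload = base64.b64decode(record["kinesis"]["data"])
        # Assumption: producers write JSON documents.
        decoded.append(json.loads(payload))
    # Placeholder for the real, stateless processing logic.
    return {"processed": len(decoded)}
```

Since nothing survives between invocations except what you store outside, this shape works well for stateless filtering or enrichment, and less so for aggregations spanning several batches.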
You can use AWS Lambda for both business and technical use cases, more or less easily depending on the nature of the data processing. Stateless applications are quite easy to implement. Lambda is a stateless service, and the AWS runtime can relocate it at any time; i.e., there is no guarantee that it runs on the same physical infrastructure between invocations. Because of that, implementing stateful workloads requires an external state store like Elastic File System (EFS).
In the beginning, Lambda had a quite naive approach to dealing with errors. The function retried the processing as long as the data was available (= not expired). Despite its simplicity, this approach had some drawbacks, like blocking the processing of the shard, which is a serious issue in streaming applications.
To address it, AWS Lambda evolved and now offers more flexible ways to deal with these failures:
- MaximumRetryAttempts - to control how many times Lambda retries a given micro-batch before giving up.
- MaximumRecordAgeInSeconds - to control how old the records of a micro-batch can be before Lambda stops retrying them.
- BisectBatchOnFunctionError - to split a failed micro-batch into 2 smaller micro-batches and retry them separately, in order to isolate the failing items.
- On-failure destination - to define where Lambda sends the information about the records that cannot be processed. For Kinesis sources, an SQS queue or an SNS topic receives the metadata of the failed batch (shard ID, sequence number range), not the records themselves. If this destination is missing, the failing records are simply dropped.
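To make the bisect behavior more concrete, here is a simplified local model of BisectBatchOnFunctionError. It's a sketch, not the service's implementation: it ignores MaximumRetryAttempts and MaximumRecordAgeInSeconds and assumes the handler fails deterministically for any sub-batch containing a bad record. `reject_poison` is a hypothetical handler.

```python
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def process_with_bisect(batch: Sequence[T], handler: Callable[[List[T]], None]) -> Tuple[List[T], List[T], int]:
    """Return (succeeded, failed, invocations) for a batch processed with bisect-on-error."""
    succeeded: List[T] = []
    failed: List[T] = []
    invocations = 0

    def run(sub_batch: List[T]) -> None:
        nonlocal invocations
        invocations += 1
        try:
            handler(sub_batch)
            succeeded.extend(sub_batch)
        except Exception:
            if len(sub_batch) == 1:
                # Cannot split further: this record would end up reported to the on-failure destination.
                failed.extend(sub_batch)
            else:
                middle = len(sub_batch) // 2
                run(sub_batch[:middle])
                run(sub_batch[middle:])

    if batch:
        run(list(batch))
    return succeeded, failed, invocations


def reject_poison(records: List[int]) -> None:
    """Hypothetical handler failing whenever the batch contains the value 5."""
    if 5 in records:
        raise ValueError("poison record")
```

With `process_with_bisect(list(range(1, 9)), reject_poison)`, the poison record 5 is isolated after 7 invocations while the 7 other records succeed. In the real service, the records of a failed half can be processed more than once, so the handler should be idempotent.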
Lambda attached to a Kinesis data stream scales automatically whenever the number of shards changes (aka resharding operation). But manipulating your data source is not the only scaling strategy. Another one relies on a Lambda feature called parallelization factor, which defines how many batches of a given shard can be processed concurrently (from 1 to 10). Sounds familiar? Yes, I already blogged about a similar concept in Apache Spark called minPartitions.
For this scaling part, keep in mind that by default, all consumers of a given stream share the read capacity of 2MB/second/shard. If several applications read the same stream, increasing the parallelization factor may have little effect, since the extra concurrent invocations compete for the same throughput. To overcome this limitation, you can use enhanced fan-out, which reserves a dedicated throughput of 2MB/second/shard for the Lambda function, shared by all its concurrent instances.
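A back-of-the-envelope sketch of this trade-off. It assumes that the shared 2MB/second/shard splits evenly between the consumers of the stream, which is a simplification: in reality, consumers compete for it rather than getting a fixed share.

```python
from typing import Dict

SHARD_READ_MB_PER_S = 2.0  # read limit per shard for Kinesis Data Streams


def lambda_read_capacity(shards: int, parallelization_factor: int,
                         consumers_on_stream: int, enhanced_fan_out: bool) -> Dict[str, float]:
    """Rough concurrency and read throughput available to one Lambda function."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("ParallelizationFactor must be between 1 and 10")
    if consumers_on_stream < 1:
        raise ValueError("at least one consumer is required")
    # Assumption: shared throughput is split evenly; enhanced fan-out gets a dedicated pipe.
    per_shard = SHARD_READ_MB_PER_S if enhanced_fan_out else SHARD_READ_MB_PER_S / consumers_on_stream
    return {
        "max_concurrent_batches": float(shards * parallelization_factor),
        "read_mb_per_s": shards * per_shard,
    }
```

For 4 shards, a parallelization factor of 5 and 2 consumers on the stream, the function can run 20 concurrent batches but reads at most about 4MB/s; enabling enhanced fan-out raises that to 8MB/s without changing the concurrency.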
If the Lambda function starts to fail or you deployed a buggy version, basic reprocessing is quite easy. You can reattach the source (Kinesis stream) with a new trigger whose StartingPosition property is set to a specific timestamp, e.g., the deployment time of the buggy version. Fault tolerance then relies on the Kinesis Data Streams data retention (max 1 year).
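A minimal sketch of the parameters for such a new trigger, built for boto3's `create_event_source_mapping`. The function name, ARNs and retry values are hypothetical placeholders; the old mapping has to be deleted separately to avoid consuming the stream twice.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def reprocessing_mapping_kwargs(function_name: str, stream_arn: str, start_at: datetime,
                                on_failure_arn: Optional[str] = None,
                                parallelization_factor: int = 1,
                                max_retry_attempts: int = 3,
                                max_record_age_seconds: int = 3600) -> Dict[str, Any]:
    """Event source mapping that re-reads the stream from start_at."""
    if start_at.tzinfo is None:
        raise ValueError("use a timezone-aware timestamp to avoid reading from the wrong hour")
    kwargs: Dict[str, Any] = {
        "FunctionName": function_name,
        "EventSourceArn": stream_arn,  # or an enhanced fan-out consumer ARN
        "StartingPosition": "AT_TIMESTAMP",
        "StartingPositionTimestamp": start_at,
        "ParallelizationFactor": parallelization_factor,
        "MaximumRetryAttempts": max_retry_attempts,
        "MaximumRecordAgeInSeconds": max_record_age_seconds,
        "BisectBatchOnFunctionError": True,
    }
    if on_failure_arn:
        kwargs["DestinationConfig"] = {"OnFailure": {"Destination": on_failure_arn}}
    return kwargs


def create_mapping(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    import boto3  # imported lazily so the builder above works without AWS credentials
    return boto3.client("lambda").create_event_source_mapping(**kwargs)
```

Remember that the replay only goes as far back as the stream retention period allows.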
Reprocessing is less obvious for more complex workloads, like the ones involving remote state stores. In addition to seeking the correct starting position of the stream, you may also need to restore the correct state.
Kinesis Data Analytics - Apache Flink
An alternative to AWS Lambda is Kinesis Data Analytics, which you can implement either with Apache Flink or with SQL statements.
The use cases are pretty much the same as for Lambda functions. You can use Kinesis Data Analytics for Flink for purely technical data streaming jobs, for example filtering or simple transformations, as well as for more complex business-related applications. By more complex, I mean things like window-based operations or stateful processing. They are much easier to implement than with AWS Lambda because Apache Flink provides them natively.
If there is an unexpected exception not handled by your code, the application fails and restarts from the last checkpoint. Even though checkpoints are an Apache Flink concept, you configure them at the service level: you can disable them, define the checkpointing interval, and set the minimal pause between the end of the last successful checkpoint and the start of the next one. The second concept used in error management is the snapshot. It's an adapted version of Flink's savepoint that the user or the service can trigger.
Even though both mechanisms are similar, they have some conceptual differences. Checkpoints are managed by the application to provide a recovery mechanism for unexpected job failures. Snapshots, or savepoints in Apache Flink terms, also provide a recovery mechanism, but the user or the service manages them. They're more useful for planned upgrades or underlying infrastructure changes. For example, Kinesis Data Analytics automatically creates them, if the SnapshotsEnabled flag is set to true, for any scaling operation or application update.
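Here is a sketch of the part of the `ApplicationConfiguration` (boto3 `kinesisanalyticsv2` client, `create_application`) that enables both mechanisms. The interval values are arbitrary examples, and the code configuration required by a real application is omitted.

```python
from typing import Any, Dict


def flink_fault_tolerance_configuration(checkpoint_interval_ms: int = 60_000,
                                        min_pause_ms: int = 5_000,
                                        snapshots_enabled: bool = True) -> Dict[str, Any]:
    """Checkpoint and snapshot settings for a Kinesis Data Analytics Flink application."""
    if checkpoint_interval_ms <= 0 or min_pause_ms < 0:
        raise ValueError("invalid checkpoint timings")
    return {
        "FlinkApplicationConfiguration": {
            "CheckpointConfiguration": {
                # CUSTOM is required, otherwise the service applies its own defaults.
                "ConfigurationType": "CUSTOM",
                "CheckpointingEnabled": True,
                "CheckpointInterval": checkpoint_interval_ms,  # milliseconds
                "MinPauseBetweenCheckpoints": min_pause_ms,  # milliseconds
            },
        },
        # Snapshots created by the service before updates and scaling operations.
        "ApplicationSnapshotConfiguration": {"SnapshotsEnabled": snapshots_enabled},
    }
```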
To control the parallelism of your processing, you can set 2 properties, Parallelism and ParallelismPerKPU. If you don't know the KPU concept, it stands for Kinesis Processing Unit, and every KPU provides 1 vCPU and 4GB of memory. ParallelismPerKPU defines how many parallel tasks can run on a single KPU. To know the number of allocated KPUs, you then divide Parallelism by ParallelismPerKPU.
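The arithmetic, sketched in Python together with the matching `ParallelismConfiguration` block. Rounding up is my assumption for non-divisible values, since a fraction of a KPU can't be allocated.

```python
import math
from typing import Any, Dict


def allocated_kpus(parallelism: int, parallelism_per_kpu: int) -> int:
    """KPUs needed to run `parallelism` tasks with `parallelism_per_kpu` tasks per KPU."""
    if parallelism < 1 or parallelism_per_kpu < 1:
        raise ValueError("both values must be >= 1")
    return math.ceil(parallelism / parallelism_per_kpu)


def kpu_resources(kpus: int) -> Dict[str, int]:
    """Each KPU provides 1 vCPU and 4GB of memory."""
    return {"vcpu": kpus, "memory_gb": 4 * kpus}


def parallelism_configuration(parallelism: int, parallelism_per_kpu: int,
                              autoscaling: bool = True) -> Dict[str, Any]:
    return {
        "ConfigurationType": "CUSTOM",
        "Parallelism": parallelism,
        "ParallelismPerKPU": parallelism_per_kpu,
        "AutoScalingEnabled": autoscaling,
    }
```

For example, Parallelism = 10 with ParallelismPerKPU = 4 gives 3 KPUs, so 3 vCPUs and 12GB of memory.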
Under the hood, the service runs Apache Flink jobs on Amazon EKS, with a single-tenant Flink cluster. I didn't find out whether the autoscaling feature relies on Kubernetes components, but I wanted to keep track of this interesting execution detail! Anyway, for autoscaling, the service increases or decreases the KPU usage after observing a sustained, significant change in CPU usage.
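To illustrate what "sustained, significant change" means, here is a hypothetical threshold-based decision rule, not the service's implementation. The default thresholds (75% for 15 minutes to scale up, 10% for 6 hours to scale down) are assumptions based on the AWS documentation; check the current values before relying on them.

```python
import math
from typing import Callable, Sequence


def autoscaling_decision(cpu_percent: Sequence[float], sample_minutes: float,
                         high: float = 75.0, high_minutes: float = 15.0,
                         low: float = 10.0, low_minutes: float = 360.0) -> str:
    """Return "scale_up", "scale_down" or "none" from CPU samples ordered oldest first."""

    def sustained(condition: Callable[[float], bool], window_minutes: float) -> bool:
        needed = math.ceil(window_minutes / sample_minutes)
        recent = cpu_percent[-needed:]
        # The whole window must be covered and every sample must match the condition.
        return len(recent) >= needed and all(condition(value) for value in recent)

    if sustained(lambda value: value >= high, high_minutes):
        return "scale_up"
    if sustained(lambda value: value < low, low_minutes):
        return "scale_down"
    return "none"
```

With 5-minute samples, three consecutive readings at 80% trigger a scale up, while a single dip to 70% in that window doesn't. The asymmetric windows avoid flapping, which matters all the more because each scaling action implies downtime.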
But autoscaling doesn't come for free: the application experiences downtime during this action. And as for many other cloud services, there are soft limits, like the max number of KPUs allocated to every application.
So, what happens when your application fails, or when you deployed a buggy version to production and noticed it only a few days later? As you saw, Kinesis Data Analytics provides a restore mechanism with snapshots (check the ApplicationRestoreConfiguration parameters). The available snapshots, the data retention of the source and, eventually, the data retention of any external data store used in your processing logic define the reprocessing boundaries.
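A sketch of that restore flow, assuming the snapshot summaries come from boto3's `kinesisanalyticsv2` `list_application_snapshots` and the application was stopped before calling `start_application`. The idea is to pick the newest ready snapshot taken before the buggy deployment. Falling back to skipping the restore is my design choice here: the job then starts with an empty state.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def last_ready_snapshot_before(summaries: List[Dict[str, Any]], cutoff: datetime) -> Optional[str]:
    """Newest READY snapshot created before the cutoff (e.g. the buggy deployment time)."""
    candidates = [
        s for s in summaries
        if s["SnapshotStatus"] == "READY" and s["SnapshotCreationTimestamp"] < cutoff
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda s: s["SnapshotCreationTimestamp"])["SnapshotName"]


def start_from_snapshot_kwargs(application_name: str, snapshot_name: Optional[str]) -> Dict[str, Any]:
    """Arguments for start_application restoring the given snapshot."""
    if snapshot_name is None:
        # No usable snapshot: start with an empty state.
        restore: Dict[str, str] = {"ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"}
    else:
        restore = {"ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT", "SnapshotName": snapshot_name}
    return {
        "ApplicationName": application_name,
        "RunConfiguration": {"ApplicationRestoreConfiguration": restore},
    }
```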
Initially, I thought that checkpoints could be manipulated the same way as in Apache Spark Structured Streaming; i.e., you can remove the checkpoints you don't need and keep only the one you want to restore from. In the end, I didn't find how to do this in Kinesis Data Analytics. If you have any input on that point, I will be happy to learn and update this paragraph!
Kinesis Data Analytics - SQL
The SQL version of Kinesis Data Analytics also covers business and purely technical data processing scenarios. Among the complex use cases, you can use windowed queries and even perform stream-to-stream joins! You can also attach a Lambda function to transform the records before passing them to the SQL logic. However, more customized scenarios, like complex stateful logic, are better expressed with the Flink version of the service.
Kinesis Data Analytics SQL comes with a dedicated in-application stream called error_stream, where it sends all errors encountered while executing your SQL processing logic, for example logical errors like division by zero or unparseable events. A recommended practice is to persist these errors to an external destination for further analysis.
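Persisting the errors boils down to adding an output whose in-application stream name is `error_stream`. A sketch for the SQL (v1) API through boto3's `kinesisanalytics` client; the destination stream and role ARNs are hypothetical placeholders.

```python
from typing import Any, Dict


def error_stream_output(destination_stream_arn: str, role_arn: str) -> Dict[str, Any]:
    """Output definition persisting the in-application error_stream to a Kinesis data stream."""
    return {
        "Name": "error_stream",  # name of the in-application stream to persist
        "KinesisStreamsOutput": {"ResourceARN": destination_stream_arn, "RoleARN": role_arn},
        "DestinationSchema": {"RecordFormatType": "JSON"},
    }


def add_error_output(application_name: str, version_id: int, output: Dict[str, Any]) -> None:
    import boto3  # imported lazily so the builder above works without AWS credentials
    boto3.client("kinesisanalytics").add_application_output(
        ApplicationName=application_name,
        CurrentApplicationVersionId=version_id,
        Output=output,
    )
```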
The application also retries when it can't persist the processed data into an external destination. In that scenario, it retries indefinitely, so a destination that keeps failing eventually prevents the application from making progress.
You can scale a Kinesis Data Analytics SQL application by defining multiple in-application input streams. Think of these streams as tasks mapped to the shards of the input data source. You control this parallelism with the InputParallelism parameter. For example, if you set it to 2 for a 50-shard Kinesis data stream, you will have 2 in-application input streams, processing 25 shards each. It's recommended to increase this value for high-throughput input streams (greater than 100 MB/sec), or to use the Flink-based Kinesis Data Analytics if you prefer to keep the logic as a single application.
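The example above, sketched for an input declared with `{"NamePrefix": "SOURCE_SQL_STREAM", "InputParallelism": {"Count": 2}}`. The in-application streams get the prefix plus a `_001`, `_002`, ... suffix. The round-robin shard assignment is hypothetical (the real one is internal to the service), and the upper bound of 64 streams is an assumption to double-check in the limits page.

```python
from typing import Dict, List


def in_application_input_streams(name_prefix: str, shard_ids: List[str],
                                 input_parallelism: int) -> Dict[str, List[str]]:
    """Hypothetical round-robin mapping of shards to in-application input streams."""
    if not 1 <= input_parallelism <= 64:
        raise ValueError("InputParallelism count must be between 1 and 64")
    names = [f"{name_prefix}_{i:03d}" for i in range(1, input_parallelism + 1)]
    assignment: Dict[str, List[str]] = {name: [] for name in names}
    for index, shard_id in enumerate(shard_ids):
        assignment[names[index % input_parallelism]].append(shard_id)
    return assignment
```

Keep in mind that your SQL then has to read from every in-application stream, which is why the Flink version is a simpler option when you want a single logical pipeline.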
Apart from this input parallelism, the service elastically scales the application to accommodate increased data throughput. The scaling unit is the same as in the Flink version of the service (KPU), and by default, an application can use up to 8 KPUs.
The SQL version of the service handles failures with built-in internal checkpoints. The service takes them whenever it delivers the processed data to the configured destination. Thanks to them, it knows where to restart processing after an unexpected failure or a planned update.
I didn't find a way to use these checkpoints to reprocess the data from an arbitrary point in the past. They seem designed for forward-only pipelines, limiting reprocessing to the data after the last correctly delivered output. If you have any extra information about that point, feel free to leave a comment.
Kinesis Firehose
This component of the AWS Kinesis stack is better suited to technical use cases. To be more precise, it's good at synchronizing data between Kinesis Data Streams and more static destinations like S3 or Elasticsearch. Since an AWS Lambda function can be plugged in as a transformation step, you can always try to perform more complex operations on the buffered records.
Error management depends on the destination, but generally, you can specify the retry duration and access the failed deliveries on S3. The exception is the S3 destination itself: you will lose your data if Firehose doesn't manage to write it to the specified bucket within 24 hours.
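For example, with an Elasticsearch destination, the retry duration and the S3 backup of failed documents are part of the destination configuration passed to boto3's `firehose` `create_delivery_stream`. A sketch with hypothetical ARNs, index name and prefix:

```python
from typing import Any, Dict


def elasticsearch_destination(domain_arn: str, role_arn: str, index_name: str,
                              backup_bucket_arn: str, retry_seconds: int = 300) -> Dict[str, Any]:
    """ElasticsearchDestinationConfiguration retrying for retry_seconds, then backing up failures to S3."""
    if not 0 <= retry_seconds <= 7200:
        raise ValueError("RetryOptions.DurationInSeconds must be between 0 and 7200")
    return {
        "RoleARN": role_arn,
        "DomainARN": domain_arn,
        "IndexName": index_name,
        "RetryOptions": {"DurationInSeconds": retry_seconds},
        # Only the documents that couldn't be indexed go to the S3 backup bucket.
        "S3BackupMode": "FailedDocumentsOnly",
        "S3Configuration": {"RoleARN": role_arn, "BucketARN": backup_bucket_arn, "Prefix": "failed/"},
    }
```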
Firehose scales automatically, but the more important thing to remember here is how Firehose works. As I mentioned previously, its main purpose is to dispatch the data from Kinesis Data Streams to more static (batch-like) data stores. One use case of this approach is polyglot persistence, where the data is exposed from specialized data stores. To perform this dispatching, Firehose buffers the data and delivers it once one of 2 thresholds is reached: the buffering time or the buffer size. Whichever comes first triggers the delivery of the buffered data to the output destination.
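A simplified local model of this "size or time, whichever comes first" rule, mirroring the SizeInMBs and IntervalInSeconds buffering hints. It's an illustration under simplifying assumptions (sizes in bytes, time passed explicitly), not Firehose's implementation.

```python
from typing import List, Optional


class SizeOrTimeBuffer:
    """Deliver buffered records once max_bytes or interval_seconds is reached."""

    def __init__(self, max_bytes: int, interval_seconds: float) -> None:
        self.max_bytes = max_bytes
        self.interval_seconds = interval_seconds
        self._records: List[bytes] = []
        self._size = 0
        self._opened_at: Optional[float] = None

    def add(self, record: bytes, now: float) -> Optional[List[bytes]]:
        """Buffer a record; return the delivered batch if a threshold is reached."""
        if self._opened_at is None:
            self._opened_at = now
        self._records.append(record)
        self._size += len(record)
        if self._size >= self.max_bytes:
            return self._flush()
        return self.poll(now)

    def poll(self, now: float) -> Optional[List[bytes]]:
        """Timer check: deliver the buffer once it has been open for interval_seconds."""
        if self._records and self._opened_at is not None and now - self._opened_at >= self.interval_seconds:
            return self._flush()
        return None

    def _flush(self) -> List[bytes]:
        batch = self._records
        self._records, self._size, self._opened_at = [], 0, None
        return batch
```

With a 10-byte limit and a 60-second interval, three records of 4, 4 and 2 bytes are delivered together by the size threshold, while a lone 1-byte record waits for the timer.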
There is no parallelism configuration like in Kinesis Data Analytics, and the technical implementation is hidden. It's then hard to say whether Firehose works like Lambda, on a per-shard basis (IMHO quite probable), and if so, whether it supports something like the parallelization factor (multiple consumers per shard).
Firehose automatically retries delivering the records in case of failure and eventually saves them to a dead-letter location on S3. But there is no mention of how the checkpoint mechanism works, or whether it exists at all. IMHO it doesn't, because there is no way to stop Firehose for a while and start it again later. The only way to simulate this is to delete and recreate the delivery stream, but it implies losing the data produced between the deletion and the re-creation. Consequently, there is no way to reprocess the data other than ingesting it into the input stream once again. On the other hand, the main purpose of the service (synchronization) justifies this absence.
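A sketch of such a re-ingestion, assuming you still have the payloads somewhere (for example, copies on S3; reading them is out of scope here). It uses the `put_records` call of a boto3 `kinesis` client, which accepts at most 500 records per request. The partition key function is a hypothetical parameter. Remember that every other consumer of the stream will see the replayed records too.

```python
from typing import Any, Callable, Iterable, Iterator, List

MAX_RECORDS_PER_PUT = 500  # PutRecords accepts at most 500 records per request


def chunked(items: Iterable[bytes], size: int = MAX_RECORDS_PER_PUT) -> Iterator[List[bytes]]:
    batch: List[bytes] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def reingest(kinesis: Any, stream_name: str, payloads: Iterable[bytes],
             partition_key: Callable[[bytes], str]) -> int:
    """Replay payloads into the source stream; return the number of rejected records."""
    rejected = 0
    for batch in chunked(payloads):
        response = kinesis.put_records(
            StreamName=stream_name,
            Records=[{"Data": p, "PartitionKey": partition_key(p)} for p in batch],
        )
        # A production job should retry the entries carrying an ErrorCode in response["Records"].
        rejected += response["FailedRecordCount"]
    return rejected
```

The chunking only enforces the record count; very large payloads may also hit the per-request size limit, so a real job would chunk by bytes as well.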
AWS Glue
That's one of my surprises of 2020: AWS Glue supports streaming data sources! For me, the main purpose of Glue remains technical data engineering tasks like data consistency management (schema evolution) or data discovery (data catalog). However, since it's based on Apache Spark, you can also implement more user-facing features with the Spark API.
AWS Glue batch jobs use job bookmarks to store information about the last processed data. That's not the case for streaming jobs, which rely on Apache Spark checkpoints written to the location specified in the checkpointLocation parameter.
AWS Glue is a serverless service, but you can specify the resources it uses for processing. They're expressed in Data Processing Units (DPU) and, depending on the worker type, a worker provides from 16GB to 32GB of memory, 4 to 8 vCPUs and 50GB to 128GB of attached disk. To scale the job, you have to change the allocated capacity. However, for a Kinesis Data Streams source, you can't do it without stopping the application. As per the documentation, "stop the job first, modify the stream shards, and then restart the job".
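The documented sequence can be scripted with boto3's `glue` and `kinesis` clients, passed in as parameters so the logic can be tested with fakes. It's a sketch: the job, run, and stream names, the worker type and the polling interval are hypothetical, and the job is expected to resume from its checkpointLocation when restarted.

```python
import time
from typing import Any

TERMINAL_RUN_STATES = {"STOPPED", "SUCCEEDED", "FAILED", "TIMEOUT", "ERROR"}


def rescale_streaming_job(glue: Any, kinesis: Any, job_name: str, run_id: str,
                          stream_name: str, target_shards: int,
                          number_of_workers: int, worker_type: str = "G.1X",
                          poll_seconds: float = 10.0) -> str:
    """Stop the Glue streaming job, reshard the source stream, restart with new capacity."""
    # 1. Stop the running job and wait until the run reaches a terminal state.
    glue.batch_stop_job_run(JobName=job_name, JobRunIds=[run_id])
    while glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["JobRunState"] not in TERMINAL_RUN_STATES:
        time.sleep(poll_seconds)
    # 2. Reshard the stream and wait until it's ACTIVE again.
    kinesis.update_shard_count(StreamName=stream_name, TargetShardCount=target_shards,
                               ScalingType="UNIFORM_SCALING")
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)
    # 3. Restart the job with the new capacity.
    response = glue.start_job_run(JobName=job_name, WorkerType=worker_type,
                                  NumberOfWorkers=number_of_workers)
    return response["JobRunId"]
```

In practice, you would call it with `boto3.client("glue")` and `boto3.client("kinesis")`.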
Since fault tolerance is based on Structured Streaming checkpoints, to reprocess the data you have to manipulate the metadata files stored in the location specified in the checkpointLocation option.
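A sketch of that manipulation on a local copy of a checkpoint, assuming the usual Structured Streaming layout: `offsets/` and `commits/` directories with one file per batch ID, where each offsets file has a version line followed by a metadata JSON line containing `batchTimestampMs`. Removing the entries after batch K makes the next run restart after batch K and re-read the later data, provided it's still retained in the stream. This is unsupported territory: back up the checkpoint first, adapt the file operations for S3, and expect extra work for stateful queries (the `state/` directory).

```python
import json
from pathlib import Path
from typing import Dict, List, Optional


def _log_entries(log_dir: Path) -> Dict[int, Path]:
    """Batch id -> file for a Spark metadata log directory (files named 0, 1, 2, ...)."""
    if not log_dir.is_dir():
        return {}
    return {int(p.name): p for p in log_dir.iterdir() if p.is_file() and p.name.isdigit()}


def last_batch_before(checkpoint_dir: str, cutoff_ms: int) -> Optional[int]:
    """Newest batch planned before cutoff_ms (epoch millis), e.g. the buggy deployment time."""
    best: Optional[int] = None
    for batch_id, path in _log_entries(Path(checkpoint_dir) / "offsets").items():
        metadata = json.loads(path.read_text().splitlines()[1])  # line 1 is the version marker
        if metadata.get("batchTimestampMs", 0) < cutoff_ms and (best is None or batch_id > best):
            best = batch_id
    return best


def roll_back_checkpoint(checkpoint_dir: str, keep_up_to_batch: int) -> List[str]:
    """Delete offsets/commits entries after keep_up_to_batch; return the removed entries."""
    removed: List[str] = []
    for sub_dir in ("offsets", "commits"):
        log_dir = Path(checkpoint_dir) / sub_dir
        for batch_id, path in sorted(_log_entries(log_dir).items()):
            if batch_id > keep_up_to_batch:
                path.unlink()
                crc = log_dir / f".{path.name}.crc"  # checksum file written by the Hadoop local FS
                if crc.exists():
                    crc.unlink()
                removed.append(f"{sub_dir}/{path.name}")
    return removed
```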
One cloud provider and 5 different serverless services to write your streaming applications! Some of them perform better for technical data engineering tasks; others can be used in business scenarios, with or without autoscaling. Did this article help you get a better view of that landscape? Would you like to read more articles like this one? Let me know in the comments.