Idempotent consumer with AWS DynamoDB Streams

In my previous post I presented an implementation of the idempotent consumer pattern with Apache Cassandra CDC. One of the drawbacks of that solution was the necessity of producing the messages with slower lightweight transactions. In this post I will show you how to do the same with AWS DynamoDB Streams, without that constraint.

The post is composed of 2 sections. The first one describes the properties of DynamoDB Streams you can use to implement the idempotent consumer pattern. The second one gives a short code example.

DynamoDB Streams properties

DynamoDB Streams can be considered a kind of DynamoDB CDC: an agent continuously listens for all changes made on the table and, as soon as it detects a change, sends it to the consumer. The captured events are time-ordered, which means you will process them in their real execution order. The events are stored for up to 24 hours and, in case of any error, your consumer can retry processing the failed records.

Retry

Be careful with this retry mechanism. If you're processing batches of events (and you likely will), you may still process a given event more than once. That's why, besides the idempotent consumer protecting you against duplicated messages from the producer, you should also make your processing logic idempotent.
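One way to make the processing side idempotent is to record the identifier of every handled stream record and skip the ones already seen. Below is a minimal sketch of that idea, assuming a hypothetical processed_events table with eventId as its partition key; the record's getEventID() serves as the deduplication key and a conditional write does the check-and-register atomically:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ConditionalCheckFailedException, PutItemRequest}

object ProcessedEventsRegistry {
  private val dynamoDb = AmazonDynamoDBClientBuilder.defaultClient()

  // Returns true only for the first attempt to register the event;
  // a retried batch hits the condition expression and the duplicate is skipped
  def markAsProcessed(eventId: String): Boolean = {
    val request = new PutItemRequest()
      .withTableName("processed_events") // hypothetical deduplication table
      .addItemEntry("eventId", new AttributeValue(eventId))
      .withConditionExpression("attribute_not_exists(eventId)")
    try {
      dynamoDb.putItem(request)
      true
    } catch {
      case _: ConditionalCheckFailedException => false
    }
  }
}

Since the condition is evaluated server-side together with the write, two concurrent retries of the same batch cannot both register the same event.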

An important property of DynamoDB Streams is the stream view type. It specifies which kind of data your consumer will receive. If you set this property to NEW_AND_OLD_IMAGES, your consumer will be able to distinguish between the old and the new version of the record. If you choose NEW_IMAGE or OLD_IMAGE, you will get, respectively, only the changed or only the previous version of the record. And if you opt for KEYS_ONLY, you will consume only the key attributes of the item.
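The stream view type is set when you enable the stream on the table. A sketch of doing it at table creation time with the Java SDK, assuming a hypothetical orders table with an id partition key, could look like:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model._

object StreamEnabledTable {
  def main(args: Array[String]): Unit = {
    val dynamoDb = AmazonDynamoDBClientBuilder.defaultClient()
    val createTable = new CreateTableRequest()
      .withTableName("orders") // hypothetical table name
      .withAttributeDefinitions(new AttributeDefinition("id", ScalarAttributeType.S))
      .withKeySchema(new KeySchemaElement("id", KeyType.HASH))
      .withBillingMode(BillingMode.PAY_PER_REQUEST)
      // NEW_AND_OLD_IMAGES writes both versions of the item to the stream
      .withStreamSpecification(new StreamSpecification()
        .withStreamEnabled(true)
        .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES))
    dynamoDb.createTable(createTable)
  }
}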

DynamoDB Streams idempotent consumer

The easiest way to consume DynamoDB Streams events is the AWS Lambda service, which can be directly triggered by every write to the stream. The class representing the event emitted by DynamoDB is com.amazonaws.services.lambda.runtime.events.DynamodbEvent. If you take a look at the API, you will see that its getRecords() method returns a list of DynamodbStreamRecord. Every record of that type exposes, among others, 2 interesting attributes: the name of the operation made on the item (getEventName(), one of INSERT, MODIFY or REMOVE) and the changed data itself (getDynamodb(), whose content depends on the stream view type).

To implement the idempotent consumer, the first property will be sufficient. The code consuming a given record only once could look like this:

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}

import collection.JavaConverters._

class IdempotentConsumer extends RequestHandler[DynamodbEvent, Unit] {
  override def handleRequest(dynamoDbEvent: DynamodbEvent, context: Context): Unit = {
    // a duplicated write of an already existing item shows up as a MODIFY event,
    // so keeping only INSERT events filters the producer's duplicates out
    dynamoDbEvent.getRecords.asScala.filter(record => record.getEventName == "INSERT")
      .foreach(record => {
        // do sth with the record
      })
  }
}

But as I mentioned before, the code above doesn't guarantee that a given record will be processed exactly once. It protects against duplication on the producer side, but the data will still be processed at least once.
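To move toward effectively-once processing, the handler can combine the INSERT filter with the deduplication registry sketched earlier. A sketch reusing the imports from the previous snippets and the same hypothetical processed_events table:

class EffectivelyOnceConsumer extends RequestHandler[DynamodbEvent, Unit] {
  override def handleRequest(dynamoDbEvent: DynamodbEvent, context: Context): Unit = {
    dynamoDbEvent.getRecords.asScala
      .filter(record => record.getEventName == "INSERT")
      // skip the records already handled by a previous, partially failed batch
      .filter(record => ProcessedEventsRegistry.markAsProcessed(record.getEventID))
      .foreach(record => {
        // do sth with the record
      })
  }
}

Note that registering the event before processing it trades at-least-once for at-most-once on that step: if the function crashes right after the registration, the record is never reprocessed. True exactly-once would require making the registration and the processing atomic.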

If you compare the idempotency logic built with DynamoDB Streams to the logic implemented with Apache Cassandra in one of the previous posts, you can notice some differences in favor of the AWS-based solution. Exposing the records in a separate data store gives more possibilities to consume them. You don't need to put all the logic inside a CDC agent coupled to a physical node of the database. Instead, you can use the SDK and write either a Lambda function or any other kind of application. That doesn't mean the solution is perfect, though. As described in the previous paragraph, you must still be aware of the processing semantics, which remain at-least-once.
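To illustrate that last point, a standalone application can read the stream directly through the low-level Streams API, without any Lambda involved. A minimal sketch, assuming the stream ARN is passed as an argument and ignoring shard lineage and iterator pagination for brevity:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder
import com.amazonaws.services.dynamodbv2.model.{DescribeStreamRequest, GetRecordsRequest, GetShardIteratorRequest, ShardIteratorType}
import collection.JavaConverters._

object StandaloneStreamReader {
  def main(args: Array[String]): Unit = {
    val streamsClient = AmazonDynamoDBStreamsClientBuilder.defaultClient()
    val streamArn = args(0) // ARN of the DynamoDB stream to read

    val shards = streamsClient.describeStream(
      new DescribeStreamRequest().withStreamArn(streamArn)
    ).getStreamDescription.getShards.asScala

    shards.foreach { shard =>
      // TRIM_HORIZON starts at the oldest record still retained (up to 24 hours)
      val iterator = streamsClient.getShardIterator(new GetShardIteratorRequest()
        .withStreamArn(streamArn)
        .withShardId(shard.getShardId)
        .withShardIteratorType(ShardIteratorType.TRIM_HORIZON)
      ).getShardIterator

      val records = streamsClient.getRecords(
        new GetRecordsRequest().withShardIterator(iterator)
      ).getRecords.asScala

      // records within a shard come back in write order
      records.filter(_.getEventName == "INSERT").foreach(record => println(record))
    }
  }
}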