Kinesis sequence number is not an Apache Kafka offset

I used to say that "Kinesis Data Streams is like Apache Kafka: an append-only streaming broker with partitions and offsets". Although it's often true, unfortunately it's not that simple.

I only understood that when I faced the problem of empty records returned by a GetRecords call for a shard with a lot of data. And yet, the documentation clearly explains that scenario in the "GetRecords Returns Empty Records Array Even When There is Data in the Stream" section:

The GetRecords operation does not block. Instead, it returns immediately; with either relevant data records or with an empty Records element. An empty Records element is returned under two conditions:

* There is no more data currently in the shard.
* There is no data near the part of the shard pointed to by the ShardIterator.

The latter condition is subtle, but is a necessary design tradeoff to avoid unbounded seek time (latency) when retrieving records. Thus, the stream-consuming application should loop and call GetRecords, handling empty records as a matter of course.

Source: https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty
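Handling empty responses "as a matter of course" can be sketched with a short polling loop. The code below is my illustration, not official AWS code: it assumes a boto3-like client (stubbed here so the example is self-contained) and treats an empty Records array as a normal case, stopping only once MillisBehindLatest reports the consumer has caught up.

```python
def consume_shard(client, iterator):
    """Drain a shard while tolerating empty GetRecords responses.

    `client` only needs a get_records(ShardIterator=...) method returning
    a dict with Records, NextShardIterator and MillisBehindLatest keys,
    mirroring the shape of the real API response.
    """
    records = []
    while iterator is not None:
        response = client.get_records(ShardIterator=iterator)
        # An empty Records list does NOT mean the shard is exhausted:
        # the iterator may simply point at an empty part of the shard.
        records.extend(response["Records"])
        if response["MillisBehindLatest"] == 0:
            break  # caught up with the tip of the shard
        iterator = response["NextShardIterator"]
    return records


class FakeShard:
    """Hypothetical stand-in for the Kinesis client: three empty
    reads followed by one read that finally returns a record."""
    def __init__(self):
        self.responses = [
            {"Records": [], "NextShardIterator": "it1", "MillisBehindLatest": 3000},
            {"Records": [], "NextShardIterator": "it2", "MillisBehindLatest": 2000},
            {"Records": [], "NextShardIterator": "it3", "MillisBehindLatest": 1000},
            {"Records": [{"Data": b"3", "PartitionKey": "3"}],
             "NextShardIterator": "it4", "MillisBehindLatest": 0},
        ]

    def get_records(self, ShardIterator):
        return self.responses.pop(0)


print(consume_shard(FakeShard(), "it0"))  # the record shows up after 3 empty reads
```

With a real boto3 client you would obtain the first iterator from get_shard_iterator and likely add a small sleep between calls to stay under the per-shard GetRecords rate limits.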

That's the documentation. Users share the same observations. Let me quote a few of them:

As per my discussion with AWS technical support person, there can be a few messages with empty records and hence it is not a good idea to break when len(get_response['Records']) == 0.

The better approach suggested was - we can have a counter indicating maximum number of messages that you read in a run and exit loop after reading as many messages.

Source: https://stackoverflow.com/a/46580494/9726075

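That counter-based approach can be sketched like this (the client stub is hypothetical, standing in for the real AWS call): the exit condition is a fixed read budget or a closed shard, never an empty Records array.

```python
def read_with_budget(client, iterator, max_reads=5):
    """Poll the shard a bounded number of times instead of
    breaking on the first empty Records array."""
    records = []
    for _ in range(max_reads):
        response = client.get_records(ShardIterator=iterator)
        records.extend(response["Records"])  # frequently empty, and that's fine
        iterator = response["NextShardIterator"]
        if iterator is None:  # the shard was closed
            break
    return records


class SparseShard:
    """Hypothetical client stub: records only in the 1st and 4th reads."""
    def __init__(self):
        self.buckets = [["a"], [], [], ["b"], []]
        self.position = 0

    def get_records(self, ShardIterator):
        bucket = self.buckets[self.position]
        self.position += 1
        has_next = self.position < len(self.buckets)
        return {"Records": bucket,
                "NextShardIterator": str(self.position) if has_next else None}


print(read_with_budget(SparseShard(), "0"))  # → ['a', 'b']
```

A naive loop that breaks on the first empty response would have stopped after the first read and missed "b".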
Sometimes you call the GetRecords API with a valid shard iterator and you get back zero records. You might assume that means you’ve reached the end of the shard, there are no more records left to read.

Turns out, that isn’t a safe assumption at all. There may very well be more records in the stream and if you keep chasing the NextShardIterator, you will eventually get them. But you might get back a bunch of empty responses before you get there.

Source: https://medium.com/software-ascending/surprises-from-polling-kinesis-a76462a7efd4

I especially liked the further explanation from the second quoted blog post written by Brian Mears:

It turns out, that a Kinesis shard is made up of a sequence of buckets. Each bucket holds a sequence of zero or more records. Buckets seem to be allocated periodically based on time, not based on the number of records in them. Each bucket owns a slice of time with a fixed duration (or at least, a fixed maximum duration) and holds any records that came in during that time slice.

A ShardIterator seems to represent one particular bucket (and, presumably, a record-offset within that bucket). The GetRecords API seems to only return records from within that bucket, and then give you the NextShardIterator to either continue in that bucket (if there are more records in it) or else move on to the next bucket (if there are not).

If you have a relatively sparse shard, you might end up with some buckets being allocated that reach the end of their time slice without having any records added to them. Any ShardIterators for those buckets will necessarily provide you with zero records, even if there are subsequent buckets that have records.
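Brian Mears's bucket model can be illustrated with a toy simulation. Everything below is my own sketch, not the real Kinesis internals: records land in fixed-duration time buckets, a shard iterator points at one bucket, and reading an empty bucket still advances the iterator.

```python
BUCKET_SECONDS = 60  # assumed fixed time slice per bucket


def bucket_index(timestamp, start):
    """Map an arrival timestamp to its time bucket."""
    return int((timestamp - start) // BUCKET_SECONDS)


def build_shard(events, start, end):
    """events: list of (timestamp, payload) pairs. One bucket per time
    slice is allocated, whether or not anything arrived in it."""
    shard = [[] for _ in range(bucket_index(end, start) + 1)]
    for ts, payload in events:
        shard[bucket_index(ts, start)].append(payload)
    return shard


def get_records(shard, iterator):
    """Return the records of one bucket plus the next iterator;
    empty buckets yield an empty Records array."""
    nxt = iterator + 1 if iterator + 1 < len(shard) else None
    return {"Records": shard[iterator], "NextShardIterator": nxt}


# Two records, then a long quiet period, then one late record.
shard = build_shard([(10, "a"), (30, "b"), (290, "c")], start=0, end=299)
print(len(shard))             # 5 buckets for a 300-second window
print(get_records(shard, 0))  # {'Records': ['a', 'b'], 'NextShardIterator': 1}
print(get_records(shard, 1))  # empty bucket, but iteration must continue
```

In this model, a sparse shard like the one from the quote produces exactly the observed behavior: iterators over the empty middle buckets return zero records even though bucket 4 holds data.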

Clearly, the SequenceNumber is not an Apache Kafka offset! Apparently, even when nothing is happening in the stream, the SequenceNumbers keep increasing, and we might need to iterate extra times to catch up with the most recent records. I had been using Kinesis Data Streams only with continuously arriving data and had never seen the described behavior. That's why I ran a test.

GetRecords emptiness test

The test requires some time because it consists of:

  1. Creating a new stream.
  2. Adding some data to it.
  3. Reading the added data and keeping the SequenceNumber for the next iterator.
  4. Waiting 1 day.
  5. Reading the data again with the saved SequenceNumber and keeping the new SequenceNumber aside.
  6. Writing one new record.
  7. Trying to read the new record with the most recently stored SequenceNumber.

You can easily repeat the steps above from your AWS Cloud Shell:

  1. STREAM_NAME='test-offsets'
    aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 1
    
  2. payload=$(echo -n 'a' | base64)
    aws kinesis put-record --stream-name $STREAM_NAME --data $payload --partition-key 1
    
    payload=$(echo -n 'b' | base64)
    aws kinesis put-record --stream-name $STREAM_NAME --data $payload --partition-key 2
    
  3. aws kinesis get-shard-iterator --stream-name $STREAM_NAME --shard-id shardId-0000000000 --shard-iterator-type TRIM_HORIZON
    # returns...
    {
    	"ShardIterator": "AAAAAAAAAAGn7k1dYirjbM+BLl4n/7PaiVVAm0S0DgPwVn+lCT5PUIHju3RLUCONg0dHAvh0VBxqBPc22+wpmRaPuGKfrOxp7D3FLu3zvP7HmJmDuUCPqASVYBf4fWYFFtWb9oX3nDW5W6Bq6N9nVyyt49mFPrIm2zN7EKEci3FJ8Y+YOI+53MY8gselQrhhuCEN/5Z5okMMqPUAV3QDNHBxQ/BVcBTZX5DRKMpB0QH0n3pogjMEcQ=="
    }
    
    aws kinesis get-records --shard-iterator  AAAAAAAAAAGn7k1dYirjbM+BLl4n/7PaiVVAm0S0DgPwVn+lCT5PUIHju3RLUCONg0dHAvh0VBxqBPc22+wpmRaPuGKfrOxp7D3FLu3zvP7HmJmDuUCPqASVYBf4fWYFFtWb9oX3nDW5W6Bq6N9nVyyt49mFPrIm2zN7EKEci3FJ8Y+YOI+53MY8gselQrhhuCEN/5Z5okMMqPUAV3QDNHBxQ/BVcBTZX5DRKMpB0QH0n3pogjMEcQ==
    
    # returns...
    {
    	"Records": [
        	{
            	"SequenceNumber": "49639543389862895699365693304023657829928072185817071618",
            	"ApproximateArrivalTimestamp": "2023-04-05T07:08:10.670000+00:00",
            	"Data": "YQ==",
            	"PartitionKey": "1"
        	},
        	{
            	"SequenceNumber": "49639543389862895699365693304024866755747687983222882306",
            	"ApproximateArrivalTimestamp": "2023-04-05T07:08:27.176000+00:00",
            	"Data": "Yg==",
            	"PartitionKey": "2"
        	}
    	],
    	"NextShardIterator": "AAAAAAAAAAGpBStpkV3WYhh007EqrAiog/AYhctNLFQqoKPnXL43Tg4078cz3RbT1UdhQJsYEDy9iBebXK8W5I8udS1mOCU2QhzidzIJ6GTIiZpNOYV2kuZsasDupFGgK6LuritfMWZTltcSYCcI9t1U5w+KTgy/WgkV+FklDG/fJ/bEF6C2lCpR0Tg7xBBqwC1UnivP+mrt2W3P0V6uBnz7n2Qj7YgCHjnu8uH7MEHDCsXPv4zpew==",
    	"MillisBehindLatest": 0
    }
    
  4. Waiting 1 day.
  5. # sequence number: 49639543389862895699365693304024866755747687983222882306
    
    aws kinesis get-shard-iterator --stream-name $STREAM_NAME --shard-id shardId-0000000000 --shard-iterator-type AFTER_SEQUENCE_NUMBER --starting-sequence-number 49639543389862895699365693304024866755747687983222882306
    
    {
    	"ShardIterator": "AAAAAAAAAAEodnF6VW96K31Zi8EeehSCFo1s6lGI6cmGEcFx0vv3w1CNhn+9PZUJXFvizsCmWNTEEQfZ4BjGjVecy3TjL/4HrC30DTWzEO2vpHe7WX+1EznYCnmeil42tJumRSk0+AqI2mj9Vj3UoATcprhQeoYBUR89CiwKKU6s7Dg98g3xiuCSvsi1cScsxPuDtArkHdK9iHhjboy1PwkgTHF+tMF5dZfsIr/1vFXo8/mAS43ujQ=="
    }
    
    aws kinesis get-records --shard-iterator AAAAAAAAAAEodnF6VW96K31Zi8EeehSCFo1s6lGI6cmGEcFx0vv3w1CNhn+9PZUJXFvizsCmWNTEEQfZ4BjGjVecy3TjL/4HrC30DTWzEO2vpHe7WX+1EznYCnmeil42tJumRSk0+AqI2mj9Vj3UoATcprhQeoYBUR89CiwKKU6s7Dg98g3xiuCSvsi1cScsxPuDtArkHdK9iHhjboy1PwkgTHF+tMF5dZfsIr/1vFXo8/mAS43ujQ== 
    {
    	"Records": [],
    	"NextShardIterator": "AAAAAAAAAAEWDzT4F/9YD1+usnaXvsfXDpmv9Z/8Drc1C/HKhTMy14dAzZow7Viv5WftzjykU72jgTWjhykZ3WvfoJCbNEe0aVPJJDWajz651kbgA/r4BNEXH4teRB4TCNpz5V2y3npzeTq++g5wU/yUlGR+4UAoAKBkrSXCs/MFq69GfQsaDUuzfqxsUv+c2zhe2WXxLNBqlFSINdPn4qlXaw1oDJ3MUbGeRl4sk6/3Ce2ltkRDDw==",
    	"MillisBehindLatest": 63180000
    }
    
  6. payload=$(echo -n '3' | base64)
    aws kinesis put-record --stream-name $STREAM_NAME --data $payload --partition-key 3
    

Now, when you try to get the most recent record from the NextShardIterator returned in the last get-records call, you have to pass through a lot of empty iterators. The MillisBehindLatest attribute indicates how far behind you are. You can see the first empty-iterator reads below:

# First read
aws kinesis get-records --shard-iterator AAAAAAAAAAEWDzT4F/9YD1+usnaXvsfXDpmv9Z/8Drc1C/HKhTMy14dAzZow7Viv5WftzjykU72jgTWjhykZ3WvfoJCbNEe0aVPJJDWajz651kbgA/r4BNEXH4teRB4TCNpz5V2y3npzeTq++g5wU/yUlGR+4UAoAKBkrSXCs/MFq69GfQsaDUuzfqxsUv+c2zhe2WXxLNBqlFSINdPn4qlXaw1oDJ3MUbGeRl4sk6/3Ce2ltkRDDw==

{
	"Records": [],
	"NextShardIterator": "AAAAAAAAAAFAl/e0ymNddglfeQ7DJgF9BdZKpEh84W8N/fr2W9VRhIcbbDFswuSk3H3/ChqjFtsLHJVq7Fd979LMTzas6LImsw8JwFNbLdjGkmI3zJ53bYfiGmOPiecuKWwhpxV3TbBk8rfoE5XcTW9r5CQ7Qu1waJSjBUBLpnLSfHtA/8iRxDx4Gft/tyUcW142ncdlVvEfH/36+SvObdM2YCdZgGZZXTJhhpSI4a6hSxXYXfbegA==",
	"MillisBehindLatest": 62995000
}

# Second read
aws kinesis get-records --shard-iterator AAAAAAAAAAFAl/e0ymNddglfeQ7DJgF9BdZKpEh84W8N/fr2W9VRhIcbbDFswuSk3H3/ChqjFtsLHJVq7Fd979LMTzas6LImsw8JwFNbLdjGkmI3zJ53bYfiGmOPiecuKWwhpxV3TbBk8rfoE5XcTW9r5CQ7Qu1waJSjBUBLpnLSfHtA/8iRxDx4Gft/tyUcW142ncdlVvEfH/36+SvObdM2YCdZgGZZXTJhhpSI4a6hSxXYXfbegA==
{
	"Records": [],
	"NextShardIterator": "AAAAAAAAAAEZc/W6XsHFNJaVKgJDPlAxRVgLDH20yZS3Xn7XGpnGPvpv/L5QWKQ90T0xi/TbpXm7Tos30XJq8ZX90G98Pb369+MbtkrKFZHaCCOq5KrQWc/Dog92oDvkqcl+fXIJyVZGYsyHFIsnElc9LrwAB0omlB2krGt3+yTgsCBslLGd5I4GqwgeOtRdXhXaCA0KzmUp8BLprvEonewzwPncOcd+8SDqHCqzkkTDtmphU/NPJA==",
	"MillisBehindLatest": 62761000
}
# ...

As you can see, the MillisBehindLatest decreased, but we are still more than 17 hours behind! That means a lot of empty iterators to pass through before getting our record number 3. That's something that won't happen in Apache Kafka, where offsets are contiguous values.
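The 17-hour figure comes straight from the response: MillisBehindLatest is the lag behind the tip of the shard expressed in milliseconds, so converting the value from the second read confirms it.

```python
millis_behind = 62_761_000  # MillisBehindLatest from the second read
hours_behind = millis_behind / 3_600_000  # 3 600 000 ms in an hour
print(f"{hours_behind:.1f} hours behind")  # → 17.4 hours behind
```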

I used to consider Kinesis Data Streams as an AWS alternative to Apache Kafka. Indeed, both are streaming brokers and both store records behind offset-like positions, but there are subtle implementation differences that are not visible at first glance. The SequenceNumber is one of them. I'll be happy to hear about your discoveries in the comments!

TAGS: #streaming

