Change Data Capture and NoSQL

Change Data Capture (CDC) is a technique that helps to move smoothly from a classical, static data warehouse solution to a modern streaming-centric architecture. To do that, you can use solutions like Debezium, which connect an RDBMS or MongoDB to Apache Kafka. In this post, I will check whether CDC can also be applied to other data stores like Apache Cassandra, Elasticsearch and AWS DynamoDB.

Throughout the 3 sections of this post, I will check whether it's possible to "stream" data from a static data store to Apache Kafka. I will begin with 2 key/value-based solutions, Apache Cassandra and AWS DynamoDB. In the last part, I will look at the case of Elasticsearch.

CDC and Apache Cassandra

CDC was added to Apache Cassandra in version 3.8. It's a node-level option that can be activated with the cdc_enabled property. In addition to that, the table we want to monitor must also be created or altered with CDC enabled:

CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;

ALTER TABLE foo WITH cdc=true;

You can also fine-tune the behavior and specify the directory where the CDC files will be created (cdc_raw_directory) and the disk space they can take (cdc_total_space_in_mb). Be particularly careful with this last option because once the CDC files reach the limit, the writes to CDC-enabled tables will fail with a WriteTimeoutException. The data is written to the CDC files, which in Apache Cassandra are commitlog segments, once the memtable is flushed to disk.
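For reference, the CDC part of cassandra.yaml can look like the extract below (the values are only illustrative, not required defaults):

cdc_enabled: true
# directory where the flushed CDC segments are moved; defaults to $CASSANDRA_HOME/data/cdc_raw
cdc_raw_directory: /var/lib/cassandra/cdc_raw
# disk space budget for CDC files; once reached, writes to CDC-enabled tables start to fail
cdc_total_space_in_mb: 4096
# how often the free space is rechecked when the limit has been hit
cdc_free_space_check_interval_ms: 250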

Let's take a quick look at what happens if we enable CDC on some Apache Cassandra tables. Let's first start the container:

# I used the same configuration as SmartCat
docker run --name cassandra-cdc -v ~/programming/cdc-cassandra/cassandra.yaml:/tmp/cassandra.yaml -v ~/programming/cdc-cassandra/data:/var/lib/cassandra/data -d cassandra:3.11 -Dcassandra.config=/tmp/cassandra.yaml

Now, I will create 2 tables with CDC enabled after accessing the container with docker exec -ti cassandra-cdc bash:

# cqlsh
CREATE KEYSPACE cdc  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
USE cdc;
CREATE TABLE orders (id int, amount double, PRIMARY KEY(id)) WITH cdc=true;
CREATE TABLE users (login ascii, long_name text, PRIMARY KEY(login)) WITH cdc=true;

INSERT INTO orders (id, amount) VALUES (1, 100);
INSERT INTO orders (id, amount) VALUES (2, 100);
INSERT INTO orders (id, amount) VALUES (3, 100);
INSERT INTO orders (id, amount) VALUES (4, 100);
INSERT INTO users (login, long_name) VALUES ('a', 'aa');
INSERT INTO users (login, long_name) VALUES ('b', 'bb');
INSERT INTO users (login, long_name) VALUES ('c', 'cc');

On your disk you should see 2 directories, 1 for each table:

bartosz:~/programming/cdc-cassandra/data$ ls cdc/
orders-1fd1a7b055c611e9ab286d2c86545d91  users-257dae7055c611e9ab286d2c86545d91

And if you print the tree for each of them you should see:

bartosz:~/programming/cdc-cassandra/data$ tree 
cdc/
├── orders-1fd1a7b055c611e9ab286d2c86545d91
│   ├── backups
│   ├── md-1-big-CompressionInfo.db
│   ├── md-1-big-Data.db
│   ├── md-1-big-Digest.crc32
│   ├── md-1-big-Filter.db
│   ├── md-1-big-Index.db
│   ├── md-1-big-Statistics.db
│   ├── md-1-big-Summary.db
│   └── md-1-big-TOC.txt
└── users-257dae7055c611e9ab286d2c86545d91
    ├── backups
    ├── md-1-big-CompressionInfo.db
    ├── md-1-big-Data.db
    ├── md-1-big-Digest.crc32
    ├── md-1-big-Filter.db
    ├── md-1-big-Index.db
    ├── md-1-big-Statistics.db
    ├── md-1-big-Summary.db
    └── md-1-big-TOC.txt

4 directories, 16 files

To read Apache Cassandra commitlog segments, you can write your own listener by using CommitLogReader and implementing CommitLogReadHandler. Among the already existing solutions, I expected to find one at Confluent's Hub. At first glance I thought that Confluent's Cassandra connector was bidirectional, but in fact it only supports writing to Apache Cassandra. After some deeper research, I found another connector, the Cassandra Source from Lenses, which supports Cassandra both as a source and a sink.
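To illustrate the idea, here is a minimal, hypothetical sketch of such a listener written against Cassandra 3.11 internal classes. CdcLogPrinter is my own name, it only prints the mutations found in the cdc_raw directory and it skips the schema/DatabaseDescriptor initialization that a real standalone reader would need:

import java.io.File;
import java.io.IOException;

import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.apache.cassandra.db.partitions.PartitionUpdate;

public class CdcLogPrinter implements CommitLogReadHandler {

  @Override
  public boolean shouldSkipSegmentOnError(CommitLogReadException exception) {
    return false; // stop at the first unreadable segment
  }

  @Override
  public void handleUnrecoverableError(CommitLogReadException exception) throws IOException {
    throw exception;
  }

  @Override
  public void handleMutation(Mutation mutation, int size, int entryLocation, CommitLogDescriptor descriptor) {
    // a real listener would serialize the update and send it to a Kafka topic here
    for (PartitionUpdate update : mutation.getPartitionUpdates()) {
      System.out.println(update.metadata().ksName + "." + update.metadata().cfName + " => " + update);
    }
  }

  public static void main(String[] args) throws IOException {
    // once a memtable is flushed, Cassandra moves the CDC segments to cdc_raw_directory
    File cdcRawDirectory = new File("/var/lib/cassandra/cdc_raw");
    new CommitLogReader().readAllFiles(new CdcLogPrinter(), cdcRawDirectory.listFiles());
  }
}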

CDC and DynamoDB

Let's see now how to pass the data from the AWS DynamoDB data store to Apache Kafka. The most natural solution uses DynamoDB Streams, which simply listen for data changes on the table and expose them to consumers, just like any of Debezium's CDC connectors. To consume DynamoDB Streams you basically have 2 choices: AWS Lambda or a containerized SDK consumer.

AWS Lambda is the much easier way to consume the streamed data thanks to its native support for the DynamoDB Streams trigger. In the Serverless framework you can configure it like this and then write the Lambda's logic like for any other trigger (a handler sketch follows the configuration):

plugins:
 - serverless-plugin-aws-resolvers

events:
  - stream:
      type: dynamodb
      batchSize: 100
      startingPosition: LATEST
      arn: ${aws:dynamodb:MyStack-StreamedDynamoDbTable:LatestStreamArn}
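To give an idea of the handler's shape, here is a hypothetical sketch based on the aws-lambda-java-events library; StreamedTableHandler is my own name and the Kafka publishing part is left out:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;

public class StreamedTableHandler implements RequestHandler<DynamodbEvent, Void> {

  @Override
  public Void handleRequest(DynamodbEvent event, Context context) {
    for (DynamodbStreamRecord record : event.getRecords()) {
      // the event name is INSERT, MODIFY or REMOVE; a real CDC pipeline would
      // serialize the record and publish it to a Kafka topic instead of logging it
      context.getLogger().log(record.getEventName() + " => " + record.getDynamodb().getNewImage());
    }
    return null;
  }
}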

But if you don't want such strong coupling with AWS services, you can still use the AWS SDK and, more specifically, com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams. You will find an example in the "Read more" section's link.
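The idea, sketched below with hypothetical names and without the polling loop and checkpointing a real consumer needs, is to describe the stream, get an iterator per shard and read the records from it:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;

public class DynamoDbStreamReader {

  public static void main(String[] args) {
    String streamArn = args[0]; // the LatestStreamArn of the monitored table
    AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.defaultClient();

    for (Shard shard : streams.describeStream(new DescribeStreamRequest().withStreamArn(streamArn))
        .getStreamDescription().getShards()) {
      String shardIterator = streams.getShardIterator(new GetShardIteratorRequest()
          .withStreamArn(streamArn)
          .withShardId(shard.getShardId())
          .withShardIteratorType(ShardIteratorType.TRIM_HORIZON)).getShardIterator();

      for (Record record : streams.getRecords(new GetRecordsRequest().withShardIterator(shardIterator))
          .getRecords()) {
        // each record exposes the type of the change and the new version of the item
        System.out.println(record.getEventName() + " => " + record.getDynamodb().getNewImage());
      }
    }
  }
}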

Other stores

When I was analyzing the data stores supported by Debezium, I saw MongoDB. In this case, the CDC connector uses the same mechanism as the replica members, i.e. it tails a capped collection called the oplog. It's in this collection that MongoDB registers all changes made on the documents, so all additions, updates and removals. During every read, the registered changes are sent to the configured Kafka topic.
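To make the mechanism more concrete, here is a hypothetical Java sketch of the underlying idea: tailing the local.oplog.rs collection with a tailable cursor. It supposes a replica set running locally; Debezium adds offset management, filtering and serialization on top of this:

import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;

public class OplogTailer {

  public static void main(String[] args) {
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      MongoCollection<Document> oplog = client.getDatabase("local").getCollection("oplog.rs");
      // a tailable cursor keeps waiting for new operations instead of closing at the end
      try (MongoCursor<Document> cursor = oplog.find().cursorType(CursorType.TailableAwait).iterator()) {
        while (cursor.hasNext()) {
          Document operation = cursor.next();
          // "op" is the operation type (i=insert, u=update, d=delete), "ns" the namespace,
          // "o" the changed document; a CDC connector would push it to a Kafka topic
          System.out.println(operation.getString("op") + " on " + operation.getString("ns")
              + " => " + operation.get("o"));
        }
      }
    }
  }
}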

Among other popular data stores, there is no available solution to make a CDC from an Elasticsearch index (or it's hard to find!). Obviously, CDC will also be hard to achieve with data warehouse solutions, simply because in most data architectures they are located at the end of the pipeline. Also, they are often loaded in big batches, so you control this process better than for the input data sources fed by 3rd party services or users. Regarding file systems, you have different choices. In the cloud (S3, GCS), you can use event notifications and connect an AWS Lambda (S3) or Object Change Notifications (GCS) to track the changes made on the files. On local file systems you can use, for instance, the JDK's WatchService that I covered in the Watching files with WatchService blog post; a short reminder follows below.
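As a reminder of the local file system case, a minimal WatchService sketch could look like this (the watched directory is an arbitrary example path):

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class DirectoryChangesWatcher {

  public static void main(String[] args) throws Exception {
    Path watchedDirectory = Paths.get("/tmp/watched_data");
    try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
      watchedDirectory.register(watchService,
          StandardWatchEventKinds.ENTRY_CREATE,
          StandardWatchEventKinds.ENTRY_MODIFY,
          StandardWatchEventKinds.ENTRY_DELETE);
      while (true) {
        WatchKey key = watchService.take(); // blocks until at least one event is available
        for (WatchEvent<?> event : key.pollEvents()) {
          // event.context() is the relative path of the created/modified/deleted entry
          System.out.println(event.kind() + " => " + event.context());
        }
        key.reset();
      }
    }
  }
}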

A data architecture based on a common streaming data source is much more powerful and flexible than solutions using static data stores. With a streaming ETL, you are able to create loosely coupled consumers that, depending on the business needs, provide the results with lower or higher latency. But if you already have an architecture using a static data source, no worries. CDC is able to copy the data in real time from your data store to the streaming broker. Unfortunately, it's not available everywhere and, as you could see in this post, Elasticsearch is one of the exceptions. On the other hand, that's quite understandable, since Elasticsearch is rarely considered as first-class storage for raw data. That's much more the case for key-based stores like Apache Cassandra or DynamoDB, which can be considered as a way to deduplicate raw messages.