Change Data Capture pattern

on waitingforcode.com

Keeping different databases synchronized is not an easy task. Thankfully, some techniques exist to make it easier, and one of them is called the Change Data Capture pattern.

This post explains this approach, which helps to keep different data sources in sync. The first section explains the pattern and its use cases. The next one discusses the existing practical solutions. The final part briefly presents Debezium, one of the platforms for CDC.

CDC definition

The problem of synchronizing multiple data sources was briefly presented in the post about Polyglot persistence - definition and examples. We learned there that it's always good to have a single source of truth in order to easily replay data manipulations in case of errors. Ideally it would be a streamable source allowing all dependent applications to get new data at the same time (= event sourcing). But that's not always possible. Sometimes, especially in legacy systems, the main storage is a relational database from which we need to synchronize the remaining, more specialized data stores. That's where the Change Data Capture (CDC) pattern comes in.

CDC consists of capturing events, such as adding, updating or removing a record, and applying them to other data stores. It's a great method to build systems working with incremental loads, where only the data changes are applied to the data stores.
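
To make the idea concrete, here is a minimal sketch of replaying captured events on a target store. The event types (Created, Updated, Deleted) and the ChangeApplier object are hypothetical names chosen for illustration, and the target store is simulated with an immutable Map keyed by the record id. It isn't tied to any CDC tool.

```scala
// Hypothetical event model: the names are illustrative, not taken from any CDC tool.
sealed trait ChangeEvent { def key: Int }
final case class Created(key: Int, row: Map[String, String]) extends ChangeEvent
final case class Updated(key: Int, row: Map[String, String]) extends ChangeEvent
final case class Deleted(key: Int) extends ChangeEvent

object ChangeApplier {
  type Store = Map[Int, Map[String, String]]

  // Replays the events in order on top of the current state of the target store.
  // Only the changes are applied; untouched records are never re-read.
  def replay(target: Store, events: Seq[ChangeEvent]): Store =
    events.foldLeft(target) {
      case (state, Created(key, row)) => state + (key -> row)
      case (state, Updated(key, row)) =>
        // Merge the changed columns into the existing row (or create it if missing)
        state + (key -> (state.getOrElse(key, Map.empty[String, String]) ++ row))
      case (state, Deleted(key)) => state - key
    }
}
```

The order of the events matters here: replaying the same sequence on the same initial state always gives the same result, which is what makes the incremental load repeatable in case of errors.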

CDC strategies

CDC can be implemented with different strategies:

  • Marker column
    The first idea consists of using a marker column, i.e. a column marking given data as synchronized or to synchronize. An obvious example of such a column is last_modified, updated every time the row is changed. Another option is a version column that follows the versioning used by the incremental load mechanism. This strategy can be easily integrated with an ETL pipeline taking the marked data from one table to other places through a simple SQL-like query.
    However, the marker column doesn't work well with physical removals, since the information about that operation is kept in the column that... has just been deleted. In addition, every extraction queries the whole dataset, so it may add some latency to the phase retrieving the changed rows.
  • Partitioning
    Another approach to catch only the modified data is a partitioning scheme. For instance, we can store the data on an hourly basis in different directories/tables. The loading mechanism takes all the data from this place and moves it to another one.
    But this solution also has some drawbacks. Data organized this way is not easy to maintain, and debugging operations may be difficult. Retrieving the history of changes for a given object would require querying several places instead of one.
  • Triggers
    Some data sources support triggers that automatically launch the update on the other data sources (e.g. an event store table) or propagate the change to a common event store.
    However, it's not an easy task to connect an RDBMS trigger to external systems, and this difficulty is one of the main drawbacks of this approach. Another one is the impact on write performance, since the write is considered successful only after the trigger invocation.
  • Low-level logs listening
    This approach works with a crawler reading the database binary logs and applying the changes to the dependent data sources. It's a less invasive solution than triggers. Concretely, it consists of following what happens in the database logs, where the operations are registered, and replaying these events on the other data stores. Very often, especially thanks to open source projects like Debezium or Maxwell, the detected events are pushed to a messaging system (Kafka, Kinesis, Pub/Sub...) in order to make them available to other consumers working with their own data stores. The Kafka JDBC Connector is often mentioned in the same context, but it polls the tables with queries instead of reading the logs. Having something production-ready to use is a big plus of this solution. It's one of the reasons why it was chosen as the solution tested in the next section. Moreover, in most cases it's able to support deletes.
    As a drawback, we could note that this solution is often complex, so it requires additional maintenance effort. Also, if you're not using Kafka and don't want to start, some of the solutions may be inadequate, and developing your own similar system is not a quick task.
  • Full data reload
    In this strategy the whole dataset is reloaded at a regular interval. It's obviously not an example of incremental load. It requires an infrastructure able to support such a data load and to reduce the time needed to ingest the whole dataset.
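
As an illustration of the marker column strategy from the list above, the following sketch simulates an incremental extraction driven by a last_modified column. The CustomerRow class, the MarkerColumnSync object and the in-memory table are assumptions made for the example; in a real pipeline the filter would be a SQL query of the form WHERE last_modified > watermark.

```scala
import java.time.Instant

// Hypothetical row with a marker column updated on every change
final case class CustomerRow(id: Int, email: String, lastModified: Instant)

object MarkerColumnSync {
  // Returns the rows modified strictly after the watermark, together with the
  // watermark to use for the next run (the most recent lastModified seen so far).
  def changedSince(table: Seq[CustomerRow], watermark: Instant): (Seq[CustomerRow], Instant) = {
    // The whole table is scanned on every run, which is the latency drawback of this strategy
    val changed = table.filter(_.lastModified.isAfter(watermark))
    val nextWatermark = changed.map(_.lastModified)
      .foldLeft(watermark)((latest, ts) => if (ts.isAfter(latest)) ts else latest)
    (changed, nextWatermark)
  }
}
```

The sketch also shows the limit with physical removals: a deleted row simply disappears from the table, so no later call to changedSince can return it and the target stores never learn about the delete.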

Debezium example

To see CDC in action, let's set up the Debezium platform by executing these commands:

# same commands as here: http://debezium.io/docs/tutorial/
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.7
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.7
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.7
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.7

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

The way it works is quite straightforward and is illustrated in the following test case:

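// Imports needed by the snippet below; the enclosing ScalaTest spec class is not shown
import java.sql.DriverManager
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

// Loads the MySQL JDBC driver before opening the connection
Class.forName("com.mysql.cj.jdbc.Driver")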
private val Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/inventory", "root", "debezium")
private val CdcConsumer = {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, s"cdc_consumer${System.currentTimeMillis()}")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  new KafkaConsumer[String, String](props)
}

override protected def afterAll(): Unit = {
  // Release both the JDBC connection and the Kafka consumer
  Connection.close()
  CdcConsumer.close()
}

"new customers" should "be intercepted by the CDC client" in {
  val newRecords = new scala.collection.mutable.ListBuffer[String]()
  val id = System.currentTimeMillis
  val messagesToSend = (1 to 10).map(nr => (s"first_name${nr}", s"last_name${nr}", s"email${nr}${id}"))
  val messagesToSendIterator =  messagesToSend.iterator
  CdcConsumer.subscribe(Collections.singletonList("dbserver1.inventory.customers"))
  val startTime = System.currentTimeMillis()
  while (System.currentTimeMillis() - startTime < 15000) {
    val records = CdcConsumer.poll(5000)
    records.forEach(record => {
      newRecords.append(record.value)
    })
    if (messagesToSendIterator.hasNext) {
      val newCustomer = messagesToSendIterator.next()
      val preparedStatement = Connection.prepareStatement("INSERT INTO customers (first_name, last_name, email) " +
        "VALUES (?, ?, ?)")
      preparedStatement.setString(1, newCustomer._1)
      preparedStatement.setString(2, newCustomer._2)
      preparedStatement.setString(3, newCustomer._3)
      preparedStatement.execute()
      preparedStatement.close()
    }
  }

  newRecords should have size 10
  val updatedRecords = newRecords.mkString("; ")
  messagesToSend.foreach((customerTuple) => {
    // The message looks like:
    // {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},
    // {"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},
    // {"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value",
    // "field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},
    // {"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},
    // {"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value",
    // "field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string",
    // "optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64",
    // "optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string",
    // "optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32",
    // "optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},
    // {"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},
    // {"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source",
    // "field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],
    // "optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":
    // {"id":1104,"first_name":"first_name5","last_name":"last_name5","email":"email51525174633771"},
    // "source":{"version":"0.7.5","name":"dbserver1","server_id":223344,"ts_sec":1525174640,"gtid":null,
    // "file":"mysql-bin.000003","pos":32359,"row":0,"snapshot":false,"thread":39,"db":"inventory","table":"customers"},
    // "op":"c","ts_ms":1525174640789}}
    // But it's simpler to check only for an email
    updatedRecords should include (customerTuple._3)
  })
}
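
The "op" field of the payload shown in the comment above tells which kind of change was captured. As a rough sketch, the helper below extracts it from the raw message and translates it into a readable name. The DebeziumOp object is a hypothetical helper, not part of Debezium, and it relies on a regular expression only to stay dependency-free; a real consumer would rather parse the message with a JSON library.

```scala
object DebeziumOp {
  // Matches the "op":"<code>" entry of the payload. The schema part of the message
  // only contains "field":"op", which is not followed by a colon, so it doesn't match.
  private val OpPattern = "\"op\"\\s*:\\s*\"([a-z])\"".r

  // Translates the operation code of a change event into a readable name:
  // c = create, u = update, d = delete, r = read (initial snapshot).
  // Returns None when the code is missing or unknown.
  def describe(message: String): Option[String] =
    OpPattern.findFirstMatchIn(message).map(_.group(1)).collect {
      case "c" => "create"
      case "u" => "update"
      case "d" => "delete"
      case "r" => "read"
    }
}
```

Applied to the messages collected in the test case, DebeziumOp.describe(record.value) should return Some("create") for every inserted customer, assuming the message format shown above.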

The Change Data Capture pattern is very useful for incremental loads. As shown in this post, it can be implemented with several different techniques: a marker column, a partitioned data source, triggers or a logs parser. Each of them has some advantages and more or fewer drawbacks. After a quick analysis, the least invasive solution is the analysis of the logs, especially since some open source solutions exist. One of them is Debezium, which was shown in action in the last section.
