Change Data Capture pattern

Versions: Debezium 0.8

Keeping different databases synchronized is not an easy task. Fortunately, some techniques make it easier, and one of them is the Change Data Capture pattern.

This post explains this approach, which helps keep different data sources in sync. The first section defines the pattern and its use cases. The next one covers the existing practical solutions. The final part briefly presents Debezium, one of the platforms for CDC.

CDC definition

The problem of synchronizing multiple data sources was briefly presented in the post about Polyglot persistence - definition and examples. We learned there that it is always good to have a single source of truth in order to easily replay data manipulations in case of errors. Ideally, it would be a streamable source that lets all dependent applications get new data at the same time (= event sourcing). But that's not always possible. Sometimes, especially in legacy systems, the main storage is a relational database from which we need to synchronize the remaining, more specialized data stores. This is where the Change Data Capture (CDC) pattern comes in.

CDC consists of capturing events, such as adding, updating or removing a record, and applying them to other data stores. It's a great method for building systems that work with incremental loads, where only the data changes are applied to the data stores.
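
To make the idea concrete, here is a minimal in-memory sketch of applying captured changes to a target store. The event types and the `CdcReplaySketch` object are hypothetical names introduced only for this illustration; a real CDC tool defines its own event format.

```scala
object CdcReplaySketch {
  // Hypothetical change events, one per kind of captured operation
  sealed trait ChangeEvent
  final case class Inserted(id: Int, email: String) extends ChangeEvent
  final case class Updated(id: Int, email: String) extends ChangeEvent
  final case class Deleted(id: Int) extends ChangeEvent

  // Applies a single captured change to the target store (id -> email)
  def applyChange(target: Map[Int, String], event: ChangeEvent): Map[Int, String] =
    event match {
      case Inserted(id, email) => target + (id -> email)
      case Updated(id, email)  => target.updated(id, email)
      case Deleted(id)         => target - id
    }

  // Replays the captured changes in the order they happened in the source
  def replay(target: Map[Int, String], events: Seq[ChangeEvent]): Map[Int, String] =
    events.foldLeft(target)((store, event) => applyChange(store, event))
}
```

Only the changes travel to the target store, so it never has to reload the full source table. The order of the events matters: replaying an update before the matching insert would produce a different result.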

CDC strategies

CDC can be implemented with the help of different strategies: a marker column, a partitioned data source, triggers, or a log parser.
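
As an illustration of one strategy, the marker column (named in the conclusion of this post), here is a rough sketch. It assumes the marker is a last-modification timestamp; the `Customer` class, the `updatedAt` field and both functions are hypothetical and are not taken from any specific tool.

```scala
object MarkerColumnSketch {
  // Hypothetical source row; updatedAt plays the role of the marker column
  final case class Customer(id: Int, email: String, updatedAt: Long)

  // Returns only the rows modified after the previous synchronization
  def changedSince(rows: Seq[Customer], lastSync: Long): Seq[Customer] =
    rows.filter(_.updatedAt > lastSync)

  // The next run starts from the highest marker value seen so far
  def nextSyncPoint(changes: Seq[Customer], lastSync: Long): Long =
    changes.map(_.updatedAt).foldLeft(lastSync)((a, b) => math.max(a, b))
}
```

In a real database the filter would be a query on the marker column. Note that such a query cannot see rows that were physically deleted, which is one of the drawbacks of this strategy.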

Debezium example

To see CDC in action, let's set up the Debezium platform by executing these commands:

# same commands as here: http://debezium.io/docs/tutorial/
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.7
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.7
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.7
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.7

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'

The way it works is quite straightforward and is illustrated in the following test case:

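import java.sql.DriverManager
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

// The members below belong to the body of a ScalaTest spec class that extends
// FlatSpec with Matchers with BeforeAndAfterAll
Class.forName("com.mysql.cj.jdbc.Driver")
private val Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/inventory", "root", "debezium")
private val CdcConsumer = {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // A new consumer group for every run, so that no offsets are reused between test executions
  props.put(ConsumerConfig.GROUP_ID_CONFIG, s"cdc_consumer${System.currentTimeMillis()}")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  new KafkaConsumer[String, String](props)
}

override protected def afterAll(): Unit = {
  // Release both the Kafka consumer and the JDBC connection
  CdcConsumer.close()
  Connection.close()
}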

"new customers" should "be intercepted by the CDC client" in {
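  val newRecords = new scala.collection.mutable.ListBuffer[String]()
  // The timestamp suffix keeps the emails different across test runs
  val id = System.currentTimeMillis
  val messagesToSend = (1 to 10).map(nr => (s"first_name${nr}", s"last_name${nr}", s"email${nr}${id}"))
  val messagesToSendIterator = messagesToSend.iterator
  // Debezium writes the changes of each table to the <server name>.<database>.<table> topic
  CdcConsumer.subscribe(Collections.singletonList("dbserver1.inventory.customers"))
  val startTime = System.currentTimeMillis()
  // For 15 seconds, alternate between polling the topic and inserting one new customer
  while (System.currentTimeMillis() - startTime < 15000) {
    val records = CdcConsumer.poll(5000)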
    records.forEach(record => {
      newRecords.append(record.value)
    })
    if (messagesToSendIterator.hasNext) {
      val newCustomer = messagesToSendIterator.next()
      val preparedStatement = Connection.prepareStatement("INSERT INTO customers (first_name, last_name, email) " +
        "VALUES (?, ?, ?)")
      preparedStatement.setString(1, newCustomer._1)
      preparedStatement.setString(2, newCustomer._2)
      preparedStatement.setString(3, newCustomer._3)
      preparedStatement.execute()
      preparedStatement.close()
    }
  }

  newRecords should have size 10
  val updatedRecords = newRecords.mkString("; ")
  messagesToSend.foreach((customerTuple) => {
    // The message looks like:
    // {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},
    // {"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},
    // {"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value",
    // "field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},
    // {"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},
    // {"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value",
    // "field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string",
    // "optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64",
    // "optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string",
    // "optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32",
    // "optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},
    // {"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},
    // {"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source",
    // "field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],
    // "optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":
    // {"id":1104,"first_name":"first_name5","last_name":"last_name5","email":"email51525174633771"},
    // "source":{"version":"0.7.5","name":"dbserver1","server_id":223344,"ts_sec":1525174640,"gtid":null,
    // "file":"mysql-bin.000003","pos":32359,"row":0,"snapshot":false,"thread":39,"db":"inventory","table":"customers"},
    // "op":"c","ts_ms":1525174640789}}
    // But it's simpler to check only for an email
    updatedRecords should include (customerTuple._3)
  })
}
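
The message shown in the comment above carries the type of the change in the `op` attribute of its payload (`"op":"c"` for the inserted customer). The sketch below translates that code into a readable name. It is only an illustration: the `EnvelopeSketch` object is hypothetical, and the regular expression is a shortcut that assumes the compact JSON layout shown above, so a real consumer should rather use a JSON parser.

```scala
object EnvelopeSketch {
  // Matches the "op" attribute of the payload; the schema part only contains "field":"op"
  private val OpPattern = "\"op\":\"([a-z])\"".r

  // Maps the operation code of a change event to a readable name
  def operation(message: String): Option[String] =
    OpPattern.findFirstMatchIn(message).map(_.group(1)).map {
      case "c"   => "create"
      case "u"   => "update"
      case "d"   => "delete"
      case "r"   => "read (snapshot)"
      case other => s"unknown ($other)"
    }
}
```

A consumer could use such a mapping to decide whether to insert, update or remove the corresponding record in the target data store.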

The Change Data Capture pattern is very useful for incremental loads. As shown in this post, it can be implemented with several different techniques: a marker column, a partitioned data source, triggers or a log parser. Each of them has its advantages and drawbacks. After a quick analysis, the least invasive solution is parsing the logs, especially since some open source solutions exist. One of them is Debezium, which was shown in action in the last section.