Transaction compensation - aka Sagas

Distributed computing opened a lot of possibilities and horizontal scaling is only one of them. But at the same time it brought some new problems that we need to address during applications conception. And writing a data on different data stores inside one transaction is the one of them.

This post begins by a small presentation of the problem which transaction compensation tries to solve. It's followed by more detailed description of the pattern itself. The third part shows 2 possible implementation patterns. The post terminates with a simple implementation of Saga in Scala against in-memory storage.

ACID and distributed world

Transactions in distributed world can be implemented in various manners. One of them is 2-phase commit where a transaction manager orchestrates all updates on different data stores. However this solution has some important drawbacks. It introduces blocking that is not welcome in distributed environments. Any resource used in given transaction become unavailable for other writings as long as the current transaction isn't committed. It brings some important deadlock problems where the nodes may wait before writing their data infinitely because, for instance, undetected coordinator failure.

If the blocking is a problem, what happens if we eliminate it ? Nothing good. Let's imagine the case when 2 subsequent transactions updates the same row: the first fails and the second succeeds. If in this case we rollback the first transaction to restore the state before it, we'll lost the changes made by the second operation. Therefore, the blocking is required to enable rollbacking.

Transaction compensation

How to deal with this situation of eventual consistency, if we accept to not use blocking two-phase commit ? One of solutions is transaction compensation, also called Saga pattern. In simple terms it means that the system commits operations independently and executes a compensating job in case of errors.

Let's illustrate that with an example that will be coded in the third section. We've an application that enables a user to register on 3 popular social media applications. Since we know that something can go wrong in these calls to 3rd party services, we don't want to let the user in inconsistent state - he must be register on all the services or on none of them. We execute the calls sequentially for the sake of simplicity. The calls to the 1st and 2nd services succeed. Unfortunately the 3rd one returns a 500 HTTP code. Now we use our compensating logic and send the unregister request to 2 first services. And if we're writing some information to our data store, we remove them too.

Above example is quite simplistic since the compensation consists on "removing" the state. But in some more complicated cases we may need to modify it in-place. For instance it's can be the case of counters where in case of a failure, we must decrement already incremented value. It emphasizes an important characteristic of Saga pattern - generalization difficulty. The compensation logic is strictly related to the application and very rarely it can be generalized.

But transaction compensation also has drawbacks:

Implementation models

Saga pattern can be implemented in 2 different models. The first of them is called Choreography. It's an event-based architecture when all services involved in given distributed transactions listen to the work done by their peers. This method begins with one service executing its transaction and publishing an event about that soon after. Depending on the event's nature (success, failure), other services (or service) execute local transactions and generate event in their own. And this events chain execute as long as there is no new events or when the produced events don't interest any of services. This strategy brings loose coupling between the services but it's hard to implement with a lot of services because of listening and tracking logic.

The second model is known as Orchestration and it's the opposite of the first because of the presence of a coordinator service. The role of this service is to tell all services involved in the transaction what to do and when. The coordinator knows then whole execution flow and it's also responsible for triggering compensation transaction. This approach is globally easier to implement and test. Moreover it brings a centralized management that can help to manage concurrency, for example when given item can't be modified by 2 different transactions simultaneously. In the flip side, the coordinator is a Single Point of Failure and if it fails, the system fails with it. In addition, with inattentive developers the coordinator is a good candidate to become a God object .

God object

In simple terms a god object is the object with too much responsibilities or/and knowledge. An example of such object can be a global configuration that not only holds the configuration entries with values but also initializes the connections to 3rd party services and stores the mapping of fields for data storages. It could look like in the following snippet:

  object GodObject {

    val OutputTableName = System.getenv("OUTPUT_TABLE")

    val CassandraCluster: Cluster = Cluster.builder().addContactPoint("127.0.0.1").build()

    val JacksonMapper = new ObjectMapper()

    val OutputTableId = System.getenv("ID")

    Table(OutputTableName).create
  
  }
  

Dealing with such rich code, even though it's almost only a configuration, would be hard - especially if we would like to write unit tests for the dependencies of GodObject.

Transaction compensation example

To see compensation in action I'll use an example of application registering given user in 3 different social media websites. The implementation uses Command/Orchestration pattern because of its simplicity to design and test:

import scala.collection.mutable

class SagaTest extends FlatSpec with Matchers {

  def registerUserToSocialMedias(user: String, failingMethodIndex: Int) = {
    val generators = Seq(FacebookConnector.register _, TwitterConnector.register _, MediumConnector.register _)
    val undoers = Seq(FacebookConnector.undoRegister _, TwitterConnector.undoRegister _, MediumConnector.undoRegister _)
    for (index <- generators.indices) {
      val registerMethod = generators(index)
      val willFail = if (index == failingMethodIndex) true else false
      try {
        registerMethod(user, willFail)
      } catch {
        case re: RuntimeException => {
          for (undoerIndex <- undoers.indices; if undoerIndex < index) {
            undoers(undoerIndex)(user)
          }
          throw re
        }
      }
    }
  }

  behavior of "saga design pattern"

  it should "undo not fully committed transactions" in {
    val exception = intercept[RuntimeException] {
      registerUserToSocialMedias("user1", 1)
    }

    exception.getMessage shouldEqual "Transaction error"
    DataStore.FacebookUsers shouldNot contain("user1")
    DataStore.TwitterUsers shouldNot contain("user1")
    DataStore.MediumUsers shouldNot contain("user1")
  }

  it should "successfully commit all transactions" in {
    registerUserToSocialMedias("user2", Int.MaxValue)

    DataStore.FacebookUsers should contain("user2")
    DataStore.TwitterUsers should contain("user2")
    DataStore.MediumUsers should contain("user2")
  }


}



object FacebookConnector {

  def register(user: String, generateException: Boolean) = {
    if (generateException) {
      throw new RuntimeException("Transaction error")
    }
    DataStore.FacebookUsers.add(user)
  }

  def undoRegister(user: String) = {
    DataStore.FacebookUsers.remove(user)
  }

}

object TwitterConnector {

  def register(user: String, generateException: Boolean) = {
    if (generateException) {
      throw new RuntimeException("Transaction error")
    }
    DataStore.TwitterUsers.add(user)
  }

  def undoRegister(user: String) ={
    DataStore.TwitterUsers.remove(user)
  }

}

object MediumConnector {

  def register(user: String, generateException: Boolean) = {
    if (generateException) {
      throw new RuntimeException("Transaction error")
    }
    DataStore.MediumUsers.add(user)
  }

  def undoRegister(user: String) = {
    DataStore.MediumUsers.remove(user)
  }

}

object DataStore {

  val FacebookUsers = new mutable.HashSet[String]()

  val TwitterUsers = new mutable.HashSet[String]()

  val MediumUsers = new mutable.HashSet[String]()

}

Dealing with multiple transactions in distributed world is not easy. A simple solution using 2-phase commit can slow down whole execution. Hopefully some interesting alternatives exist as Saga pattern. As explained in this post, this pattern is in fact an application-specific logic responsible for undoing the things persisted by successful transactions. It can execute in 2 different manners: after published events or after being triggered by an orchestrator. The last section shown a simplified example of compensating logic for imaginary social media application.