Transactions in RabbitMQ

At first glance, transactions seem to be strictly related to databases. However, they also exist in messaging.

This article explains some points about transactions in RabbitMQ. The first part describes them from a theoretical point of view, while the second one shows some simple use cases.

Transactions in RabbitMQ

Transactions in RabbitMQ concern only messages. They apply only when a message is published or acknowledged. It's important to stress the word "acknowledged" because it doesn't mean the same as "consumed". Strictly speaking, when a transaction is rolled back after an acknowledgment, the message is not requeued. We can see this happen in one of the tests in the next part of this article.
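
To make these rules easier to picture, here is a minimal sketch of a toy in-memory model. It is not the RabbitMQ client API: the class TxChannelModel and all of its methods are hypothetical names invented for this illustration. It assumes the simplified behavior described above, where publishes and acknowledgments are held back until commit, and a rollback simply discards them without requeuing anything.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of a transactional channel (hypothetical, NOT the RabbitMQ client API). */
class TxChannelModel {
    // messages committed to the queue and ready to be delivered
    private final Deque<String> ready = new ArrayDeque<>();
    // messages delivered to a consumer but not yet acknowledged
    private final List<String> unacked = new ArrayList<>();
    // operations recorded inside the current transaction
    private final List<String> pendingPublishes = new ArrayList<>();
    private final List<String> pendingAcks = new ArrayList<>();

    /** Records a publish; the message stays invisible until commit(). */
    void publish(String message) {
        pendingPublishes.add(message);
    }

    /** Delivers the next ready message, or null if none. Delivery itself is not transactional. */
    String deliver() {
        String message = ready.poll();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    /** Records an acknowledgment; it takes effect only on commit(). */
    void ack(String message) {
        pendingAcks.add(message);
    }

    /** Applies every publish and acknowledgment recorded since the last commit or rollback. */
    void commit() {
        ready.addAll(pendingPublishes);
        unacked.removeAll(pendingAcks);
        pendingPublishes.clear();
        pendingAcks.clear();
    }

    /** Discards recorded publishes and acknowledgments; nothing is requeued. */
    void rollback() {
        pendingPublishes.clear();
        pendingAcks.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

In this model, a message published without commit() is never returned by deliver(). A message that was delivered, acknowledged and then rolled back stays in the unacknowledged list: it is neither removed nor put back into the ready queue.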

However, transactions have some drawbacks, mostly related to RabbitMQ performance. First of all, transactions imply blocking operations, and RabbitMQ is no exception. Every transaction commit forces the working thread to wait until the commit completes, for example until all the concerned queues have accepted the sent message. In addition, if the message is persistent, each committed transaction requires an fsync of the modified data. Simply put, the data is written to disk.

Because transactions can significantly decrease performance, RabbitMQ introduces another technique to deal with message delivery guarantees, called publisher confirms.

Example of transactions in RabbitMQ

But before talking about publisher confirms in one of the next articles, let's take a look at some methods using transactions:

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

private Connection connection;

private Channel directChannel;

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  // enables transactional mode for this channel
  // without this call, the following error is thrown when trying to commit or roll back a transaction:
  // "method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED -
  // channel is not transactional, class-id=90, method-id=30)"
  directChannel.txSelect();
  directChannel.exchangeDeclare(EXCHANGE, ExchangeTypes.DIRECT.getName());

  directChannel.queueDeclare(CONSUMER_QUEUE_1, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
    Collections.emptyMap());
  directChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, DIRECT_EXC_KEY);
} 

@Test
public void should_not_consume_published_message_without_transaction_commit() throws IOException, InterruptedException {
  String message = "Test_Not_Committed";
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(1);

  // declare consumer
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // If the channel weren't transactional, we would be able to consume the message - the code is exactly the same as
  // for the DirectExchangeTest
  // But since the published message is not committed, the consumer never receives it
  assertThat(STATS).isEmpty();
}

@Test
public void should_rollback_not_acknowledged_message() throws IOException, InterruptedException, TimeoutException {
  String message = "Test_Not_Ack_Msg";
  STATS.put(message, 0);
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.txCommit();

  CountDownLatch latch = new CountDownLatch(2); // 2 so that a single delivery doesn't release the latch and await() waits the full 2 seconds
  // declare consumers
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new DefaultConsumer(directChannel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
        byte[] body) throws IOException {
      String msg = new String(body);
      STATS.put(msg, STATS.get(msg) + 1);
      getChannel().basicNack(envelope.getDeliveryTag(), MULTIPLE.not(), REQUEUE.yes());
      //getChannel().txCommit();
      getChannel().txRollback();
      latch.countDown();
    }
  });
  latch.await(2, TimeUnit.SECONDS);

  // Because we roll back the transaction, the negative acknowledgment is discarded.
  // To see the nack take effect, comment out the txRollback() line and uncomment the txCommit() one
  assertThat(STATS.get(message)).isEqualTo(1);
}

This article briefly covers transactions in RabbitMQ. As in other technologies, they're blocking and, when overused, can make the application effectively synchronous (the working thread needs to wait for the commit to be confirmed before continuing). It's one of the reasons why RabbitMQ introduced a quicker way to deal with message delivery and publishing guarantees. It's called publisher confirms and will be described in a later article.

