Transaction latency and publisher confirms

It's no surprise that transactions slow RabbitMQ down. That is why publisher confirms are the suggested way to handle guaranteed delivery.

After discovering transactions in RabbitMQ, it's a good moment to learn a more efficient way to handle delivery guarantees: publisher confirms. The first part of the article describes what this technique consists of. The second part shows an example implementation with the RabbitMQ Java API.

What are publisher confirms?

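Publisher confirms can be defined as a mechanism in which the broker notifies the publisher, through a listener, that a message was successfully received. When the confirmation is sent depends on the message type: an unroutable message is confirmed as soon as the exchange verifies that it can't be routed to any queue, while a routable message is confirmed once every queue it is routed to has accepted it (for a persistent message routed to a durable queue, that means after it is persisted to disk).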

A little bit confused? I was when I discovered the term "unroutable message". In fact, a message is considered "unroutable" when it can't be put in any queue (for example because no binding matches). If such a message was also published with the mandatory flag, RabbitMQ sends it back to the sender using the basic.return method, before sending the confirmation.
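The difference between an unroutable message and a returned one can be illustrated with a small model. This is only a sketch in plain Java, not RabbitMQ client code: the RoutingModel class, the Outcome enum and the bindings map (routing key to bound queues, as for a direct exchange) are hypothetical names introduced for illustration, and broker-side failures that would lead to a nack are ignored.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Toy model (NOT the RabbitMQ API) of what a confirm-mode channel
// gets back from the broker for a single published message.
final class RoutingModel {

    enum Outcome {
        ACK,             // the broker confirms the message
        RETURN_THEN_ACK  // basic.return is sent first, then the confirmation
    }

    // bindings: routing key -> queues bound with that key (hypothetical structure)
    static Outcome publish(Map<String, List<String>> bindings,
                           String routingKey,
                           boolean mandatory) {
        List<String> queues = bindings.getOrDefault(routingKey, Collections.emptyList());
        // A message is unroutable when no queue at all can receive it
        boolean unroutable = queues.isEmpty();
        if (unroutable && mandatory) {
            return Outcome.RETURN_THEN_ACK;
        }
        // Routable messages, and unroutable ones without the mandatory flag,
        // are simply confirmed
        return Outcome.ACK;
    }
}
```

In this model only the combination "unroutable and mandatory" produces a return; an unroutable message published without the flag is confirmed and silently dropped.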

Another thing to know before implementing publisher confirms concerns message ids when confirm mode is turned on. When the first confirm.select is made on a channel, RabbitMQ starts a counter beginning at 1. This counter represents the ids of consecutively published messages. So, the first message has the id 1, the second 2, the third 3 and so on. Each open channel has its own, independent counter. It's good to know this because the publisher confirms technique relies on this assumption of an auto-incrementing id per channel.
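The per-channel counter can be sketched in plain Java as follows. It is a simplified model under stated assumptions, not the client library's implementation: ChannelConfirmTracker and its methods are hypothetical names, one instance stands for one channel, and the "multiple" flag is assumed to confirm every id up to and including the given delivery tag.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical model of one channel in confirm mode:
// an auto-incrementing sequence number plus the messages still awaiting a confirm.
final class ChannelConfirmTracker {

    // The counter starts at 1, as described above
    private long nextSeqNo = 1;
    // Sorted by sequence number, so a "multiple" ack can clear a whole prefix
    private final ConcurrentNavigableMap<Long, String> outstanding =
            new ConcurrentSkipListMap<>();

    // Registers a message as published and returns the id assigned to it
    synchronized long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    // Called when a confirmation arrives for deliveryTag
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Confirms every message with an id <= deliveryTag
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    synchronized long nextPublishSeqNo() {
        return nextSeqNo;
    }

    int outstandingCount() {
        return outstanding.size();
    }
}
```

Two instances of this tracker never share ids, which mirrors the fact that each channel counts independently. A sorted structure is used so that a single "multiple" confirmation can discard all earlier ids at once.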

Publisher confirms example in RabbitMQ

After discovering some useful principles, we can start to write some code to show publisher confirms in action:

private volatile Set<Long> notConfirmedIds = Collections.synchronizedSet(new TreeSet<>());

// ...

@Test
public void should_publish_with_ids_respecting_auto_incremental_sequence() throws IOException, InterruptedException, TimeoutException {
  // Always put the channel in "confirm mode" first, otherwise we won't be able to wait for confirms
  directChannel.confirmSelect();
  for (int i = 0; i < 10; i++) {
    directChannel.basicPublish("", CONSUMER_QUEUE_1, null, "Test".getBytes());
  }
  directChannel.waitForConfirms(5000);

  assertThat(directChannel.getNextPublishSeqNo()).isEqualTo(11L);
}

@Test
public void should_confirm_all_messages_in_confirm_mode_channel() throws IOException, InterruptedException, TimeoutException {
  int msgCount = 10;
  directChannel.addConfirmListener(new ConfirmListener() {
    @Override
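    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      if (multiple) {
        // A "multiple" ack confirms every message up to and including deliveryTag
        notConfirmedIds.removeIf(id -> id <= deliveryTag);
      } else {
        notConfirmedIds.remove(deliveryTag);
      }
    }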

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
      // Message was lost, we just print the info for debug; 
      // otherwise, this case should be handled differently
      System.out.println("Not-Acknowledging for message with id " + deliveryTag);
    }
  });

  directChannel.confirmSelect();
  // Publish messages to the channel and put their ids in the set
  for (int i = 0; i < msgCount; i++) {
    // Add to set with not confirmed message ids
    notConfirmedIds.add(directChannel.getNextPublishSeqNo());
    directChannel.basicPublish("", CONSUMER_QUEUE_1, null, "Test".getBytes());
    directChannel.waitForConfirms(1000);
  }

  // An empty set proves that the confirm listener is not tied to
  // consumer (n)acks: in this case we only publish
  // messages without consuming them.
  // The confirm listener is triggered when the RabbitMQ server successfully
  // receives a given message. In our case, successful reception
  // means that the message was pushed to the queue.
  assertThat(notConfirmedIds).isEmpty();
  int consumedMessages = 0;
  // The proof that the messages hadn't been consumed before:
  while (directChannel.basicGet(CONSUMER_QUEUE_1, AUTO_ACK.yes()) != null) {
    consumedMessages++;
  }
  assertThat(consumedMessages).isEqualTo(msgCount);
}

@Test
public void should_not_confirm_all_messages_without_wait_for_confirm_blocking_call() throws IOException {
  directChannel.addConfirmListener(new ConfirmListener() {
    @Override
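    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      if (multiple) {
        // A "multiple" ack confirms every message up to and including deliveryTag
        notConfirmedIds.removeIf(id -> id <= deliveryTag);
      } else {
        notConfirmedIds.remove(deliveryTag);
      }
    }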

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
      // Message was lost, we just print the info for debug; 
      // otherwise, this case should be handled differently
      System.out.println("Not-Acknowledging for message with id " + deliveryTag);
    }
  });

  directChannel.confirmSelect();
  // Publish messages to the channel and put their ids in the set
  for (int i = 0; i < 10; i++) {
    // Add to set with not confirmed message ids
    notConfirmedIds.add(directChannel.getNextPublishSeqNo());
    directChannel.basicPublish("", CONSUMER_QUEUE_1, null, "Test".getBytes());
    // Note that we should call waitForConfirms here
    // to be sure that the next message is always published
    // after the reception of the previous one is confirmed
    // directChannel.waitForConfirms(1000);
  }
  }

  assertThat(notConfirmedIds).isNotEmpty();
}

@Test(expected = IllegalStateException.class)
public void should_throw_an_exception_when_confirm_waiting_is_expected_for_channel_not_in_confirm_mode() throws TimeoutException, InterruptedException {
  directChannel.waitForConfirms(5000);
}

In the previous tests you can see which methods are related to publisher confirms: confirmSelect() to activate confirm mode on the channel, addConfirmListener() to add a listener notified about confirmed (or rejected) messages, and waitForConfirms() to block the publishing of new messages until the reception of the already published ones is confirmed.
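To make the blocking behaviour more concrete, here is a minimal sketch of how a wait-for-confirms call could be modelled in plain Java. It is an illustration under assumptions, not the RabbitMQ client's code: BlockingConfirmModel and its methods are hypothetical, the broker is replaced by whoever calls ack(), and in this model publish() simply returns 0 when confirm mode is off.

```java
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a channel whose waitForConfirms blocks
// until every published message has been confirmed.
final class BlockingConfirmModel {

    private final TreeSet<Long> unconfirmed = new TreeSet<>();
    private long nextSeqNo = 1;
    private boolean confirmMode = false;

    synchronized void confirmSelect() {
        confirmMode = true;
    }

    // Returns the id given to the message, or 0 when confirm mode is off (model choice)
    synchronized long publish() {
        if (!confirmMode) {
            return 0;
        }
        long seqNo = nextSeqNo++;
        unconfirmed.add(seqNo);
        return seqNo;
    }

    // Stands in for the broker confirmation; wakes up waiting publishers
    synchronized void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unconfirmed.headSet(deliveryTag, true).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
        if (unconfirmed.isEmpty()) {
            notifyAll();
        }
    }

    synchronized int unconfirmedCount() {
        return unconfirmed.size();
    }

    // Blocks until nothing is left to confirm or the timeout expires
    synchronized void waitForConfirms(long timeoutMillis)
            throws InterruptedException, TimeoutException {
        if (!confirmMode) {
            throw new IllegalStateException("Channel is not in confirm mode");
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!unconfirmed.isEmpty()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                throw new TimeoutException("Some messages are still unconfirmed");
            }
            // Releases the monitor while waiting, so ack() can run
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
    }
}
```

The model also shows why calling the wait method on a channel that never entered confirm mode fails immediately: there is no set of unconfirmed ids to wait on.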

The article shows how to avoid transactions and still guarantee message delivery. One of the techniques we can use for that is publisher confirms. It relies on application-side data holders (such as the TreeSet in our test case) to track which messages haven't been confirmed yet. We also learned that there are 3 key methods to deal with messaging in this mode: confirmSelect(), addConfirmListener() and waitForConfirms().

