Persistence in RabbitMQ

Persistence is an important part of RabbitMQ configuration. Nobody wants to lose key business messages because of a temporary power outage or a human error.

Bartosz Konieczny

As the title suggests, this article focuses on the persistence aspects of RabbitMQ. The first part covers the automatic recovery of the client connection. The second part concerns message durability. Each part is illustrated with JUnit test cases.

Automatic recovery in RabbitMQ

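A network connection is never infallible. However, the RabbitMQ client can mitigate some failures of the connection between the client and the server by reconnecting automatically after a connection loss. With the Java API, this behavior is enabled with ConnectionFactory's setAutomaticRecoveryEnabled(boolean) method (since version 4.0.0 of the Java client, it's enabled by default). Specifically, automatic recovery means:

- reconnecting to the server,
- restoring connection listeners,
- reopening channels,
- restoring channel listeners,
- restoring channel settings: basic.qos, publisher confirms and transactions,
- recovering the topology, i.e. re-declaring exchanges (except the predefined ones) and queues, and recovering bindings and consumers. This part can be turned off with setTopologyRecoveryEnabled(false).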

The following simple, semi-manual test shows automatic recovery in action. While the test sleeps, the RabbitMQ server has to be restarted by hand:

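import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.AfterClass;
import org.junit.Test;

// BaseConfig (not shown) exposes the shared CONNECTION_FACTORY
public class RecoveryTest extends BaseConfig {

  @AfterClass
  public static void resetRecovery() {
    CONNECTION_FACTORY.setAutomaticRecoveryEnabled(false);
  }

  @Test
  public void should_recover_automatically_after_server_restart() throws IOException, TimeoutException, InterruptedException {
    CONNECTION_FACTORY.setAutomaticRecoveryEnabled(true);
    Connection connection = CONNECTION_FACTORY.newConnection();
    System.out.println("Please restart a server now");
    // leave enough time to restart the server and for the client to reconnect
    Thread.sleep(10000);

    assertThat(connection.isOpen()).isTrue();
    connection.close();
  }

  @Test
  public void should_not_recover_automatically_after_server_restart_with_recovery_turned_off() throws IOException, TimeoutException, InterruptedException {
    CONNECTION_FACTORY.setAutomaticRecoveryEnabled(false);
    Connection connection = CONNECTION_FACTORY.newConnection();
    System.out.println("Please restart a server now");
    Thread.sleep(10000);

    // without automatic recovery the connection stays closed after the restart
    assertThat(connection.isOpen()).isFalse();
  }

}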
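Conceptually, the reconnection part of automatic recovery is a retry loop. Once the connection is detected as broken, the client waits for the network recovery interval (5 seconds by default, configurable with ConnectionFactory's setNetworkRecoveryInterval) and tries to reconnect, repeating until it succeeds. The following plain-Java sketch models that loop. ReconnectLoop and its tryConnect callback are hypothetical names, and this is not the client's actual implementation. Unlike the real client, the sketch caps the number of attempts so that it stays testable.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical model of a fixed-interval reconnection loop, NOT the RabbitMQ
 * client's implementation. tryConnect stands for "open a new connection".
 */
final class ReconnectLoop {

    private final long recoveryIntervalMillis;
    private final int maxAttempts;

    ReconnectLoop(long recoveryIntervalMillis, int maxAttempts) {
        this.recoveryIntervalMillis = recoveryIntervalMillis;
        this.maxAttempts = maxAttempts;
    }

    /**
     * Waits recoveryIntervalMillis before every attempt and returns the number
     * of the attempt that succeeded, or -1 if all attempts failed or the
     * thread was interrupted.
     */
    int reconnect(BooleanSupplier tryConnect) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Thread.sleep(recoveryIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop retrying
                return -1;
            }
            if (tryConnect.getAsBoolean()) {
                return attempt;
            }
        }
        return -1;
    }
}
```

For instance, new ReconnectLoop(250, 40) would keep trying for roughly 10 seconds with the 250 ms interval used by the next tests. A short interval shortens the gap between the broker coming back and the client noticing it. The cost is more connection attempts while the broker is still down.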

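In the Java API, a connection with automatic recovery is an instance of the AutorecoveringConnection class, which implements the Recoverable interface. This interface allows registering recovery listeners, i.e. listeners triggered when the client succeeds in reconnecting after a failure. Apart from that, two other kinds of listeners can be attached: queue recovery listeners and consumer recovery listeners. However, they are not part of the public RabbitMQ API. They're only available on AutorecoveringConnection, which lives in the com.rabbitmq.client.impl.recovery package, hence the casts in the tests. They're notified when a queue or a consumer is recovered and receive both the old and the new queue name (or consumer tag), which can differ after recovery. The following cases show some of AutorecoveringConnection's features, starting with a regular shutdown listener: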

// listeners are invoked from client threads, hence a thread-safe list
private List<String> notificationStats = new CopyOnWriteArrayList<>();

@BeforeClass
public static void initPersistence() {
  CONNECTION_FACTORY.setAutomaticRecoveryEnabled(true);
  CONNECTION_FACTORY.setNetworkRecoveryInterval(250);
}

@After
public void resetStats() {
  notificationStats.clear();
}

@AfterClass
public static void resetPersistence() {
  CONNECTION_FACTORY.setAutomaticRecoveryEnabled(false);
}

@Test
public void should_notify_shutdown_listener_about_server_shutdown() throws IOException, TimeoutException, InterruptedException {
  AutorecoveringConnection localConnection = (AutorecoveringConnection) CONNECTION_FACTORY.newConnection();
  try {
    localConnection.addShutdownListener((cause) -> {
        System.out.println("Shutdown completed because of protocol method: "+cause.getReason().protocolMethodName());
        notificationStats.add(NOTIF_SHUTDOWN);
      }
    );
    System.out.println("Please restart a server now");
    Thread.sleep(10_000);

  } finally {
    localConnection.close();
  }

  assertThat(notificationStats).contains(NOTIF_SHUTDOWN);
}

@Test
public void should_notify_recovery_listener_about_server_recovery() throws IOException, InterruptedException, TimeoutException {
  AutorecoveringConnection localConnection = (AutorecoveringConnection) CONNECTION_FACTORY.newConnection();
  try {
    localConnection.addRecoveryListener((recoverable) -> {
        // invoked once the connection has been successfully recovered
        System.out.println("Connection recovered");
        notificationStats.add(NOTIF_RECOVERY);
      }
    );
    System.out.println("Please restart a server now");
    Thread.sleep(10_000);
  } finally {
    localConnection.close();
  }

  assertThat(notificationStats).contains(NOTIF_RECOVERY);
}

@Test
public void should_notify_queue_recovery_listener_about_queue_recovery() throws IOException, InterruptedException, TimeoutException {
  String[] oldNewName = new String[2];
  String exchangeName = "tmp_exchange";
  String queueName = "tmp_queue";
  AutorecoveringConnection localConnection = (AutorecoveringConnection) CONNECTION_FACTORY.newConnection();
  Channel directChannel = localConnection.createChannel();
  directChannel.exchangeDeclare(exchangeName, ExchangeTypes.DIRECT.getName());
  directChannel.queueDeclare(queueName, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
    Collections.emptyMap());
  directChannel.queueBind(queueName, exchangeName, "key");
  try {
    localConnection.addQueueRecoveryListener((oldName, newName) -> {
      oldNewName[0] = oldName;
      oldNewName[1] = newName;
      notificationStats.add(NOTIF_QUEUE_RECOVERY);
    });
    System.out.println("Please restart a server now");
    Thread.sleep(10_000);
  } finally {
    // the auto-delete queue never had a consumer, so it must be deleted explicitly
    directChannel.queueDelete(queueName);
    directChannel.exchangeDelete(exchangeName);
    localConnection.close();
  }

  assertThat(notificationStats).contains(NOTIF_QUEUE_RECOVERY);
  // the queue has an explicit name, so it's redeclared under the same name
  assertThat(oldNewName[0]).isEqualTo(queueName);
  assertThat(oldNewName[1]).isEqualTo(queueName);
}
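In the test above the queue has an explicit name, so the old and the new names are equal. The queue recovery listener is more useful with server-named queues, i.e. queues declared with an empty name for which the broker generates a name prefixed with amq.gen-. After recovery the client redeclares such a queue, and the broker may then assign it a different name. Any reference the application kept to the old name becomes stale. The following sketch assumes the application keeps its own hypothetical QueueNameRegistry. Its onQueueRecovered method has the same (oldName, newName) shape as the lambda used in the test.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical application-side registry of queue names, not part of the
 * RabbitMQ client. onQueueRecovered mirrors the (oldName, newName) callback.
 */
final class QueueNameRegistry {

    // logical name used by the application -> current broker-side queue name;
    // concurrent because recovery callbacks may run on another thread
    private final Map<String, String> actualNames = new ConcurrentHashMap<>();

    void register(String logicalName, String queueName) {
        actualNames.put(logicalName, queueName);
    }

    String resolve(String logicalName) {
        return actualNames.get(logicalName);
    }

    /** Points every logical name that referenced oldName to newName. */
    void onQueueRecovered(String oldName, String newName) {
        if (oldName.equals(newName)) {
            return; // explicitly named queue: nothing changed
        }
        actualNames.replaceAll((logicalName, current) -> current.equals(oldName) ? newName : current);
    }
}
```

With the real client, registering it could look like localConnection.addQueueRecoveryListener(registry::onQueueRecovered). This assumes that QueueRecoveryListener remains a single-method interface, as the lambda in the test suggests.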

Message durability in RabbitMQ

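Automatic recovery makes it possible to check some persistence properties of queues, exchanges and messages from the client side. As a reminder, messages, queues and exchanges aren't persistent by default. To make them persistent:

- the exchange must be declared as durable (durable argument of exchangeDeclare set to true),
- the queue must be declared as durable (durable argument of queueDeclare set to true),
- the message must be published with the persistent delivery mode (deliveryMode = 2).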

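Persistent messages survive a server crash. This means they can be redelivered to consumers when, for example, RabbitMQ needs a restart. If the exchange and the queue are both durable and, additionally, the message is persistent, the message is stored on disk and can be restored after a server failure. Strictly speaking, the durable queue combined with the persistent delivery mode is what keeps the message itself. The durable exchange ensures that the exchange, and its bindings to durable queues, still exist after the restart. New messages can then be routed without relying on the client's topology recovery. If the queue isn't durable or the message isn't persistent, the message is lost on restart.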
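The rule about queues and messages can be condensed into a small in-memory model. The sketch below is a hypothetical BrokerRestartModel, not RabbitMQ code. Its restart() method drops non-durable queues and every message that isn't published with delivery mode 2. Exchanges are deliberately left out because, once routed, a message lives in its queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of what survives a broker restart. It only encodes the
 * rule "a message survives if it is persistent AND stored in a durable queue".
 */
final class BrokerRestartModel {

    static final int NOT_PERSISTENT = 1; // AMQP delivery mode 1 (transient)
    static final int PERSISTENT = 2;     // AMQP delivery mode 2 (persistent)

    private static final class Message {
        final String body;
        final int deliveryMode;

        Message(String body, int deliveryMode) {
            this.body = body;
            this.deliveryMode = deliveryMode;
        }
    }

    private final Map<String, Boolean> durableQueues = new HashMap<>();
    private final Map<String, List<Message>> queueContents = new HashMap<>();

    void declareQueue(String name, boolean durable) {
        durableQueues.put(name, durable);
        queueContents.putIfAbsent(name, new ArrayList<>());
    }

    void publish(String queue, String body, int deliveryMode) {
        queueContents.get(queue).add(new Message(body, deliveryMode));
    }

    /** Simulates a restart: non-durable queues vanish, transient messages are dropped. */
    void restart() {
        Iterator<Map.Entry<String, Boolean>> it = durableQueues.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Boolean> queue = it.next();
            if (!queue.getValue()) {
                queueContents.remove(queue.getKey());
                it.remove();
            }
        }
        for (List<Message> messages : queueContents.values()) {
            messages.removeIf(m -> m.deliveryMode != PERSISTENT);
        }
    }

    /** Bodies stored in the queue; empty if the queue no longer exists. */
    List<String> contents(String queue) {
        List<String> bodies = new ArrayList<>();
        for (Message m : queueContents.getOrDefault(queue, new ArrayList<>())) {
            bodies.add(m.body);
        }
        return bodies;
    }
}
```

Publishing a persistent message to a non-durable queue, a persistent message to a durable queue and a transient message to a durable queue, then calling restart(), leaves only the second message. A non-durable queue redeclared by the client's topology recovery would come back empty, which the model reflects by returning an empty list.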

The test cases below illustrate three scenarios: a persistent message published to a non-durable queue, a persistent message published to a durable queue and a non-persistent message published to a durable queue:

private static final String EXCHANGE_KEY = "Persistence_Exchange";
private static final String QUEUE_NAME = "Persistence_Test_Queue";

@BeforeClass
public static void initPersistence() {
  CONNECTION_FACTORY.setAutomaticRecoveryEnabled(true);
  CONNECTION_FACTORY.setNetworkRecoveryInterval(500);
}

@AfterClass
public static void resetPersistence() {
  CONNECTION_FACTORY.setAutomaticRecoveryEnabled(false);
}

@Test
public void should_not_deliver_a_persistent_message_sent_to_not_durable_queue() throws IOException, InterruptedException, TimeoutException {
  AutorecoveringConnection localConnection = (AutorecoveringConnection) CONNECTION_FACTORY.newConnection();
  AutorecoveringChannel newChannel = (AutorecoveringChannel) localConnection.createChannel();
  try {
    newChannel.exchangeDeclare(EXCHANGE_KEY, ExchangeTypes.DIRECT.getName());
    // the queue is deliberately NOT durable: even a persistent message won't survive the restart
    newChannel.queueDeclare(QUEUE_NAME, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(), Collections.emptyMap());
    newChannel.queueBind(QUEUE_NAME, EXCHANGE_KEY, "");

    CountDownLatch latch = new CountDownLatch(1);
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
      .deliveryMode(Mapping.DeliveryModes.PERSISTENT.value()).build();
    String message = "Test_Not_Persistent";
    newChannel.basicPublish(EXCHANGE_KEY, "", properties, message.getBytes());

    System.out.println("Please restart a server now");
    Thread.sleep(10_000);

    Map<String, Integer> consumed = new HashMap<>();
    newChannel.basicConsume(QUEUE_NAME, AUTO_ACK.yes(), new CountingConsumer(newChannel, latch, consumed));

    latch.await(2, TimeUnit.SECONDS);

    assertThat(newChannel.isOpen()).isTrue();
    assertThat(consumed).isEmpty();
  } finally {
    newChannel.queueDelete(QUEUE_NAME);
    newChannel.exchangeDelete(EXCHANGE_KEY);

    newChannel.close();
    localConnection.close();
  }
}

@Test
public void should_deliver_again_persistent_message_after_server_restart() throws IOException, InterruptedException, TimeoutException {
  AutorecoveringConnection localConnection = (AutorecoveringConnection) CONNECTION_FACTORY.newConnection();
  AutorecoveringChannel newChannel = (AutorecoveringChannel) localConnection.createChannel();
  try {
    newChannel.exchangeDeclare(EXCHANGE_KEY, ExchangeTypes.DIRECT.getName());
    // queue must be durable, otherwise the messages won't be stored after crash even if they are marked "persistent"
    newChannel.queueDeclare(QUEUE_NAME, DURABLE.yes(), EXCLUSIVE.not(), AUTO_DELETE.yes(), Collections.emptyMap());
    newChannel.queueBind(QUEUE_NAME, EXCHANGE_KEY, "");

    CountDownLatch latch = new CountDownLatch(1);
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
        .deliveryMode(Mapping.DeliveryModes.PERSISTENT.value()).build();
    String message = "Test_Persistent";
    newChannel.basicPublish(EXCHANGE_KEY, "", properties, message.getBytes());

    System.out.println("Please restart a server now");
    Thread.sleep(10_000);

    Map<String, Integer> consumed = new HashMap<>();
    newChannel.basicConsume(QUEUE_NAME, AUTO_ACK.yes(), new CountingConsumer(newChannel, latch, consumed));

    latch.await(2, TimeUnit.SECONDS);

    assertThat(newChannel.isOpen()).isTrue();
    assertThat(consumed.get(message).intValue()).isEqualTo(1);
  } finally {
    newChannel.queueDelete(QUEUE_NAME);
    newChannel.exchangeDelete(EXCHANGE_KEY);

    newChannel.close();
    localConnection.close();
  }
}

@Test
public void should_not_deliver_again_not_persistent_message_after_first_failure() throws IOException, InterruptedException, TimeoutException {
  AutorecoveringConnection localConnection = (AutorecoveringConnection) CONNECTION_FACTORY.newConnection();
  AutorecoveringChannel newChannel = (AutorecoveringChannel) localConnection.createChannel();
  try {
    newChannel.exchangeDeclare(EXCHANGE_KEY, ExchangeTypes.DIRECT.getName());
    // the queue is durable, but the message below is NOT persistent, so it won't survive the restart
    newChannel.queueDeclare(QUEUE_NAME, DURABLE.yes(), EXCLUSIVE.not(), AUTO_DELETE.yes(), Collections.emptyMap());
    newChannel.queueBind(QUEUE_NAME, EXCHANGE_KEY, "");

    CountDownLatch latch = new CountDownLatch(1);

    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
        .deliveryMode(Mapping.DeliveryModes.NOT_PERSISTENT.value()).build();
    String message = "Test_Not_Persistent";
    // publish to the declared exchange; an unknown exchange would make the broker close the channel
    newChannel.basicPublish(EXCHANGE_KEY, "", properties, message.getBytes());

    System.out.println("Please restart a server now");
    Thread.sleep(10_000);

    Map<String, Integer> consumed = new HashMap<>();
    newChannel.basicConsume(QUEUE_NAME, AUTO_ACK.yes(), new CountingConsumer(newChannel, latch, consumed));

    latch.await(2, TimeUnit.SECONDS);

    assertThat(newChannel.isOpen()).isTrue();
    assertThat(consumed).isEmpty();
  } finally {
    newChannel.queueDelete(QUEUE_NAME);
    newChannel.exchangeDelete(EXCHANGE_KEY);

    newChannel.close();
    localConnection.close();
  }
}
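All durability tests rely on a CountingConsumer helper whose source isn't shown here. Assuming it simply counts deliveries per message body and releases the latch, its core logic could be sketched as the hypothetical CountingLogic class below. In a real consumer this code would live in the handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) method of a DefaultConsumer subclass. Here it's kept free of the client library so that it runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical reconstruction of the counting logic behind the tests'
 * CountingConsumer helper; the real helper's source isn't shown.
 */
final class CountingLogic {

    private final CountDownLatch latch;
    private final Map<String, Integer> consumed;

    CountingLogic(CountDownLatch latch, Map<String, Integer> consumed) {
        this.latch = latch;
        this.consumed = consumed;
    }

    /** Would be called from handleDelivery(...) with the raw message body. */
    void onDelivery(byte[] body) {
        // assumes UTF-8 bodies; the tests encode with String.getBytes(), i.e. the
        // platform default charset (UTF-8 by default since JDK 18)
        String message = new String(body, StandardCharsets.UTF_8);
        // count how many times each message body was delivered
        consumed.merge(message, 1, Integer::sum);
        // release the test thread blocked in latch.await(...)
        latch.countDown();
    }
}
```

The tests pass a plain HashMap that is written by the consumer thread and read by the test thread. A ConcurrentHashMap is a safer choice, especially when latch.await(...) times out, because a timeout doesn't establish any happens-before relationship between the two threads.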

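This article is mostly composed of test cases showing the automatic recovery of the client's connection and the conditions under which messages are persisted. A message survives a broker restart only when it's marked as persistent and stored in a durable queue. With automatic recovery enabled, the client reconnects and restores its channels, listeners and topology on its own.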
