RabbitMQ and time-to-live

In RabbitMQ we can use temporary messages and queues. For instance, we can create a queue which is automatically deleted after some time of inactivity.

In this article we'll see how to work with the time-to-live (TTL) parameter of RabbitMQ queues and messages. The first part describes the configuration to use while the second shows some test cases with short-lived queues and messages.

Messages TTL

First, let's see how to handle message time-to-live. If we need a global configuration for all messages going to a given queue, we can specify the message TTL when the queue is declared, through the x-message-ttl argument, expressed in milliseconds. It guarantees that a message is considered dead once it has stayed in the queue longer than the specified time. "Dead" means that the message won't be delivered to any subscribed consumer and won't be accessible through the basic.get method called directly on the queue.

An interesting behaviour appears when x-message-ttl is set to 0. In this case, the configuration behaves almost like the "immediate" publishing flag: the message tries to reach a consumer immediately and, if that's not possible, it's considered dead. But unlike with the immediate flag, the message is not returned to the publisher. Instead, it can only be routed to a dead-letter exchange, if one is configured.
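As an illustration, here is a minimal sketch of the queue arguments for such a zero-TTL queue, assuming a dead-letter exchange has already been declared. The class name ZeroTtlArguments and the exchange name dead_letters are hypothetical and used only for this example; x-message-ttl and x-dead-letter-exchange are the argument keys read by RabbitMQ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RabbitMQ client library.
final class ZeroTtlArguments {

  // Builds the arguments of a queue whose messages expire as soon as they
  // arrive, unless a consumer can take them immediately. Expired messages
  // are routed to the given dead-letter exchange instead of being returned.
  static Map<String, Object> build(String deadLetterExchange) {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 0);
    args.put("x-dead-letter-exchange", deadLetterExchange);
    return args;
  }

  public static void main(String[] unused) {
    // "dead_letters" is an assumed exchange name used only for illustration.
    System.out.println(build("dead_letters"));
  }
}
```

The resulting map would be passed as the last parameter of queueDeclare, exactly like the args map used in the test cases of this article.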

Another way to specify a message TTL is the expiration field of the message properties, a string holding the TTL in milliseconds. If this field is provided and the queue is also configured with x-message-ttl, the server uses the lower of the two values.
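The "lower value wins" rule can be sketched in plain Java as below. This is only a model of the rule described above, since the broker applies it internally; the class EffectiveTtl and its resolve method are hypothetical names.

```java
import java.util.OptionalLong;

// Hypothetical model of the rule; the broker applies it internally.
final class EffectiveTtl {

  // queueTtlMillis: value of x-message-ttl, or null when the queue has none.
  // expiration: the message's expiration property (milliseconds as a string),
  // or null when the publisher did not set it.
  static OptionalLong resolve(Long queueTtlMillis, String expiration) {
    Long messageTtlMillis = expiration == null ? null : Long.valueOf(expiration);
    if (queueTtlMillis == null && messageTtlMillis == null) {
      return OptionalLong.empty(); // no TTL at all: the message never expires
    }
    if (queueTtlMillis == null) {
      return OptionalLong.of(messageTtlMillis);
    }
    if (messageTtlMillis == null) {
      return OptionalLong.of(queueTtlMillis);
    }
    return OptionalLong.of(Math.min(queueTtlMillis, messageTtlMillis));
  }

  public static void main(String[] unused) {
    // Queue allows 60 s, message asks for 1 s: the message lives 1 s.
    System.out.println(resolve(60_000L, "1000").getAsLong()); // prints 1000
  }
}
```

The values in main mirror the second test case of this article, where a queue with a 1 minute TTL receives a message with a 1 second expiration.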

Let's see the message TTL configuration in some test cases:

@Test
public void should_not_be_able_to_consume_dead_message_because_of_per_queue_message_ttl_configuration() throws IOException, InterruptedException {
  Map<String, Object> args = new HashMap<>();
  args.put("x-message-ttl", 2000);
  directChannel.queueDeclare(EXPIRING_QUEUE, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.not(), args);
  directChannel.queueBind(EXPIRING_QUEUE, EXCHANGE, DIRECT_EXC_KEY);

  // First, publish message and wait 3 seconds
  // (1 more than in x-message-ttl)
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, MANDATORY.yes(), null, "Test".getBytes());
  Thread.sleep(3000);

  CountDownLatch latch = new CountDownLatch(1);
  directChannel.basicConsume(EXPIRING_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
  latch.await(2, TimeUnit.SECONDS);

  assertThat(STATS).isEmpty();
  // The message shouldn't be accessible by basic.get
  assertThat(directChannel.basicGet(EXPIRING_QUEUE, AUTO_ACK.yes())).isNull();
}

@Test
public void should_use_lower_value_of_two_message_ttl_parameters() throws IOException, InterruptedException {
  Map<String, Object> args = new HashMap<>();
  args.put("x-message-ttl", 60000); // 1 minute of time-to-live
  directChannel.queueDeclare(EXPIRING_QUEUE, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.not(), args);
  directChannel.queueBind(EXPIRING_QUEUE, EXCHANGE, DIRECT_EXC_KEY);

  // First, publish a message with a 1 second time-to-live.
  // This value should be used as the TTL for this message:
  // when a TTL is specified on both the queue and the message,
  // the lower of the two is used.
  AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, MANDATORY.yes(), properties, "Test".getBytes());
  Thread.sleep(2000);

  CountDownLatch latch = new CountDownLatch(1);
  directChannel.basicConsume(EXPIRING_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
  latch.await(2, TimeUnit.SECONDS);

  assertThat(STATS).isEmpty();
  // The message shouldn't be accessible by basic.get
  assertThat(directChannel.basicGet(EXPIRING_QUEUE, AUTO_ACK.yes())).isNull();
}

@Test
public void should_not_handle_expired_message() throws IOException, InterruptedException {
  directChannel.queueDeclare(TESTED_QUEUE, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
    Collections.emptyMap());
  directChannel.queueBind(TESTED_QUEUE, EXCHANGE, DIRECT_EXC_KEY);
  // The properties builder exposes an expiration() method;
  // the value is the TTL in milliseconds, passed as a string (here 300 ms)
  AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("300").build();
  String message = "Test_300_ms";
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, properties, message.getBytes());

  CountDownLatch latch = new CountDownLatch(1);
  // The message should not be delivered because it expires
  // before the thread wakes up and registers the consumer
  Thread.sleep(550);
  directChannel.basicConsume(TESTED_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));

  latch.await(3, TimeUnit.SECONDS);

  assertThat(STATS.get(message)).isNull();
}

Queues TTL

Another TTL configuration concerns queues. When they are declared, we can assign them an argument called x-expires. It specifies, in milliseconds, how long a queue can remain unused before it is deleted. A queue is unused when it has no consumers, has not been redeclared and has not served any basic.get call during that period.

We can see some basic behaviours of an expired queue in the examples below:
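The "unused" rule can be pictured with a toy model in plain Java. This is only a sketch of the rule described above, not the way the broker is implemented; the class QueueLease and its methods are hypothetical names, and time is passed in explicitly as milliseconds to keep the example deterministic.

```java
// Toy model of the x-expires rule; it is not how the broker is implemented.
final class QueueLease {
  private final long expiresMillis;
  private long lastUsedAt;
  private int consumers;

  QueueLease(long expiresMillis, long now) {
    this.expiresMillis = expiresMillis;
    this.lastUsedAt = now;
  }

  // Redeclaring the queue or calling basic.get restarts the countdown.
  void touch(long now) {
    lastUsedAt = now;
  }

  void addConsumer() {
    consumers++;
  }

  // The countdown restarts when a consumer goes away.
  void removeConsumer(long now) {
    consumers--;
    lastUsedAt = now;
  }

  // Expired only when nobody consumes and the idle period is long enough.
  boolean isExpired(long now) {
    return consumers == 0 && now - lastUsedAt >= expiresMillis;
  }

  public static void main(String[] unused) {
    QueueLease lease = new QueueLease(2000, 0);
    lease.addConsumer();
    System.out.println(lease.isExpired(3000)); // prints false, a consumer is attached
    lease.removeConsumer(3000);
    System.out.println(lease.isExpired(5500)); // prints true, idle for 2.5 s
  }
}
```

The timings in main follow the first queue test of this article: a consumer keeps the queue alive for 3 seconds, then the queue expires 2 seconds after the consumer is cancelled.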

@Test
public void should_create_queue_and_wait_for_expiring() throws IOException, InterruptedException {
  Map<String, Object> args = new HashMap<>();
  args.put("x-expires", 2000);
  directChannel.queueDeclare(EXPIRING_QUEUE, DURABLE.not(), EXCLUSIVE.not(),
    AUTO_DELETE.not(), args);
  directChannel.queueBind(EXPIRING_QUEUE, EXCHANGE, DIRECT_EXC_KEY);
  // First, declare consumer, so the queue is in use
  String consumerKey = directChannel.basicConsume(EXPIRING_QUEUE, AUTO_ACK.yes(), 
    new DefaultConsumer(directChannel));
  // Wait some time and delete consumer
  Thread.sleep(3000);
  directChannel.basicCancel(consumerKey);
  // Wait 2.5 seconds and check if the queue is still alive
  Thread.sleep(2500);
  try {
      directChannel.queueDeclarePassive(EXPIRING_QUEUE);
      fail("Should fail when queue is deleted after expiration time");
  } catch (IOException ioe) {
      ShutdownSignalException cause = (ShutdownSignalException)ioe.getCause();
      assertThat(cause.getReason().toString()).contains("NOT_FOUND - " +
              "no queue '2sec_Expiring_Queue' in vhost '/'");
  }
}

@Test
public void should_send_message_after_queue_expiring() throws IOException, InterruptedException {
  Map<String, Object> args = new HashMap<>();
  args.put("x-expires", 2000);
  directChannel.queueDeclare(EXPIRING_QUEUE, DURABLE.not(), EXCLUSIVE.not(),
    AUTO_DELETE.not(), args);
  directChannel.queueBind(EXPIRING_QUEUE, EXCHANGE, DIRECT_EXC_KEY);
  // First, declare consumer, so the queue is in use
  String consumerKey = directChannel.basicConsume(EXPIRING_QUEUE, AUTO_ACK.yes(), 
    new DefaultConsumer(directChannel));
  // Wait some time and delete consumer
  Thread.sleep(3000);
  directChannel.basicCancel(consumerKey);
  // Wait 2.5 seconds and check if the queue is still alive
  Thread.sleep(2500);
  // Steps are the same as in the previous test.
  // Now we add a listener for returned messages. A message published
  // with the mandatory flag is returned when no queue can receive it.
  // That's our case because 2sec_Expiring_Queue was removed
  // automatically.
  final CountDownLatch latch = new CountDownLatch(1);
  boolean[] wasReturned = new boolean[] {false};
  directChannel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
      wasReturned[0] = true;
      latch.countDown();
      System.out.println("For returned message " + new String(body));
  });
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, MANDATORY.yes(), null, "Test".getBytes());

  // Wait for the return listener instead of sleeping for a fixed time
  latch.await(2, TimeUnit.SECONDS);
  assertThat(wasReturned[0]).isTrue();
}

This article shows where time-to-live settings can be specified. The first part illustrates how to configure a TTL on messages, either through the queue configuration or through the message expiration property. The second part is shorter because queues have only one configuration method: the x-expires argument, set when the queue is declared.
