Dead letter exchanges in RabbitMQ

Messaging holds surprises, especially when things go wrong. A message can be lost when, for example, its time to live is exceeded or when the queue has no more space for new messages. To avoid losing messages, RabbitMQ provides a special kind of exchange, the dead-letter exchange.

This article focuses on the dead-letter exchange. The first part discusses the use cases to which this exchange applies. The second part demonstrates, through test cases, the reasons for message rejection and the headers a message carries after rejection.

Dead-letter exchange

Compared to standard exchanges, dead-letter exchanges (DLX) are nothing special. They are declared in the same manner, they share the same features, and they are consumed through queues bound to them. So how do we mark an exchange as a DLX? Simply by setting the x-dead-letter-exchange argument when a normal queue is declared. The value of this argument should be the name of the DLX.

Normally, a message routed to the DLX keeps its routing key. This value is overridden, however, if the non-DLX queue is configured with the x-dead-letter-routing-key argument, which defines the new routing key to put on messages transferred to the DLX.
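The two arguments described above can be sketched as a small helper that builds the argument map for a queue declaration. This is only an illustration: the DeadLetterArguments class and its forQueue method are hypothetical names, not part of the RabbitMQ client, and the example stays on plain Java collections so that it runs without a broker.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the queue arguments that attach a DLX to a queue. */
final class DeadLetterArguments {

    private DeadLetterArguments() {
    }

    /**
     * @param dlxName       name of the exchange that should receive dead-lettered messages
     * @param newRoutingKey routing key to put on dead-lettered messages,
     *                      or null to keep the original routing key
     */
    static Map<String, Object> forQueue(String dlxName, String newRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", dlxName);
        if (newRoutingKey != null) {
            // Only set when the original routing key should be overridden
            args.put("x-dead-letter-routing-key", newRoutingKey);
        }
        return args;
    }

    public static void main(String[] unused) {
        // The resulting map is meant to be passed as the last parameter of
        // Channel#queueDeclare(queue, durable, exclusive, autoDelete, arguments).
        System.out.println(forQueue("MyDeadLetterExchange", "New_Key_For_DLX_Msg"));
        System.out.println(forQueue("MyDeadLetterExchange", null));
    }
}
```

Leaving the routing key out of the map, rather than storing a null value, keeps the default behavior in which the message travels to the DLX with its original routing key.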

Besides the routing key, a dead-lettered message also carries a header identified by the x-death key. It contains useful information about the context of dead-lettering, including the reason for the rejection, the time when the message was dead-lettered, and the original exchange.
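To make the shape of this header more concrete, here is a minimal sketch that reads the first x-death entry from a headers map. The XDeathReader class and its summarize method are hypothetical names introduced for illustration. The sketch assumes the header is a list of maps with the reason, queue, exchange and count fields, and it builds the headers by hand instead of receiving them from a broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: extracts a short summary from the x-death header. */
final class XDeathReader {

    private XDeathReader() {
    }

    /** Returns "reason;queue;exchange;count" for the first x-death entry, or "" if there is none. */
    @SuppressWarnings("unchecked")
    static String summarize(Map<String, Object> headers) {
        if (headers == null || !(headers.get("x-death") instanceof List)) {
            return "";
        }
        List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
        if (deaths.isEmpty()) {
            return "";
        }
        Map<String, Object> first = deaths.get(0);
        // String.valueOf is used because the Java client may deliver text values
        // as LongString objects rather than java.lang.String
        return String.valueOf(first.get("reason")) + ";"
                + String.valueOf(first.get("queue")) + ";"
                + String.valueOf(first.get("exchange")) + ";"
                + String.valueOf(first.get("count"));
    }

    public static void main(String[] unused) {
        // Hand-built headers imitating a message that expired in Consumer_1
        Map<String, Object> death = new HashMap<>();
        death.put("reason", "expired");
        death.put("queue", "Consumer_1");
        death.put("exchange", "Direct_Test");
        death.put("count", 1L);
        List<Map<String, Object>> deaths = new ArrayList<>();
        deaths.add(death);
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", deaths);
        System.out.println(summarize(headers));
    }
}
```

In a real consumer the map would come from properties.getHeaders() inside handleDelivery.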

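When can a message be moved to the DLX? RabbitMQ lists 3 possible scenarios: the message is rejected (basic.reject or basic.nack) with requeue set to false, the TTL of the message expires, or the queue length limit is exceeded.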
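Each of these scenarios is reported in the reason field of the x-death header, as rejected, expired or maxlen. The enum below is one possible way of modelling them on the consumer side. DeadLetterReason and fromHeaderValue are hypothetical names, and the sketch deliberately ignores any other reason value a particular broker version or queue type may add.

```java
import java.util.Optional;

/** Hypothetical model of the dead-lettering reasons discussed in the text. */
enum DeadLetterReason {
    /** The consumer rejected the message (basic.reject or basic.nack) with requeue=false. */
    REJECTED("rejected"),
    /** The TTL of the message elapsed before it was consumed. */
    EXPIRED("expired"),
    /** The queue length limit was exceeded. */
    MAXLEN("maxlen");

    private final String headerValue;

    DeadLetterReason(String headerValue) {
        this.headerValue = headerValue;
    }

    /** Maps the raw "reason" value of an x-death entry to a constant, if it is a known one. */
    static Optional<DeadLetterReason> fromHeaderValue(Object raw) {
        if (raw == null) {
            return Optional.empty();
        }
        // toString() also covers values that are not plain java.lang.String
        String text = raw.toString();
        for (DeadLetterReason reason : values()) {
            if (reason.headerValue.equals(text)) {
                return Optional.of(reason);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] unused) {
        System.out.println(fromHeaderValue("expired").isPresent());
        System.out.println(fromHeaderValue("something-else").isPresent());
    }
}
```

Returning an Optional instead of throwing on an unknown value keeps a consumer of the dead-letter queue working even when it meets a reason it does not know.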

Dead-letter exchange example

After this introduction we can take a look at JUnit test cases illustrating some of the presented features:

private static final String DLX_NAME = "MyDeadLetterExchange";
private static final String EXCHANGE = "Direct_Test";
private static final String CONSUMER_QUEUE_1 = "Consumer_1";
private static final String DLX_QUEUE = "Dead_Letter_Queue";
private static final String DLX_KEY = "New_Key_For_DLX_Msg";
private static final String DIRECT_EXC_KEY = "Key_1";
private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

private Connection connection;

private Channel directChannel;

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  directChannel.exchangeDeclare(DLX_NAME, ExchangeTypes.FANOUT.getName());
  directChannel.exchangeDeclare(EXCHANGE, ExchangeTypes.FANOUT.getName());

  directChannel.queueDeclare(DLX_QUEUE, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
  Collections.emptyMap());
  directChannel.queueBind(DLX_QUEUE, DLX_NAME, "");

  Map<String, Object> args = new HashMap<>();
  args.put("x-dead-letter-exchange", DLX_NAME);
  args.put("x-dead-letter-routing-key", DLX_KEY);
  directChannel.queueDeclare(CONSUMER_QUEUE_1, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(), args);
  directChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, DIRECT_EXC_KEY);
}


@Test
public void should_move_message_to_dlx_because_of_rejecting() throws IOException, InterruptedException {
  String message = "Rejected_message";
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());

  final CountDownLatch latch = new CountDownLatch(1);
  // the consumer below rejects the message without requeueing it
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new DefaultConsumer(directChannel) {

      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                  byte[] body) throws IOException {
          // Message should be rejected and go to dead letter queue
          // Note that the consumer is configured with AUTO_ACK.not(); 
          // otherwise, the message is acknowledged independently
          // of below code and the test doesn't work
          getChannel().basicNack(envelope.getDeliveryTag(), false, false);
          latch.countDown();
      }
  });
  latch.await(1, TimeUnit.SECONDS);
  // Make sure the message was not counted as consumed from the regular queue
  assertThat(STATS.get(message)).isNull();

  CountDownLatch latch2 = new CountDownLatch(1);
  directChannel.basicConsume(DLX_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch2, STATS));
  latch2.await(1, TimeUnit.SECONDS);

  assertThat(STATS.get(message)).isEqualTo(1);
}

@Test
public void should_move_message_to_dlx_because_of_expiration() throws InterruptedException, IOException {
  AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("300").build();
  String message = "Test_300_ms";
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, properties, message.getBytes());

  CountDownLatch latch = new CountDownLatch(1);
  // the message should not be delivered to this consumer because it
  // expires before the thread wakes up
  Thread.sleep(550);
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
  latch.await(1, TimeUnit.SECONDS);
  // Make sure the message was not consumed from the regular queue
  assertThat(STATS.get(message)).isNull();

  latch = new CountDownLatch(1);
  directChannel.basicConsume(DLX_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
  latch.await(1, TimeUnit.SECONDS);

  assertThat(STATS.get(message)).isEqualTo(1);
}

@Test
public void should_correctly_get_information_about_message_routed_to_dead_letter() throws IOException, InterruptedException {
  AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("300").build();
  String message = "Test_300_ms";
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, properties, message.getBytes());

  CountDownLatch latch = new CountDownLatch(1);
  // the message should not be delivered to this consumer because it
  // expires before the thread wakes up
  Thread.sleep(550);
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
  latch.await(1, TimeUnit.SECONDS);
  // Make sure the message was not consumed from the regular queue
  assertThat(STATS.get(message)).isNull();

  StringBuilder headerData = new StringBuilder();
  final CountDownLatch finalLatch = new CountDownLatch(1);
  directChannel.basicConsume(DLX_QUEUE, AUTO_ACK.yes(), new DefaultConsumer(directChannel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                  byte[] body) throws IOException {
          headerData.append(properties.getHeaders().containsKey("x-death")).append(";");
          ArrayList<HashMap<String, Object>> deadLetterData = 
                  (ArrayList<HashMap<String, Object>>) properties.getHeaders().get("x-death");
          headerData.append(deadLetterData.get(0).get("reason").toString()).append(";")
              .append(deadLetterData.get(0).get("exchange").toString()).append(";")
              .append(envelope.getRoutingKey());
          // release the waiting test thread as soon as the header has been read
          finalLatch.countDown();
      }
  });
  finalLatch.await(2, TimeUnit.SECONDS);

  // Expected elements in headerData String are:
  // containsKey("x-death")=true -> x-death is defined in the header
  // reason=expired -> the message was dead-lettered because its TTL expired
  // exchange=Direct_Test -> original exchange to which message was sent
  // routingKey=New_Key_For_DLX_Msg -> new routing key set after detection of
  //                                   message "death"
  assertThat(headerData.toString()).isEqualTo("true;expired;Direct_Test;New_Key_For_DLX_Msg");
}

This article described what can happen to rejected messages. A dead-letter exchange is a good place to store them and to apply an appropriate "death" handling strategy. The strategy can depend not only on the message itself but also on the additional information attached to it (rejection reason, original exchange, etc.).
