Messaging holds surprises, especially when things go wrong. That can happen, for example, when a message's time to live is exceeded or when there is no more space in the queue for new messages. To avoid losing such messages, RabbitMQ provides a special kind of exchange: the dead-letter exchange.
In this article we focus on the dead-letter exchange. First, we discuss the use cases to which this exchange applies. The second part contains practical test cases showing the reasons why messages are dead-lettered and the header a message carries afterwards.
Dead-letter exchange
Compared to standard exchanges, dead-letter exchanges (DLX) have nothing special. They are declared in the same manner, share the same features, and are consumed through queues bound to them. So how can we declare an exchange as a DLX? Simply by setting the x-dead-letter-exchange argument when the normal queue is declared. The value of this argument should be the name of the DLX.
Normally, a message routed to the DLX keeps its routing key. But this value is overridden if the non-DLX queue is configured with the x-dead-letter-routing-key argument, which defines the new routing key to put on messages transferred to the DLX.
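The two queue arguments described above can be sketched as a small helper. This is only an illustration: the class DeadLetterArguments and its method forQueue are hypothetical names, not part of the RabbitMQ Java client. Only the two argument keys come from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the RabbitMQ client) that builds
// the arguments passed when the normal, non-DLX queue is declared.
final class DeadLetterArguments {

    private DeadLetterArguments() {
    }

    // dlxName: name of the dead-letter exchange.
    // dlxRoutingKey: routing key to put on dead-lettered messages,
    // or null to let them keep their original routing key.
    static Map<String, Object> forQueue(String dlxName, String dlxRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", dlxName);
        if (dlxRoutingKey != null) {
            args.put("x-dead-letter-routing-key", dlxRoutingKey);
        }
        return args;
    }
}
```

The returned map would then be given as the last parameter of the client's queueDeclare call, for example channel.queueDeclare("Consumer_1", false, false, true, DeadLetterArguments.forQueue("MyDeadLetterExchange", "New_Key_For_DLX_Msg")).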
Besides the routing key, a dead-lettered message also contains a header identified by the x-death key. It holds useful information about the context of dead-lettering, among others: the reason for dead-lettering, the time at which it happened, and the original exchange.
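Reading this header can be sketched as follows. The sketch assumes that x-death arrives as a list of maps, one per dead-lettering event, and that each entry exposes reason, exchange and queue fields. The class DeathInfo is a hypothetical name used only for illustration. Values are converted with toString() because the client may deliver them as something other than java.lang.String.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical value class describing the first x-death entry of a message.
final class DeathInfo {
    final String reason;
    final String exchange;
    final String queue;

    private DeathInfo(String reason, String exchange, String queue) {
        this.reason = reason;
        this.exchange = exchange;
        this.queue = queue;
    }

    // Returns the first x-death entry, or an empty Optional when the
    // header is missing or does not have the assumed list-of-maps shape.
    static Optional<DeathInfo> firstFrom(Map<String, Object> headers) {
        if (headers == null) {
            return Optional.empty();
        }
        Object raw = headers.get("x-death");
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            return Optional.empty();
        }
        Object first = ((List<?>) raw).get(0);
        if (!(first instanceof Map)) {
            return Optional.empty();
        }
        Map<?, ?> entry = (Map<?, ?>) first;
        return Optional.of(new DeathInfo(
                text(entry.get("reason")),
                text(entry.get("exchange")),
                text(entry.get("queue"))));
    }

    private static String text(Object value) {
        return value == null ? null : value.toString();
    }
}
```

In a consumer, the headers map would come from properties.getHeaders() inside handleDelivery.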
When can a message be moved to the DLX? RabbitMQ lists 3 possible scenarios:
- message expiration; the corresponding value in the reason field is expired
- message rejection without requeueing (requeue = false with basic.reject or basic.nack); rejected in the reason field
- no more space in the queue; maxlen in the reason field
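The three reason values listed above can be mapped to an enum, which makes it easier to pick a handling strategy per scenario. This is a minimal sketch: the enum DeadLetterReason and its method are hypothetical names, and only the three string values come from the list above. Any other value yields an empty result instead of an exception.

```java
import java.util.Optional;

// Hypothetical enum covering the three reasons listed in this article.
enum DeadLetterReason {
    EXPIRED("expired"),
    REJECTED("rejected"),
    MAX_LENGTH("maxlen");

    private final String headerValue;

    DeadLetterReason(String headerValue) {
        this.headerValue = headerValue;
    }

    // Accepts any object because the header value is not guaranteed
    // to be a java.lang.String; it is compared through toString().
    static Optional<DeadLetterReason> fromHeaderValue(Object value) {
        if (value == null) {
            return Optional.empty();
        }
        String text = value.toString();
        for (DeadLetterReason reason : values()) {
            if (reason.headerValue.equals(text)) {
                return Optional.of(reason);
            }
        }
        return Optional.empty();
    }
}
```

A consumer bound to the dead-letter queue could then switch on the returned value, for instance to retry expired messages and only log rejected ones.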
Dead-letter exchange example
After this introduction we can look at JUnit test cases illustrating some of the presented features:
    // ExchangeTypes, DURABLE, EXCLUSIVE, AUTO_DELETE, AUTO_ACK and CountingConsumer
    // are not part of the RabbitMQ Java client; they are assumed to be helpers
    // of the test project and are not shown here.
    private static final String DLX_NAME = "MyDeadLetterExchange";
    private static final String EXCHANGE = "Direct_Test";
    private static final String CONSUMER_QUEUE_1 = "Consumer_1";
    private static final String DLX_QUEUE = "Dead_Letter_Queue";
    private static final String DLX_KEY = "New_Key_For_DLX_Msg";
    private static final String DIRECT_EXC_KEY = "Key_1";
    private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

    private Connection connection;

    private Channel directChannel;

    @Before
    public void initializeQueues() throws IOException, TimeoutException {
        // ...
        directChannel.exchangeDeclare(DLX_NAME, ExchangeTypes.FANOUT.getName());
        directChannel.exchangeDeclare(EXCHANGE, ExchangeTypes.FANOUT.getName());
        directChannel.queueDeclare(DLX_QUEUE, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
            Collections.emptyMap());
        directChannel.queueBind(DLX_QUEUE, DLX_NAME, "");

        // The normal queue points to the DLX and overrides the routing key
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DLX_NAME);
        args.put("x-dead-letter-routing-key", DLX_KEY);
        directChannel.queueDeclare(CONSUMER_QUEUE_1, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(), args);
        directChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, DIRECT_EXC_KEY);
    }

    @Test
    public void should_move_message_to_dlx_because_of_rejecting() throws IOException, InterruptedException {
        String message = "Rejected_message";
        directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
        final CountDownLatch latch = new CountDownLatch(1);
        // The consumer receives the message but rejects it without requeueing
        directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new DefaultConsumer(directChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // Message should be rejected and go to the dead-letter queue.
                // Note that the consumer is configured with AUTO_ACK.not();
                // otherwise, the message is acknowledged independently
                // of the code below and the test doesn't work
                getChannel().basicNack(envelope.getDeliveryTag(), false, false);
                latch.countDown();
            }
        });
        latch.await(1, TimeUnit.SECONDS);
        // Make sure the message was not counted as consumed from the normal queue
        assertThat(STATS.get(message)).isNull();

        CountDownLatch latch2 = new CountDownLatch(1);
        directChannel.basicConsume(DLX_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch2, STATS));
        latch2.await(1, TimeUnit.SECONDS);
        assertThat(STATS.get(message)).isEqualTo(1);
    }

    @Test
    public void should_move_message_to_dlx_because_of_expiration() throws InterruptedException, IOException {
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("300").build();
        String message = "Test_300_ms";
        directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, properties, message.getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        // The message should not be delivered because it
        // expires before the thread wakes up
        Thread.sleep(550);
        directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
        latch.await(1, TimeUnit.SECONDS);
        // Make sure the expired message was not consumed from the normal queue
        assertThat(STATS.get(message)).isNull();

        latch = new CountDownLatch(1);
        directChannel.basicConsume(DLX_QUEUE, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
        latch.await(1, TimeUnit.SECONDS);
        assertThat(STATS.get(message)).isEqualTo(1);
    }

    @Test
    public void should_correctly_get_information_about_message_routed_to_dead_letter()
            throws IOException, InterruptedException {
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("300").build();
        String message = "Test_300_ms";
        directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, properties, message.getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        // The message should not be delivered because it
        // expires before the thread wakes up
        Thread.sleep(550);
        directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), new CountingConsumer(directChannel, latch, STATS));
        latch.await(1, TimeUnit.SECONDS);
        // Make sure the expired message was not consumed from the normal queue
        assertThat(STATS.get(message)).isNull();

        StringBuilder headerData = new StringBuilder();
        final CountDownLatch finalLatch = new CountDownLatch(1);
        directChannel.basicConsume(DLX_QUEUE, AUTO_ACK.yes(), new DefaultConsumer(directChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                headerData.append(properties.getHeaders().containsKey("x-death")).append(";");
                ArrayList<HashMap<String, Object>> deadLetterData =
                    (ArrayList<HashMap<String, Object>>) properties.getHeaders().get("x-death");
                headerData.append(deadLetterData.get(0).get("reason").toString()).append(";")
                    .append(deadLetterData.get(0).get("exchange").toString()).append(";")
                    .append(envelope.getRoutingKey());
                // Release the waiting test thread as soon as the header was read
                finalLatch.countDown();
            }
        });
        finalLatch.await(2, TimeUnit.SECONDS);
        // Expected elements in the headerData String are:
        // containsKey("x-death")=true -> x-death is defined in the header
        // reason=expired -> the message "death" was caused by the expired TTL
        // exchange=Direct_Test -> original exchange to which the message was sent
        // routingKey=New_Key_For_DLX_Msg -> new routing key set after detection of
        // the message "death"
        assertThat(headerData.toString()).isEqualTo("true;expired;Direct_Test;New_Key_For_DLX_Msg");
    }
This article described what can happen to rejected messages. A dead-letter exchange is a good place to store them and to apply an appropriate strategy for dealing with "dead" messages. The strategy can depend not only on the message itself but also on the additional information attached to the dead-lettered message (reason, original exchange, etc.).