Exchanges in RabbitMQ

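Exchanges are a key element of message transmission in RabbitMQ. Every message is first routed to an exchange and only then dispatched to the appropriate queues. A queue receives messages only through a binding to an exchange; even a queue declared without an explicit binding is automatically bound to the default (nameless) exchange under its own name.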

In this article we focus on 4 exchange types. Each one is described in a separate part and illustrated with a JUnit case.

Direct exchange

The simplest exchange is called direct. Routing relies on a strictly defined routing key: a message is delivered only to the queues bound to the exchange with a key equal to the message's routing key. As a real-world analogy, a direct exchange resembles an e-mail addressed to one specific recipient.
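The exact-match rule can be modeled without a broker. The sketch below is plain Java, not RabbitMQ client code: DirectRoutingSketch, its methods and the queue and key names are hypothetical, and the model only mimics how an exchange picks target queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct routing; not part of the RabbitMQ client.
class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns every queue whose binding key equals the routing key exactly.
    // Queues bound with any other key are not selected.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.emptyList()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queue_1", "orders");
        exchange.bind("queue_1_bis", "orders");
        exchange.bind("queue_2", "invoices");
        System.out.println(exchange.route("orders"));
        System.out.println(exchange.route("unknown"));
    }
}
```

In this model each selected queue gets one copy of the message. How many consumers listen on a queue plays no role in the routing decision.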

Below, you can find some test cases showing a very basic implementation of direct exchange:

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

@Test
public void should_consume_published_message_only_once() throws IOException, InterruptedException {
    String message = "Test_1";
    directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());

    CountDownLatch latch = new CountDownLatch(3);

    // declare consumers - note that we declare two consumers on one queue
    // It shouldn't cause the message to be received twice.
    directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
      new CountingConsumer(directChannel, latch, STATS));
    directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
      new CountingConsumer(directChannel, latch, STATS));
    directChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(), 
      new CountingConsumer(directChannel, latch, STATS));

    latch.await(2, TimeUnit.SECONDS);

    // Even if there are 2 consumers defined for Queue#1, 
    // the message is received only once
    assertThat(STATS.get(message).intValue()).isEqualTo(1);
}

@Test
public void should_consume_published_message_twice() throws IOException, InterruptedException {
    String message = "Test_2";
    directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());

    CountDownLatch latch = new CountDownLatch(3);

    // declare consumers
    directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
      new CountingConsumer(directChannel, latch, STATS));
    directChannel.basicConsume(CONSUMER_QUEUE_1_BIS, AUTO_ACK.yes(), 
      new CountingConsumer(directChannel, latch, STATS));
    directChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(), 
      new CountingConsumer(directChannel, latch, STATS));

    latch.await(2, TimeUnit.SECONDS);

    // Two valid queues are bound to direct channel's exchange 
    // thanks to DIRECT_EXC_KEY
    assertThat(STATS.get(message).intValue()).isEqualTo(2);
}

Fanout exchange

Another, even simpler, kind of exchange is the fanout type. It completely ignores the concept of routing keys and dispatches every received message to all bound queues. So, if we bind 3 queues to a fanout exchange, all 3 queues will receive each published message. In the real world, this logic corresponds to a mailing list: a user (one queue) subscribes to one mailing list (RabbitMQ's exchange) and receives notifications about all new messages posted to the list.
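As a broker-free illustration of that rule, here is a minimal plain-Java model. FanoutRoutingSketch and the names it uses are hypothetical, not part of the RabbitMQ client; the binding key is stored only to show that it is never consulted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of fanout routing; not part of the RabbitMQ client.
class FanoutRoutingSketch {
    // queue name -> binding key (kept only to show that routing never reads it)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // The routing key parameter is deliberately unused: every bound queue is selected.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.keySet());
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        exchange.bind("queue_1", "fanout_key");
        exchange.bind("queue_2", "not_fanout_key");
        System.out.println(exchange.route("fanout_key"));
        System.out.println(exchange.route("anything_else"));
    }
}
```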

Nothing shows this better than a simple JUnit case:

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  fanoutChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, FANOUT_KEY);
  fanoutChannel.queueBind(CONSUMER_QUEUE_2, EXCHANGE, NOT_FANOUT_KEY);
}

@Test
public void should_consume_published_message_twice() throws IOException, InterruptedException {
    String message = "Test_Fanout_1";
    fanoutChannel.basicPublish(EXCHANGE, FANOUT_KEY, null, message.getBytes());

    CountDownLatch latch = new CountDownLatch(2);

    // declare consumers - note that we publish a message with FANOUT_KEY
    // and that CONSUMER_QUEUE_2 is bound with NOT_FANOUT_KEY
    // If it was a direct exchange, this queue shouldn't receive the message
    fanoutChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
      new CountingConsumer(fanoutChannel, latch, STATS));
    fanoutChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(), 
      new CountingConsumer(fanoutChannel, latch, STATS));

    latch.await(2, TimeUnit.SECONDS);

    // Because fanout exchange doesn't take into account any additional criteria, 
    // two subscribed consumers receive the message
    assertThat(STATS.get(message).intValue()).isEqualTo(2);
}

Topic exchange

Until now we have worked with fairly strict exchanges. RabbitMQ also offers a more flexible one, the topic exchange. This type of exchange takes the routing key into account when a message is published. However, instead of requiring an exact match, it applies wildcard patterns that loosely resemble regular expressions.

Flexibility comes at a cost, though. A topic exchange imposes a strict format on routing keys: a key must be composed of words delimited by dots, for example "this.is.my.key", and it is limited to 255 bytes. The matching rules are: * (star) substitutes exactly one word and # (hash) substitutes zero or more words.
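To make the matching concrete, here is a small plain-Java sketch of a pattern matcher. It assumes the usual topic wildcards (* for exactly one word, # for zero or more words). TopicMatcherSketch is a hypothetical helper written for this article, not the broker's actual implementation, and it ignores corner cases such as empty words.

```java
// Hypothetical matcher illustrating topic wildcards; not RabbitMQ's own algorithm.
class TopicMatcherSketch {

    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.split("\\.");
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, key, 0);
    }

    // Compares pattern words from index p with key words from index k.
    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // The pattern is exhausted: the key must be exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return matches(pattern, p + 1, key, k)
                || (k < key.length && matches(pattern, p, key, k + 1));
        }
        if (k == key.length) {
            // A literal word or '*' needs a key word, and none is left.
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("users.*.*", "users.and.others"));
        System.out.println(matches("users.#", "users.and.others.living.here"));
        System.out.println(matches("users.*.*", "users.99"));
    }
}
```

With this sketch, "users.*.*" accepts "users.and.others" but rejects both "users.99" and "users.and.others.living.here", while "users.#" accepts all three and rejects "me.and.users.living.here".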

Below you can find some examples of topic exchange:

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  topicChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, "users.*.*");
  topicChannel.queueBind(CONSUMER_QUEUE_2, EXCHANGE, "users.#");
}

@Test
public void should_deliver_message_to_2_bound_queues() throws IOException, InterruptedException {
  String message = "Topic_Test_1";
  topicChannel.basicPublish(EXCHANGE, "users.and.others", null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(2);

  // declare consumers
  topicChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));
  topicChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // Both patterns match
  assertThat(STATS.get(message).intValue()).isEqualTo(2);
}

@Test
public void should_deliver_message_to_only_1_queue_because_of_mapping_difference() throws InterruptedException, IOException {
  String message = "Topic_Test_2";
  topicChannel.basicPublish(EXCHANGE, "users.and.others.living.here", null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(2);

  // declare consumers
  topicChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));
  topicChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // The message will be delivered to only one consumer because only "users.#" pattern matches
  // The "users.*.*" pattern would match if the routing key was "users.and.others"
  assertThat(STATS.get(message).intValue()).isEqualTo(1);
}

@Test
public void should_not_deliver_the_message_because_there_are_no_matching_key() throws IOException, InterruptedException {
  String message = "Topic_Test_3";
  topicChannel.basicPublish(EXCHANGE, "me.and.users.living.here", null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(2);

  // declare consumers
  // Neither CONSUMER_QUEUE_1 nor CONSUMER_QUEUE_2 has a binding key
  // matching "me.and.users.living.here" - "users" appears in the middle
  // instead of at the beginning
  topicChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));
  topicChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // The message won't be delivered because no queue is bound with an (anyWords).users.(anyWords) pattern
  assertThat(STATS.containsKey(message)).isFalse();
}

@Test
public void should_not_deliver_the_message_if_message_key_is_shorter_than_routing_key() throws IOException, InterruptedException {
  String message = "Topic_Test_4";
  topicChannel.basicPublish(EXCHANGE, "users.99", null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(1);

  // CONSUMER_QUEUE_2 should match because of use of # substitution
  // but CONSUMER_QUEUE_1 won't match because it expects
  // to have a key users.(anyWord).(anyWord)
  topicChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
    new CountingConsumer(topicChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  assertThat(STATS.containsKey(message)).isFalse();
}

@Test(expected = IllegalArgumentException.class)
public void should_fail_on_sending_message_with_too_long_routing_key() throws IOException {
  String message = "Test_bad_routing_key";
  String routingKey = "1";
  for (int i = 2; i < 300; i++) {
      routingKey += "."+i;
  }
  topicChannel.basicPublish(EXCHANGE, routingKey, null, message.getBytes());
}
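
The key built in the last test is far beyond the limit. As a rough pre-flight check, the sketch below measures a key's UTF-8 length before publishing. RoutingKeyLengthSketch and its methods are hypothetical helpers, and the 255-byte figure is the one quoted earlier in this article.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; it only measures keys and does not talk to RabbitMQ.
class RoutingKeyLengthSketch {
    // Limit quoted in this article for routing keys.
    static final int MAX_ROUTING_KEY_BYTES = 255;

    // The limit is expressed in bytes, so the encoded length is what counts.
    static int byteLength(String routingKey) {
        return routingKey.getBytes(StandardCharsets.UTF_8).length;
    }

    static boolean fitsLimit(String routingKey) {
        return byteLength(routingKey) <= MAX_ROUTING_KEY_BYTES;
    }

    // Rebuilds the key from the test above: "1.2.3" and so on up to lastWord.
    static String dottedSequence(int lastWord) {
        StringBuilder key = new StringBuilder("1");
        for (int i = 2; i <= lastWord; i++) {
            key.append('.').append(i);
        }
        return key.toString();
    }

    public static void main(String[] args) {
        String key = dottedSequence(299);
        System.out.println(byteLength(key) + " bytes, fits: " + fitsLimit(key));
    }
}
```

dottedSequence(299) produces the same key as the loop in the test. It is 1087 bytes long, so fitsLimit returns false for it.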

Headers exchange

The last type of exchange is based on message metadata passed in its headers, hence the name headers exchange. The configuration specifies whether only one or all of the headers must match before a message is delivered to a queue. This is set when the queue is bound to the exchange, through a binding argument called x-match. If it equals "any", at least one header value must match. If it equals "all", all header values must match.
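The any/all rule can be sketched in plain Java as follows. HeadersMatcherSketch is a hypothetical helper, not the broker's implementation. It is a simplification under two assumptions: binding arguments whose names start with "x-" (such as x-match itself) are not compared, and a missing x-match is treated as "all".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical matcher illustrating x-match=any/all; not RabbitMQ's own code.
class HeadersMatcherSketch {

    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        // Assumption: anything other than "any" behaves like "all".
        boolean requireAll = !"any".equals(bindingArgs.get("x-match"));
        int compared = 0;
        int matched = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // control arguments such as x-match are not compared
            }
            compared++;
            if (messageHeaders.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()))) {
                matched++;
            }
        }
        return requireAll ? matched == compared : matched > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("handle-status", "now");
        binding.put("message-author", "rich client");

        Map<String, Object> headers = new HashMap<>();
        headers.put("handle-status", "now");
        headers.put("message-author", "future client");

        System.out.println(matches(binding, headers));
    }
}
```

Note that the routing key does not appear anywhere in the matching function: only the binding arguments and the message headers take part in the decision.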

At the same time, the routing key is completely ignored, as the following test cases show:

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  Map<String, Object> urgentArgs = new HashMap<>();
  urgentArgs.put("x-match", "any");
  urgentArgs.put("handle-status", HANDLE_NOW);
  urgentArgs.put("message-author", "none");
  headersChannel.queueDeclare(CONSUMER_QUEUE_1, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
          Collections.emptyMap());
  // Note: queues are bound with specific mapping
  headersChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, ALL_ROUTINGS, urgentArgs);

  Map<String, Object> tomorrowArgs = new HashMap<>();
  tomorrowArgs.put("x-match", "any");
  tomorrowArgs.put("handle-status", HANDLE_TOMORROW);
  tomorrowArgs.put("message-author", RICH_CLIENT);
  headersChannel.queueDeclare(CONSUMER_QUEUE_2, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
          Collections.emptyMap());
  headersChannel.queueBind(CONSUMER_QUEUE_2, EXCHANGE, ALL_ROUTINGS, tomorrowArgs);

  Map<String, Object> strictAndMatching = new HashMap<>();
  // Difference between all and any? all - all listed headers must match,
  // any - at least one of the listed headers must match
  strictAndMatching.put("x-match", "all");
  strictAndMatching.put("handle-status", HANDLE_NOW);
  strictAndMatching.put("message-author", RICH_CLIENT);
  headersChannel.queueDeclare(CONSUMER_QUEUE_3, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
          Collections.emptyMap());
  headersChannel.queueBind(CONSUMER_QUEUE_3, EXCHANGE, ALL_ROUTINGS, strictAndMatching);
}

@Test
public void should_not_deliver_message_to_strict_queue_because_1_header_is_not_matching() throws IOException, InterruptedException {
  String message = "Test_Headers_1";
  Map<String, Object> msgHeaders = new HashMap<>();
  msgHeaders.put("handle-status", HANDLE_NOW);
  msgHeaders.put("message-author", "future client");
  AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(msgHeaders).build();
  headersChannel.basicPublish(EXCHANGE, ALL_ROUTINGS, properties, message.getBytes());

  CountDownLatch latch = new CountDownLatch(1);

  // declare consumer 
  headersChannel.basicConsume(CONSUMER_QUEUE_3, AUTO_ACK.yes(),  
    new CountingConsumer(headersChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // CONSUMER_QUEUE_3 is bound with x-match=all and the "message-author" header
  // differs from the bound value, so the message is not delivered to it
  assertThat(STATS.containsKey(message)).isFalse();
}

@Test
public void should_deliver_message_to_strict_consumer_where_all_headers_are_matching() throws IOException, InterruptedException {
  String message = "Test_Headers_2";
  Map<String, Object> msgHeaders = new HashMap<>();
  msgHeaders.put("handle-status", HANDLE_NOW);
  msgHeaders.put("message-author", RICH_CLIENT);
  AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(msgHeaders).build();

  // Message is published with "x" routing key
  // Note that there are no queues bound with this key
  headersChannel.basicPublish(EXCHANGE, "x", properties, message.getBytes());

  CountDownLatch latch = new CountDownLatch(3);

  // declare consumers
  headersChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(),  
    new CountingConsumer(headersChannel, latch, STATS));
  headersChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(),  
    new CountingConsumer(headersChannel, latch, STATS));
  headersChannel.basicConsume(CONSUMER_QUEUE_3, AUTO_ACK.yes(),  
    new CountingConsumer(headersChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // All 3 consumers receive the message because
  // - 2 of the queues are bound with x-match=any and 1 with x-match=all
  // - at least 1 header matches for the "any" queues and all headers match for the "all" queue
  assertThat(STATS.get(message).intValue()).isEqualTo(3);
}

@Test
public void should_deliver_message_where_only_one_header_is_matching() throws IOException, InterruptedException {
  String message = "Test_Headers_3";
  Map<String, Object> msgHeaders = new HashMap<>();
  msgHeaders.put("handle-status", HANDLE_TOMORROW);
  AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(msgHeaders).build();
  headersChannel.basicPublish(EXCHANGE, ALL_ROUTINGS, properties, message.getBytes());

  CountDownLatch latch = new CountDownLatch(3);

  // declare consumers
  headersChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), 
    new CountingConsumer(headersChannel, latch, STATS));
  headersChannel.basicConsume(CONSUMER_QUEUE_2, AUTO_ACK.yes(),  
    new CountingConsumer(headersChannel, latch, STATS));
  headersChannel.basicConsume(CONSUMER_QUEUE_3, AUTO_ACK.yes(),  
    new CountingConsumer(headersChannel, latch, STATS));

  latch.await(2, TimeUnit.SECONDS);

  // Only queue#2 should receive the message
  assertThat(STATS.get(message).intValue()).isEqualTo(1);
}

This article, dominated by JUnit cases, illustrates how to work with RabbitMQ exchanges through the Java API. The first part covers the direct exchange, based on a strictly defined routing key. The second part presents an even more basic kind of exchange, fanout, which dispatches the message to all bound queues regardless of the routing key. The next type is the topic exchange, which allows RegEx-like binding patterns. Finally, we saw the headers exchange, in which target queues are selected by the values defined in message headers.
