Priorities in RabbitMQ

Not all messages are equal. Some matter more than others and should be handled first. RabbitMQ provides features to deal with such priorities.

The first part of the article covers priorities set on messages. The second part describes priorities defined for consumers. Both are illustrated with JUnit test cases demonstrating their features.

Message priorities

Each message can be prioritized through a property called, unsurprisingly, priority. A message with a higher priority (a bigger number) is delivered first. If no priority is set, the default value of 0 is used.

In addition, message priorities are tightly coupled to queue priorities. Each queue can define its own maximum priority level through the x-max-priority argument, which specifies the highest priority level the queue supports.
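As a minimal sketch of how this attribute is usually prepared, the queue arguments are a plain map passed as the last parameter of queueDeclare. The helper below is hypothetical (PriorityQueueArgs and withMaxPriority are names invented for illustration, not part of the RabbitMQ client), and the 1 to 255 range check is an assumption based on the limit documented for x-max-priority, so verify it against your broker version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RabbitMQ client: it only builds the
// argument map that would be handed to queueDeclare(...).
final class PriorityQueueArgs {

  private PriorityQueueArgs() {}

  static Map<String, Object> withMaxPriority(int maxPriority) {
    // Assumption: the broker accepts values from 1 to 255.
    if (maxPriority < 1 || maxPriority > 255) {
      throw new IllegalArgumentException(
        "x-max-priority must be between 1 and 255, got " + maxPriority);
    }
    Map<String, Object> queueArguments = new HashMap<>();
    queueArguments.put("x-max-priority", maxPriority);
    return queueArguments;
  }

  public static void main(String[] argv) {
    // The resulting map plays the same role as params1 in the test setup.
    System.out.println(withMaxPriority(5));
  }
}
```

Validating the value on the client side is only a convenience: it fails fast in the application instead of waiting for the broker to refuse the declaration.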

But what happens if we publish a message with a priority higher than the supported one? RabbitMQ lowers the sent value to the maximum supported level. For example, if a queue doesn't support priorities higher than 5 and we push a message with priority 6, the priority is treated as the maximum supported level (5). If we then push a message with priority 9 to the same queue, this value is also capped at 5. When several messages have been capped this way, they are received in publication order: older messages come first.

The following test cases illustrate message priorities in RabbitMQ:
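The capping rule can be sketched with a small in-memory model. It does not talk to a broker and says nothing about how RabbitMQ is implemented internally; CappedPriorityModel and its methods are hypothetical names used only to make the ordering rule concrete: the effective priority is the smaller of the requested priority and the queue maximum, and ties are broken by publication order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// In-memory model of the ordering rule only; not RabbitMQ's implementation.
final class CappedPriorityModel {

  private static final class Entry {
    final String body;
    final int effectivePriority;
    final long sequence; // publication order, used to break ties

    Entry(String body, int effectivePriority, long sequence) {
      this.body = body;
      this.effectivePriority = effectivePriority;
      this.sequence = sequence;
    }
  }

  private final int maxPriority;
  private long nextSequence = 0;

  // Highest effective priority first; among equals, the oldest message first.
  private final PriorityQueue<Entry> entries = new PriorityQueue<>((a, b) -> {
    int byPriority = Integer.compare(b.effectivePriority, a.effectivePriority);
    return byPriority != 0 ? byPriority : Long.compare(a.sequence, b.sequence);
  });

  CappedPriorityModel(int maxPriority) {
    this.maxPriority = maxPriority;
  }

  // Priorities above the queue maximum are treated as the maximum.
  int effectivePriority(int requested) {
    return Math.min(requested, maxPriority);
  }

  void publish(String body, int priority) {
    entries.add(new Entry(body, effectivePriority(priority), nextSequence++));
  }

  List<String> drain() {
    List<String> bodies = new ArrayList<>();
    while (!entries.isEmpty()) {
      bodies.add(entries.poll().body);
    }
    return bodies;
  }

  public static void main(String[] argv) {
    CappedPriorityModel queue = new CappedPriorityModel(5);
    queue.publish("Priority_3", 3);
    queue.publish("Priority_6", 6);
    queue.publish("Priority_1", 1);
    queue.publish("Priority_9", 9);
    // prints [Priority_6, Priority_9, Priority_3, Priority_1]
    System.out.println(queue.drain());
  }
}
```

Priority_6 and Priority_9 both end up with the effective priority 5, so the model returns them in the order they were published, ahead of the messages with priorities 3 and 1.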

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  Map<String, Object> params1 = new HashMap<>();
  params1.put("x-max-priority", 5);
  prioritizedChannel.queueDeclare(CONSUMER_QUEUE_MAX_5, DURABLE.not(), 
    EXCLUSIVE.not(), AUTO_DELETE.yes(), params1);
  prioritizedChannel.queueBind(CONSUMER_QUEUE_MAX_5, EXCHANGE, "");

  prioritizedChannel.queueDeclare(CONSUMER_QUEUE_MAX_10, DURABLE.not(), 
    EXCLUSIVE.not(), AUTO_DELETE.not(), null);
  prioritizedChannel.queueBind(CONSUMER_QUEUE_MAX_10, EXCHANGE, "");
}

@Test
public void should_correctly_order_messages_respecting_max_priority() throws IOException {
  String messageContent1 = "Priority_1";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(1), messageContent1.getBytes());
  String messageContent3 = "Priority_3";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(3), messageContent3.getBytes());
  String messageContent2 = "Priority_2";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(2), messageContent2.getBytes());

  String receivedMsg1 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg2 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg3 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());

  // Expected order: "Priority_3", "Priority_2", "Priority_1"
  assertThat(receivedMsg1).isEqualTo(messageContent3);
  assertThat(receivedMsg2).isEqualTo(messageContent2);
  assertThat(receivedMsg3).isEqualTo(messageContent1);
}

@Test
public void should_correctly_order_messages_with_higher_priority_than_max_priority() throws IOException {
  String messageContent3 = "Priority_3";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(3), messageContent3.getBytes());
  // 6 is higher than max allowed priority (5); it's re-prioritized down to the max allowed priority value
  String messageContent6 = "Priority_6";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(6), messageContent6.getBytes());
  String messageContent1 = "Priority_1";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(1), messageContent1.getBytes());
  String messageContent7 = "Priority_7";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(7), messageContent7.getBytes());
  String messageContent9 = "Priority_9";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(9), messageContent9.getBytes());
  String messageContent16 = "Priority_16";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(16), messageContent16.getBytes());
  String messageContent17 = "Priority_17";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(17), messageContent17.getBytes());
  String messageContent21 = "Priority_21";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(21), messageContent21.getBytes());
  String messageContent20 = "Priority_20";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(20), messageContent20.getBytes());
  String messageContent22 = "Priority_22";
  prioritizedChannel.basicPublish(EXCHANGE, "", basicProps(22), messageContent22.getBytes());

  String receivedMsg1 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg2 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg3 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg4 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg5 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg6 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg7 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg8 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg9 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());
  String receivedMsg10 = 
    new String(prioritizedChannel.basicGet(CONSUMER_QUEUE_MAX_5, true).getBody());

  // As you can see, we pushed a lot of messages with a priority higher
  // than the queue's max priority (5).
  // In this case, they are received in publication order:
  // older messages are received first.
  assertThat(receivedMsg1).isEqualTo(messageContent6);
  assertThat(receivedMsg2).isEqualTo(messageContent7);
  assertThat(receivedMsg3).isEqualTo(messageContent9);
  assertThat(receivedMsg4).isEqualTo(messageContent16);
  assertThat(receivedMsg5).isEqualTo(messageContent17);
  assertThat(receivedMsg6).isEqualTo(messageContent21);
  assertThat(receivedMsg7).isEqualTo(messageContent20);
  assertThat(receivedMsg8).isEqualTo(messageContent22);
  assertThat(receivedMsg9).isEqualTo(messageContent3);
  assertThat(receivedMsg10).isEqualTo(messageContent1);
}

private static AMQP.BasicProperties basicProps(int priority) {
  return new AMQP.BasicProperties
    .Builder()
    .priority(priority)
    .build();
}

Consumer priorities

Another aspect of priorities in RabbitMQ concerns consumers. As with messages, consumers can have their own priority level. It is defined through the x-priority argument. If it is missing, the default value of 0 is used.

Consumer priorities are useful for controlling how messages are dispatched. When a message arrives, RabbitMQ first looks for available consumers with a higher priority. Only if no such consumer is available (for instance because it is blocked) is the message forwarded to lower-priority consumers in the usual way.

Nothing shows consumer priorities better than a few short test cases:
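The dispatching rule can be illustrated with a simplified in-memory model. It is only a sketch under stated assumptions: ConsumerPriorityModel and its members are hypothetical names, a consumer counts as blocked once its number of unacknowledged messages reaches its prefetch limit, and ties between consumers of equal priority go to the one registered first, which is a simplification of this model and not a statement about the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// In-memory model of consumer priority dispatching; not RabbitMQ's implementation.
final class ConsumerPriorityModel {

  static final class Consumer {
    final String name;
    final int priority; // plays the role of x-priority, 0 when not set
    final int prefetch; // unacknowledged messages allowed before it counts as blocked
    int unacked = 0;

    Consumer(String name, int priority, int prefetch) {
      this.name = name;
      this.priority = priority;
      this.prefetch = prefetch;
    }

    boolean isBlocked() {
      return unacked >= prefetch;
    }
  }

  private final List<Consumer> consumers = new ArrayList<>();

  void register(Consumer consumer) {
    consumers.add(consumer);
  }

  // Picks the non-blocked consumer with the highest priority and records one
  // more unacknowledged message for it; empty when every consumer is blocked.
  Optional<Consumer> dispatch() {
    Consumer chosen = null;
    for (Consumer candidate : consumers) {
      if (candidate.isBlocked()) {
        continue;
      }
      if (chosen == null || candidate.priority > chosen.priority) {
        chosen = candidate;
      }
    }
    if (chosen != null) {
      chosen.unacked++;
    }
    return Optional.ofNullable(chosen);
  }

  public static void main(String[] argv) {
    ConsumerPriorityModel model = new ConsumerPriorityModel();
    Consumer low = new Consumer("low", 5, 1);
    Consumer high = new Consumer("high", 10, 1);
    model.register(low);
    model.register(high);

    System.out.println(model.dispatch().map(c -> c.name).orElse("none")); // high
    System.out.println(model.dispatch().map(c -> c.name).orElse("none")); // low, because high is blocked
    high.unacked--; // high acknowledges its message
    System.out.println(model.dispatch().map(c -> c.name).orElse("none")); // high
  }
}
```

The second dispatch goes to the lower-priority consumer only because the higher-priority one still holds an unacknowledged message; as soon as it acknowledges, it is preferred again.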

@Test
public void should_dispatch_messages_always_to_consumer_with_the_highest_priority() throws IOException, InterruptedException {
  CountDownLatch latch = new CountDownLatch(3);
  Map<String, Object> args5 = new HashMap<>();
  // switch to 15 to see that the associated consumer then handles all messages
  args5.put("x-priority", 5);
  Map<String, Object> args10 = new HashMap<>();
  args10.put("x-priority", 10);

  Map<String, Integer> stats5 = new HashMap<>();
  Map<String, Integer> stats10 = new HashMap<>();

  prioritizedChannel.basicConsume(CONSUMER_QUEUE_MAX_10, AUTO_ACK.yes(), 
    args5, new CountingConsumer(prioritizedChannel, latch, stats5));
  prioritizedChannel.basicConsume(CONSUMER_QUEUE_MAX_10, AUTO_ACK.yes(),
     args10, new CountingConsumer(prioritizedChannel, latch, stats10));

  prioritizedChannel.basicPublish(EXCHANGE, "", null, "Test_1".getBytes());
  prioritizedChannel.basicPublish(EXCHANGE, "", null, "Test_2".getBytes());
  prioritizedChannel.basicPublish(EXCHANGE, "", null, "Test_3".getBytes());

  latch.await(4, TimeUnit.SECONDS);

  assertThat(stats5).hasSize(0);
  assertThat(stats10).hasSize(3);
}

@Test
public void should_allow_consumer_with_lower_priority_to_handle_messages_when_consumer_having_the_highest_priority_is_blocking() throws IOException, InterruptedException {
  CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE);
  Map<String, Object> args5 = new HashMap<>();
  args5.put("x-priority", 5);
  Map<String, Object> args10 = new HashMap<>();
  args10.put("x-priority", 10);

  Map<String, Integer> stats5 = new HashMap<>();
  Map<String, Integer> stats10 = new HashMap<>();

  /**
    * "Consumer priorities allow you to ensure that high priority 
    * consumers receive messages while they are active, with messages 
    * only going to lower priority consumers when the high priority 
    * consumers block."
    * Source: https://www.rabbitmq.com/consumer-priority.html
    *
    * So, the expected scenario is:
    * 1) There are 2 consumers: the first with a priority of 5, the second
    *    with a priority of 10.
    * 2) Because only 1 unacknowledged message is allowed, RabbitMQ
    *    should consider the consumer that does not acknowledge its
    *    message as "blocked".
    * 3) The two remaining published messages, "Test_2" and "Test_3",
    *    should be delivered to the consumer with priority 5, which
    *    acknowledges messages manually after receiving them.
    */
  PotentiallyBlockingConsumer blockingConsumer = 
    new PotentiallyBlockingConsumer(prioritizedChannel, latch, stats10, true);
  PotentiallyBlockingConsumer notBlockingConsumer = 
    new PotentiallyBlockingConsumer(prioritizedChannel, latch, stats5, false);

  // basicQos(1) - at most 1 unacknowledged message may be outstanding per consumer on this channel
  prioritizedChannel.basicQos(1);
  prioritizedChannel.basicConsume(CONSUMER_QUEUE_MAX_10, AUTO_ACK.not(), 
    args5, notBlockingConsumer);
  prioritizedChannel.basicConsume(CONSUMER_QUEUE_MAX_10, AUTO_ACK.not(),
    args10, blockingConsumer);

  prioritizedChannel.basicPublish(EXCHANGE, "", null, "Test_1".getBytes());
  prioritizedChannel.basicPublish(EXCHANGE, "", null, "Test_2".getBytes());
  prioritizedChannel.basicPublish(EXCHANGE, "", null, "Test_3".getBytes());

  latch.await(3, TimeUnit.SECONDS);

  assertThat(stats5).hasSize(2);
  assertThat(stats5.values()).containsOnly(1);
  assertThat(stats5.keySet()).containsOnly("Test_2", "Test_3");
  assertThat(stats10).hasSize(0);
  assertThat(blockingConsumer.getBlockingMessages()).hasSize(1);
  assertThat(blockingConsumer.getBlockingMessages()).containsOnly("Test_1");
}

private static class PotentiallyBlockingConsumer extends DefaultConsumer {

  private final Map<String, Integer> counter;
  private final CountDownLatch latch;
  private final boolean isBlocking;
  private Set<String> blockingMessages = new HashSet<>();

  public PotentiallyBlockingConsumer(Channel channel, 
        CountDownLatch latch, Map<String, Integer> counter, boolean isBlocking) {
    super(channel);
    this.counter = counter;
    this.isBlocking = isBlocking;
    this.latch = latch;
  }

  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                             byte[] body) throws IOException {
    if (isBlocking) {
      blockingMessages.add(new String(body, "UTF-8"));
      // The message is rejected, but because of the "true" (requeue) flag RabbitMQ tries to redeliver it,
      // in this case to the same consumer that rejected it
      getChannel().basicReject(envelope.getDeliveryTag(), true);
    } else {
      String message = new String(body, "UTF-8");
      Integer times = counter.get(message);
      if (times == null ) {
        times = 0;
      }
      counter.put(message, times+1);
      getChannel().basicAck(envelope.getDeliveryTag(), true);
    }
    latch.countDown();
  }

  private Set<String> getBlockingMessages() {
    return blockingMessages;
  }
}

This article shows how two different types of priorities can be applied. First, priorities on messages define the order in which messages are received. Second, priorities can be applied to consumers too. In that case, consumers with higher priorities are supposed to handle messages before consumers with lower priorities.
