Prefetch in RabbitMQ

Sometimes consumers can be overloaded by the number of messages to handle. RabbitMQ helps to control this number with the parameter called prefetch.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I'm currently writing one on that topic and the first chapters are already available in πŸ‘‰ Early Release on the O'Reilly platform

I also help solve your data engineering problems πŸ‘‰ contact@waitingforcode.com πŸ“©

This article focuses on prefetch aspect. The first part describes its general idea. The second part shows how to manipulate this prefetch parameter from Java API.

RabbitMQ and prefetch

When prefetch parameter (qos more precisely) is not defined, the quantity of messages consumed by channel consumers is limited by consumer's RAM. As long as RabbitMQ receives new messages, it dispatches them to consumers. If consumers don't have any time to work on messages, the messages remain unacknowledged.

The solution for consumers growing work rate consists on configure qos prefetch property. By doing so, we specify how many messages can remain unacknowledged by given consumer. Suppose that we set qos value to 3. It means that consumer will store only 3 messages at once. The rest will remain in the queue.

Prefetch can be applied globally or locally. For the second case, the limit applies for each consumer separately. For example, if qos is equal to 3, each consumer will be able to receive only 3 unacknowledged messages. In the case of global declaration, it's a channel which has limited capacity to store messages. For qos=3 and global configuration, only 3 messages will be sent to channel.

Configure prefetch from Java API

To see prefetch in action, let's take an example of some simple JUnit test cases:

private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

@Before
public void initializeQueues() throws IOException, TimeoutException {
  // ...
  directChannel.queueDeclare(CONSUMER_QUEUE_1, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
          Collections.emptyMap());
  directChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, DIRECT_EXC_KEY);
}

@Test
public void should_not_accept_more_messages_than_specified_in_prefetch_count_for_a_single_consumer() throws IOException, InterruptedException {
  String message = "Test_1";
  // max 3 unacknowledged messages per consumer
  directChannel.basicQos(3, GLOBAL_CHANNEL_PREFETCH.not());

  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(8);

  // declare consumer
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));

  latch.await(3, TimeUnit.SECONDS);

  // we allow RabbitMQ to dispatch only 3 messages; and because
  // there are no more than 3 messages acknowledged, given consumer can't
  // consume then more
  // To see that all 8 messages are normally consumed, simply comment basicQos(...) call
  assertThat(STATS.get(message).intValue()).isEqualTo(3);
}

@Test
public void should_block_one_consumer_but_continue_to_dispatch_to_another_one() throws IOException, InterruptedException {
  // This time we try to discover the behaviour when 2 consumers
  // are subscribed to a queue and both can consume
  // only 3 messages without acknowledging them
  directChannel.basicQos(3, GLOBAL_CHANNEL_PREFETCH.not());

  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("1").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("2").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("3").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("4").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("5").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("6").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("7").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("8").getBytes());

  CountDownLatch latch = new CountDownLatch(8);

  // declare consumers
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));

  latch.await(3, TimeUnit.SECONDS);

  // we allow RabbitMQ to dispatch only 3 messages per consumer.
  // Because there are 2 available consumers, first 6 messages should be consumed
  // Once again, comment basicQos(...) to see test fail
  assertThat(STATS.get(testMessage("1")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("2")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("3")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("4")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("5")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("6")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("7"))).isNull();
  assertThat(STATS.get(testMessage("8"))).isNull();
}

@Test
public void should_apply_prefetch_globally_to_given_channel() throws IOException, InterruptedException {
  // In this test, the prefetch is applied globally to a channel.
  // So even if we have 2 workers, each accepting 3 messages,
  // the next messages won't be dispatched to not busy workers
  directChannel.basicQos(3, GLOBAL_CHANNEL_PREFETCH.yes());

  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("1").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("2").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("3").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("4").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("5").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("6").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("7").getBytes());
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("8").getBytes());

  CountDownLatch latch = new CountDownLatch(8);

  // declare consumers; potentially 6 messages consumed 
  // but only when prefetch is not global for whole channel
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));

  latch.await(3, TimeUnit.SECONDS);

  // we allow RabbitMQ to dispatch only 3 messages per consumer. 
  // Because there are 2 available consumers, first 6 messages
  // should be consumed
  // Once again, comment basicQos(...) to see test fail
  assertThat(STATS.get(testMessage("1")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("2")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("3")).intValue()).isEqualTo(1);
  assertThat(STATS.get(testMessage("4"))).isNull();
  assertThat(STATS.get(testMessage("5"))).isNull();
  assertThat(STATS.get(testMessage("6"))).isNull();
  assertThat(STATS.get(testMessage("7"))).isNull();
  assertThat(STATS.get(testMessage("8"))).isNull();
}

private String testMessage(String number) {
  return "TestMsg_"+number;
}


The article shows how to tune channel and consumers capacities to handle messages. We can limit the number of unacknowledged messages by simply setting qos parameter. By doing so, the queue will store the messages and send them only when consumers or channel have some space available.


If you liked it, you should read:

πŸ“š Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!