Consumers can sometimes be overwhelmed by the number of messages they have to handle. RabbitMQ lets you cap that number with a setting called prefetch.
Konieczny
This article focuses on prefetch. The first part describes the general idea. The second part shows how to set the prefetch value through the Java API.
RabbitMQ and prefetch
When the prefetch value (more precisely, the qos setting) is not defined, the number of messages delivered to a channel's consumers is bounded only by the consumer's memory. RabbitMQ dispatches messages to consumers as soon as they arrive. If the consumers cannot keep up, the delivered messages pile up on the consumer side, unacknowledged.
The way to protect consumers from a growing workload is to configure the qos prefetch property. It specifies how many messages may remain unacknowledged by a given consumer. Suppose we set qos to 3. The consumer will then hold at most 3 unacknowledged messages at once; the rest stay in the queue until one of them is acknowledged.
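The behaviour described above can be sketched with a toy model in plain Java. This is not RabbitMQ client code and not how the broker is implemented; the class `PrefetchWindow` and its methods are hypothetical names made up for this illustration. It only shows the bookkeeping: a message leaves the queue when the consumer has a free slot, and an acknowledgement frees one slot.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a single consumer with a prefetch limit (hypothetical, not the RabbitMQ API).
final class PrefetchWindow {
    // Maximum number of unacknowledged messages; 0 is treated as "no limit".
    private final int prefetchCount;
    // Messages still waiting in the queue.
    private final Deque<String> queue = new ArrayDeque<>();
    // Messages delivered to the consumer but not yet acknowledged.
    private final Deque<String> unacked = new ArrayDeque<>();

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    // A new message arrives; it is delivered immediately if the consumer has room.
    void publish(String message) {
        queue.addLast(message);
        dispatch();
    }

    // The consumer acknowledges its oldest message, which frees one slot.
    void ack() {
        if (!unacked.isEmpty()) {
            unacked.removeFirst();
            dispatch();
        }
    }

    // Move messages from the queue to the consumer while the limit allows it.
    private void dispatch() {
        while (!queue.isEmpty() && (prefetchCount == 0 || unacked.size() < prefetchCount)) {
            unacked.addLast(queue.removeFirst());
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    int queuedCount() {
        return queue.size();
    }

    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(3);
        for (int i = 1; i <= 8; i++) {
            window.publish("msg-" + i);
        }
        // 3 messages are held by the consumer, 5 wait in the queue
        System.out.println(window.unackedCount() + " unacked, " + window.queuedCount() + " queued");
        window.ack();
        // one slot was freed and refilled: still 3 unacked, 4 queued
        System.out.println(window.unackedCount() + " unacked, " + window.queuedCount() + " queued");
    }
}
```

With a limit of 3 and 8 published messages, the model keeps 3 messages on the consumer side and 5 in the queue; each acknowledgement lets exactly one more message through.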
Prefetch can be applied globally or locally. In the local case, the limit applies to each consumer separately. For example, with qos equal to 3, each consumer can hold at most 3 unacknowledged messages. In the global case, the limit is on the channel as a whole. With qos=3 and the global flag set, at most 3 unacknowledged messages are outstanding on the channel, regardless of how many consumers it has.
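To make the difference between the two scopes concrete, here is a small simulation in plain Java. It is only a sketch under simplifying assumptions: nobody acknowledges anything, messages are handed out round-robin, and `QosScopeSketch.deliver` is a hypothetical helper invented for this example, not part of the RabbitMQ client.

```java
import java.util.Arrays;

// Toy comparison of per-consumer and channel-wide prefetch (hypothetical, not the RabbitMQ API).
final class QosScopeSketch {

    // Returns how many unacknowledged messages each consumer ends up holding
    // when `published` messages arrive and no consumer ever acknowledges.
    static int[] deliver(int published, int consumers, int prefetch, boolean global) {
        int[] held = new int[consumers];
        int total = 0;          // unacknowledged messages on the whole channel
        int next = 0;           // round-robin pointer (an assumption of this sketch)
        int remaining = published;
        while (remaining > 0) {
            boolean delivered = false;
            for (int tries = 0; tries < consumers; tries++) {
                int c = (next + tries) % consumers;
                // global: one shared budget; local: one budget per consumer
                boolean hasRoom = global ? total < prefetch : held[c] < prefetch;
                if (hasRoom) {
                    held[c]++;
                    total++;
                    remaining--;
                    next = (c + 1) % consumers;
                    delivered = true;
                    break;
                }
            }
            if (!delivered) {
                break; // every budget is exhausted, the rest stays in the queue
            }
        }
        return held;
    }

    public static void main(String[] args) {
        // per-consumer limit of 3, 2 consumers: 6 of 8 messages are delivered
        System.out.println(Arrays.toString(deliver(8, 2, 3, false))); // prints [3, 3]
        // channel-wide limit of 3: only 3 of 8 messages are delivered
        System.out.println(Arrays.toString(deliver(8, 2, 3, true)));  // prints [2, 1]
    }
}
```

The exact split between consumers in the global case depends on the dispatch order, which this sketch simply assumes to be round-robin; the point is the total of 3 versus 6.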
Configure prefetch from Java API
To see prefetch in action, let's look at a few simple JUnit test cases:
private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();
@Before
public void initializeQueues() throws IOException, TimeoutException {
// ...
directChannel.queueDeclare(CONSUMER_QUEUE_1, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
Collections.emptyMap());
directChannel.queueBind(CONSUMER_QUEUE_1, EXCHANGE, DIRECT_EXC_KEY);
}
@Test
public void should_not_accept_more_messages_than_specified_in_prefetch_count_for_a_single_consumer() throws IOException, InterruptedException {
String message = "Test_1";
// max 3 unacknowledged messages per consumer
directChannel.basicQos(3, GLOBAL_CHANNEL_PREFETCH.not());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());
CountDownLatch latch = new CountDownLatch(8);
// declare consumer
directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
latch.await(3, TimeUnit.SECONDS);
// RabbitMQ may dispatch only 3 messages; because none of them
// is acknowledged, the consumer can't receive any more.
// To see all 8 messages consumed, simply comment out the basicQos(...) call
assertThat(STATS.get(message).intValue()).isEqualTo(3);
}
@Test
public void should_block_one_consumer_but_continue_to_dispatch_to_another_one() throws IOException, InterruptedException {
// This time we try to discover the behaviour when 2 consumers
// are subscribed to a queue and both can consume
// only 3 messages without acknowledging them
directChannel.basicQos(3, GLOBAL_CHANNEL_PREFETCH.not());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("1").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("2").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("3").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("4").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("5").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("6").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("7").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("8").getBytes());
CountDownLatch latch = new CountDownLatch(8);
// declare consumers
directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
latch.await(3, TimeUnit.SECONDS);
// RabbitMQ may dispatch only 3 messages per consumer.
// Because there are 2 consumers, only the first 6 messages should be consumed.
// Once again, comment out basicQos(...) to see the test fail
assertThat(STATS.get(testMessage("1")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("2")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("3")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("4")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("5")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("6")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("7"))).isNull();
assertThat(STATS.get(testMessage("8"))).isNull();
}
@Test
public void should_apply_prefetch_globally_to_given_channel() throws IOException, InterruptedException {
// In this test, the prefetch is applied globally to the channel.
// So even though we have 2 consumers, the channel as a whole holds at most
// 3 unacknowledged messages and the remaining ones are not dispatched
directChannel.basicQos(3, GLOBAL_CHANNEL_PREFETCH.yes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("1").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("2").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("3").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("4").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("5").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("6").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("7").getBytes());
directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, testMessage("8").getBytes());
CountDownLatch latch = new CountDownLatch(8);
// declare consumers; potentially 6 messages consumed
// but only when prefetch is not global for whole channel
directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.not(), new CountingConsumer(directChannel, latch, STATS));
latch.await(3, TimeUnit.SECONDS);
// The limit of 3 unacknowledged messages is shared by the whole channel,
// so only the first 3 messages should be consumed, however they are
// split between the 2 consumers.
// Once again, comment out basicQos(...) to see the test fail
assertThat(STATS.get(testMessage("1")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("2")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("3")).intValue()).isEqualTo(1);
assertThat(STATS.get(testMessage("4"))).isNull();
assertThat(STATS.get(testMessage("5"))).isNull();
assertThat(STATS.get(testMessage("6"))).isNull();
assertThat(STATS.get(testMessage("7"))).isNull();
assertThat(STATS.get(testMessage("8"))).isNull();
}
private String testMessage(String number) {
return "TestMsg_"+number;
}
This article showed how to tune the capacity of channels and consumers to handle messages. We can limit the number of unacknowledged messages simply by setting the qos parameter. The queue then holds the remaining messages and delivers them only when a consumer or the channel has room.
