Flags in RabbitMQ

The RabbitMQ Java API exposes many overloaded methods that share a name but accept different parameters. Very often, these parameters are flags that apply to messages as well as to exchanges and queues.

In this article we try to explain the most interesting flags. Overall, the list matches the one in the AMQP 0.9.1 reference. The examples are written in Java against the RabbitMQ client, version 3.5.4.

Flags in RabbitMQ - publishing

This part describes one flag used when publishing a message: mandatory. It tells RabbitMQ what should happen when a published message can't be routed to any queue (for example, because of a wrong routing key). With the flag set, an unroutable message is returned to the sender through basic.return; without it, the message is silently dropped. The returned message can be caught with a ReturnListener. The tests below show the flag in action:

@Test
public void should_return_message_with_mandatory_flag_sent_to_not_existing_queue() throws IOException, InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  boolean[] wasReturned = new boolean[] {false};
  directChannel.addReturnListener(
    (replyCode, replyText, exchange, routingKey, properties, body) -> {
      wasReturned[0] = true;
      latch.countDown();
  });
  directChannel.basicPublish(EXCHANGE, "Routing_no_queue", MANDATORY.yes(), null, "Test".getBytes());
  latch.await(3, TimeUnit.SECONDS);

  assertThat(wasReturned[0]).isTrue();
}

@Test
public void should_return_message_to_listener_and_reroute_it_manually() throws IOException, InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  boolean[] wasReturned = new boolean[] {false};
  directChannel.addReturnListener(
    (replyCode, replyText, exchange, routingKey, properties, body) -> {
      wasReturned[0] = true;
      directChannel.basicPublish(exchange, DIRECT_EXC_KEY, properties, body);
      latch.countDown();
  });
  directChannel.basicPublish(EXCHANGE, "Routing_no_queue", MANDATORY.yes(), null, "Test".getBytes());
  latch.await(3, TimeUnit.SECONDS);

  GetResponse msgResponse = directChannel.basicGet(CONSUMER_QUEUE_1, AUTO_ACK.yes());

  assertThat(wasReturned[0]).isTrue();
  assertThat(msgResponse).isNotNull();
  assertThat(new String(msgResponse.getBody())).isEqualTo("Test");
}
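
The tests in this article pass their flags through helpers such as MANDATORY.yes() or DURABLE.not(), whose definitions are not shown. The sketch below is only one possible shape for them, assuming they do nothing more than give a readable name to a boolean value. The enum name Flag and the list of constants are hypothetical.

```java
// Hypothetical sketch: the article does not show how MANDATORY, DURABLE, etc. are defined.
// Each constant only names a boolean parameter so that call sites read more clearly.
enum Flag {
  MANDATORY, AUTO_ACK, DURABLE, EXCLUSIVE, AUTO_DELETE, INTERNAL, IF_UNUSED, IF_EMPTY;

  // The flag is set.
  boolean yes() {
    return true;
  }

  // The flag is not set.
  boolean not() {
    return false;
  }
}
```

Assuming the enum lives in a named package and its constants are statically imported, queueDeclare(queueName, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(), null) is the same call as queueDeclare(queueName, false, false, true, null), only easier to read.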

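There is another flag that can be set when a message is published: immediate. However, the RabbitMQ server doesn't support it. Publishing with immediate=true makes the server close the connection with a NOT_IMPLEMENTED error (reply code 540), and subsequent calls on that connection fail with: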

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method:    
  #method(reply-code=540, 
    reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)

Flags in RabbitMQ - declarations

Other flags appear in exchange and queue declarations. As we have seen in the article about Key concepts of RabbitMQ, exchanges and queues can be declared as durable. It means that they survive a broker restart. Note that the messages waiting in a durable queue survive only if they were also published as persistent.

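Both elements, exchanges and queues, also accept the flag called autoDelete. If this flag is set, the queue or exchange is deleted automatically once it's no longer in use: a queue when its last consumer is cancelled, an exchange when its last binding is removed. Another common flag is passive. A passive declaration doesn't create anything; it only checks whether an element with the specified name already exists. If it does, the server answers with DeclareOk. Otherwise, it closes the channel with a NOT_FOUND error, which the Java client reports as an IOException.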

On the other hand, only queues support the flag called exclusive. It indicates that the queue can be accessed only from the connection that declared it; such a queue is also deleted when that connection closes. Exchanges have their own specific flag, called internal. It works a little like a private field of a class: publishers can't publish messages to an internal exchange directly. Such an exchange can only receive messages from other exchanges it's bound to.

There are also some other flags, such as the ones used for queue deletion. We can tell RabbitMQ to delete a queue only if it's unused (has no consumers) and empty (holds no messages).

Here are a few examples of the flags listed above:

@Test
public void should_prevent_against_removing_not_empty_and_used_queue() throws IOException, InterruptedException {
  directChannel.basicPublish(EXCHANGE, CONSUMER_QUEUE_1, MANDATORY.yes(), null, "Test".getBytes());
  Thread sleepingConsumer = new Thread(() -> {
    try {
      directChannel.basicConsume(CONSUMER_QUEUE_1, new DefaultConsumer(directChannel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                    byte[] body) throws IOException {
          try {
            // simulates queue "still used"
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
    } catch (IOException e) {
        e.printStackTrace();
    }
  });
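  // start() registers the consumer from a separate thread (run() would execute in the caller's thread);
  // join() waits until the registration call has returned
  sleepingConsumer.start();
  sleepingConsumer.join();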
  Thread.sleep(2000);

  try {
    directChannel.queueDelete(CONSUMER_QUEUE_1, IF_UNUSED.yes(), IF_EMPTY.yes());
    fail("Should detect queue as being 'still in use'");
  } catch (IOException io) {
    ShutdownSignalException cause = (ShutdownSignalException) io.getCause();
    assertThat(cause.getReason().toString()).contains("PRECONDITION_FAILED - queue 'Consumer_1' in vhost '/' in use");
    System.out.println("=> "+cause.getReason());
  }
}

@Test
public void should_declare_auto_deletable_queue() throws IOException, InterruptedException {
  String queueName = "Auto_Delete_Queue";
  directChannel.queueDeclare(queueName, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(), null);
  String consumerTag = directChannel.basicConsume(queueName, new DefaultConsumer(directChannel));

  AMQP.Queue.DeclareOk declareOk = directChannel.queueDeclarePassive(queueName);
  assertThat(declareOk.getQueue()).isEqualTo(queueName);

  // We simulate "queue not in use" by removing its consumer
  Thread.sleep(1000);
  directChannel.basicCancel(consumerTag);

  // Now the passive declaration should fail because the queue
  // no longer exists.
  // This also illustrates both outcomes of a passive declaration:
  // DeclareOk when the queue exists, an error when it doesn't
  Thread.sleep(1000);
  try {
    declareOk = directChannel.queueDeclarePassive(queueName);
    System.out.println("=> "+declareOk.getQueue());
    fail("Should fail on declaring not existent queue passively");
  } catch (IOException ioe) {
    ShutdownSignalException cause = (ShutdownSignalException) ioe.getCause();
    assertThat(cause.toString()).contains("NOT_FOUND - no queue 'Auto_Delete_Queue' in vhost '/'");
  }
}

@Test
public void should_not_access_exclusive_queue_from_other_connection() throws IOException, TimeoutException {
  String queueName = "Exclusive_Queue";
  directChannel.queueDeclare(queueName, DURABLE.not(), EXCLUSIVE.yes(), AUTO_DELETE.yes(), null);

  // Create new connection to see if it can access exclusive queue
  Connection newConnection = CONNECTION_FACTORY.newConnection();
  Channel newChannel = newConnection.createChannel();
  try {
    newChannel.basicConsume(queueName, new DefaultConsumer(newChannel));
    fail("Should not be able to access exclusive queue");
  } catch (IOException ioe) {
    ShutdownSignalException cause = (ShutdownSignalException) ioe.getCause();
    assertThat(cause.toString()).contains("RESOURCE_LOCKED - cannot obtain exclusive access to " +
            "locked queue 'Exclusive_Queue' in vhost");
  } finally {
    // Release the additional connection opened for this test
    newConnection.close();
  }
}

@Test
public void should_not_be_able_to_publish_on_internal_exchange() throws IOException {
  String exchangeName = "Internal_Exchange";
  String queueName = "Internal_Queue";
  directChannel.exchangeDeclare(exchangeName, ExchangeTypes.DIRECT.getName(), DURABLE.not(), AUTO_DELETE.yes(),
    INTERNAL.yes(), null);
  directChannel.queueDeclare(queueName, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
    Collections.emptyMap());
  directChannel.queueBind(queueName, exchangeName, "x");
  try {
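    directChannel.basicPublish(exchangeName, "", null, "Test".getBytes());
    // basicPublish is asynchronous, so the refusal is expected to surface on the next synchronous call
    directChannel.basicGet(queueName, true);
    fail("Should not be able to publish on internal exchange");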
  } catch (IOException ioe) {
    ShutdownSignalException cause = (ShutdownSignalException) ioe.getCause();
    assertThat(cause.toString()).contains("ACCESS_REFUSED - cannot publish " +
            "to internal exchange 'Internal_Exchange' in vhost '/'");
  }
}

Flags in RabbitMQ - messages reading

Message consumption can also be controlled with flags. Acknowledgements, positive or negative, accept the multiple flag. When it's set, a single call (not) acknowledges several messages at once: the delivery tag is read as "up to and including", so every outstanding message with a tag lower than or equal to the given one is concerned. When it's not set, the call applies only to the message identified by the delivery tag.
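
To make the "up to and including" rule concrete, here is a minimal in-memory sketch. It's a toy model, not RabbitMQ code: the class UnackedDeliveries and its methods are hypothetical and only mirror the meaning of the real basicAck(deliveryTag, multiple) call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of the deliveries a channel has not acknowledged yet.
// All names are hypothetical; nothing here belongs to the RabbitMQ client.
final class UnackedDeliveries {
  private final TreeSet<Long> outstanding = new TreeSet<>();

  // Records a message delivered to the consumer under the given tag.
  void deliver(long deliveryTag) {
    outstanding.add(deliveryTag);
  }

  // Mirrors the meaning of basicAck(deliveryTag, multiple) and returns the acknowledged tags.
  List<Long> ack(long deliveryTag, boolean multiple) {
    List<Long> acked = new ArrayList<>();
    if (multiple) {
      // headSet(tag, true) is the view of all outstanding tags <= deliveryTag
      acked.addAll(outstanding.headSet(deliveryTag, true));
      outstanding.removeAll(acked);
    } else if (outstanding.remove(deliveryTag)) {
      // Only the message identified by this exact tag is acknowledged
      acked.add(deliveryTag);
    }
    return acked;
  }

  // Tags still waiting for an acknowledgement, in ascending order.
  List<Long> pending() {
    return new ArrayList<>(outstanding);
  }
}
```

With tags 1 to 5 delivered, ack(2, false) acknowledges only tag 2, while a following ack(4, true) acknowledges tags 1, 3 and 4 and leaves tag 5 pending.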

When a message is rejected or negatively acknowledged, it can be put back on the queue thanks to the requeue flag. Without this flag, the rejected message is discarded, or moved to a dead-letter queue if dead-lettering is configured.
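
The three possible outcomes of a rejection can be sketched with another toy model. Again, the class RequeueModel is hypothetical and only mirrors the meaning of basicReject(deliveryTag, requeue); in particular, putting the requeued message back at the head of the queue is a simplification made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory queue; all names are hypothetical, not RabbitMQ client API.
final class RequeueModel {
  private final Deque<String> ready = new ArrayDeque<>();
  private final List<String> deadLettered = new ArrayList<>();
  private final boolean deadLetteringConfigured;

  RequeueModel(boolean deadLetteringConfigured) {
    this.deadLetteringConfigured = deadLetteringConfigured;
  }

  void publish(String message) {
    ready.addLast(message);
  }

  // Hands the next ready message to a consumer, or null when the queue is empty.
  String deliver() {
    return ready.pollFirst();
  }

  // Mirrors the meaning of basicReject(deliveryTag, requeue).
  void reject(String message, boolean requeue) {
    if (requeue) {
      // Simplification: the message goes back to the head of the queue
      ready.addFirst(message);
    } else if (deadLetteringConfigured) {
      deadLettered.add(message);
    }
    // Otherwise the message is simply dropped
  }

  int readyCount() {
    return ready.size();
  }

  List<String> deadLettered() {
    return new ArrayList<>(deadLettered);
  }
}
```

A message rejected with requeue set becomes deliverable again. Rejected without it, the message ends up in deadLettered() when the model was created with dead-lettering enabled, and disappears otherwise.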

Consumers, declared through the basicConsume(...) family of methods, can also be configured with different flags. The first one concerns auto-acknowledging. When it's used, the consumer doesn't need to acknowledge messages manually. The second flag, no-local, is meant to prevent a consumer from receiving messages published on its own connection. However, according to the RabbitMQ specification page, it's not implemented by the server and its value is ignored, even though it's still present in the Java API. Another option declares the consumer as exclusive. While an exclusive consumer is registered, no other consumer can subscribe to the queue.

The tests below show some of the flags used with the basicConsume(...) method:

@Test
public void should_deliver_the_message_to_only_exclusive_consumer_even_if_another_one_is_declared() throws IOException, InterruptedException {
  String message = "Test_Exclusive_Consumer";
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, message.getBytes());

  CountDownLatch latch = new CountDownLatch(2);

  // Declare consumers - note that we try to declare two consumers on one queue.
  // It shouldn't cause the message to be received twice.
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), "consumer_1",
    NO_LOCAl.not(), EXCLUSIVE.yes(), null, new CountingConsumer(directChannel, latch, STATS));
  try {
    // Declaring a new consumer while an exclusive consumer
    // is already registered should fail
    directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), "consumer_2",
      NO_LOCAl.not(), EXCLUSIVE.not(), null, new CountingConsumer(directChannel, latch, STATS));
    fail("Should not be able to declare a consumer if there is already an exclusive consumer");
  }  catch (IOException ioe) {
    ShutdownSignalException cause = (ShutdownSignalException) ioe.getCause();
    assertThat(cause.toString()).contains("ACCESS_REFUSED - queue 'Consumer_1' in vhost '/' in exclusive use");
  }
  latch.await(2, TimeUnit.SECONDS);

  assertThat(STATS.get(message).intValue()).isEqualTo(1);
}

@Test
public void should_accept_messages_for_no_local_consumer_and_shared_connection_with_publisher() throws IOException, InterruptedException, TimeoutException {
  // According to RabbitMQ specification (https://www.rabbitmq.com/specification.html),
  // no-local is not implemented:
  // "The no-local parameter is not implemented. The value
  // of this parameter is ignored and no attempt is made
  // to prevent a consumer from receiving messages
  // that were published on the same connection."
  // So following test will pass
  CountDownLatch latch = new CountDownLatch(1);
  directChannel.basicConsume(CONSUMER_QUEUE_1, AUTO_ACK.yes(), "consumer_2",
    NO_LOCAl.yes(), EXCLUSIVE.not(), null, new CountingConsumer(directChannel, latch, STATS));
  directChannel.basicPublish(EXCHANGE, DIRECT_EXC_KEY, null, "Test".getBytes());

  latch.await(2, TimeUnit.SECONDS);

  assertThat(STATS).hasSize(1);
}

This article described some of the flags that can be set on messages, exchanges and queues. Exchanges and queues can be durable, auto-deleted or declared passively; queues can additionally be exclusive and exchanges internal. Exclusivity can also be requested for consumers. On the publishing side, the mandatory flag makes the broker return the messages that can't be routed to any queue.
