Reply queue as Remote Procedure Call pattern in RabbitMQ

RabbitMQ is not only a one-way system. Messages can flow back and forth between queues, just as two people talk to each other.

In this article we describe how to implement the Remote Procedure Call (RPC) pattern in RabbitMQ. The first part defines the idea of a reply queue. The comparison with human conversation is useful, but it also helps to know the specific terms related to this pattern. The second part shows a small test case illustrating how RPC works.

What is a reply queue?

The conversation between two or more RabbitMQ participants is made possible by the reply_to value defined in the message properties. It indicates the queue to which the consumer of the message must publish its response. A sample flow could look like this:

  1. A publisher that listens on "publisher_queue" sends a message with the property reply_to="publisher_queue" to the queue "consumer_queue".
  2. The consumer of "consumer_queue" receives the message. Because the message says where the response should go, the consumer takes the value of reply_to and sends its response to that queue.
  3. The publisher receives the response from the consumer of "consumer_queue". If the response itself carries a reply_to property, the publisher can send a new message to the consumer (so at that point it acts as a consumer too).
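
The three steps above can be modelled without a broker. The following is only a minimal in-memory sketch, not RabbitMQ client code: the class ReplyToFlowSketch, its Message type and the "Re: " response prefix are assumptions made for illustration, and a plain map of queues stands in for the broker.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of the reply_to flow; no broker or RabbitMQ client is involved.
class ReplyToFlowSketch {

  // Hypothetical message type: a body plus the reply_to property.
  static final class Message {
    final String body;
    final String replyTo;

    Message(String body, String replyTo) {
      this.body = body;
      this.replyTo = replyTo;
    }
  }

  // Queue name -> pending messages, standing in for the broker.
  private final Map<String, Queue<Message>> queues = new HashMap<>();

  void publish(String queueName, Message message) {
    queues.computeIfAbsent(queueName, name -> new ArrayDeque<>()).add(message);
  }

  // Returns the next message of the queue, or null when the queue is empty.
  Message consume(String queueName) {
    Queue<Message> queue = queues.get(queueName);
    return queue == null ? null : queue.poll();
  }

  // Runs the three steps once and returns the body the publisher gets back.
  static String roundTrip(String body) {
    ReplyToFlowSketch broker = new ReplyToFlowSketch();

    // Step 1: the publisher sends a request that names its own queue in reply_to.
    broker.publish("consumer_queue", new Message(body, "publisher_queue"));

    // Step 2: the consumer reads the request and answers on the queue given by reply_to.
    Message request = broker.consume("consumer_queue");
    if (request != null && request.replyTo != null) {
      broker.publish(request.replyTo, new Message("Re: " + request.body, null));
    }

    // Step 3: the publisher reads the response from its own queue.
    Message response = broker.consume("publisher_queue");
    return response == null ? null : response.body;
  }
}
```

Calling ReplyToFlowSketch.roundTrip("Hello world") walks through the three steps in order. The point of the sketch is that the consumer never hard-codes where to answer: it only follows the reply_to value carried by the request.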

But reply_to is not the only important property in RabbitMQ RPC calls. Another one is correlation_id, which helps to match each response with the request that triggered it. With this property, either the consumer or the publisher can also persist important "conversations" somewhere (for example in a database).

To sum up, RPC is nothing more and nothing less than a model with two actors: a client and a server. One of them must publish messages and the other must consume them. The roles aren't strictly assigned: a publisher can also be a consumer, and a consumer can also be a publisher.
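
To make the role of correlation_id more concrete, here is a small sketch of how a client might match responses to requests. It is an illustration under assumptions, not part of the RabbitMQ API: the class CorrelationSketch and its register and match methods are hypothetical names, and the in-memory map could just as well be a database table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical client-side bookkeeping keyed by correlation_id.
class CorrelationSketch {

  // correlation_id -> request body that is still waiting for its response.
  private final Map<String, String> pending = new HashMap<>();

  // Remembers the request and returns the correlation_id to attach to it.
  String register(String requestBody) {
    String correlationId = UUID.randomUUID().toString();
    pending.put(correlationId, requestBody);
    return correlationId;
  }

  // Returns the request that a response belongs to, or null for an unknown
  // (or already answered) correlation_id.
  String match(String correlationId) {
    return pending.remove(correlationId);
  }
}
```

With this bookkeeping, responses can arrive in any order: the client looks up the correlation_id copied into each response and ignores responses whose id it does not recognize.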

Reply queue example in RabbitMQ

To see RPC implemented in RabbitMQ, take a look at the following test class. It's a simplified version of RPC in which the publisher sends one message and the consumer replies to it only once:

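import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// BaseConfig, ExchangeTypes and the DURABLE, EXCLUSIVE, AUTO_DELETE and AUTO_ACK helpers
// come from the article's own project and are not shown here. assertThat is assumed to be
// the static import of a fluent assertion library (for example AssertJ).
// QueueingConsumer is only available in older releases of the RabbitMQ Java client.
public class ReplyQueueTest extends BaseConfig {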

  private static final String CLOSING_MESSAGE = "X";
  private static final String EXCHANGE = "Reply_Queue_Test";
  private static final String RECEIVER_QUEUE = "RPC_Receiver";
  private static final String SENDER_QUEUE = "RPC_Sender";
  private static final String REPLY_EXC_KEY = "Key_1";
  private static final Map<String, Integer> STATS = new ConcurrentHashMap<>();

  private Connection connection;

  private Channel receiverChannel;

  private Channel senderChannel;

  @Before
  public void initializeQueues() throws IOException, TimeoutException {
    connection = getConnection();
    receiverChannel = connection.createChannel();
    receiverChannel.exchangeDeclare(EXCHANGE, ExchangeTypes.DIRECT.getName());
    senderChannel = connection.createChannel();

    receiverChannel.queueDeclare(RECEIVER_QUEUE, DURABLE.not(), EXCLUSIVE.not(), AUTO_DELETE.yes(),
        Collections.emptyMap());
    receiverChannel.basicQos(0); // maximum number of messages that server will deliver, 0 = unlimited
    receiverChannel.queueBind(RECEIVER_QUEUE, EXCHANGE, REPLY_EXC_KEY);

    senderChannel.queueDeclare(SENDER_QUEUE, DURABLE.not(), EXCLUSIVE.yes(), AUTO_DELETE.yes(),
        Collections.emptyMap());
  }

  @After
  public void clearQueues() throws IOException, TimeoutException {
    STATS.clear();

    /**
      * Before deleting the exchange, we send a message telling
      * the {@code QueueingConsumer}s to stop waiting
      * for further deliveries. Without that, a
      * {@code ConsumerCancelledException} is thrown.
      */
    senderChannel.basicPublish("", RECEIVER_QUEUE, null, 
      CLOSING_MESSAGE.getBytes("UTF-8"));

    receiverChannel.queueDelete(RECEIVER_QUEUE);
    senderChannel.queueDelete(SENDER_QUEUE);
    receiverChannel.exchangeDelete(EXCHANGE);
    senderChannel.exchangeDelete(EXCHANGE);

    receiverChannel.close();
    senderChannel.close();
    connection.close();
  }

  @Test
  public void should_consume_and_reply_the_message() throws IOException, InterruptedException {
    String message = "Hello world";
    CountDownLatch latch = new CountDownLatch(2);

    new Thread(new ReceiverListener(receiverChannel, latch)).start();
    new Thread(new SenderListener(senderChannel, latch, message)).start();

    latch.await(2, TimeUnit.SECONDS);

    assertThat(STATS.get(message).intValue()).isEqualTo(1);
    assertThat(STATS.get(constructResponse(message)).intValue()).isEqualTo(1);
  }

  private static class SenderListener implements Runnable {

    private final QueueingConsumer consumer;
    private final CountDownLatch latch;
    private final String message;

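    private SenderListener(Channel channel, CountDownLatch latch, String message) {
      this.consumer = new QueueingConsumer(channel);
      try {
        // Name the reply queue explicitly instead of passing an empty name, which
        // relies on the broker falling back to the queue last declared on this channel.
        channel.basicConsume(SENDER_QUEUE, AUTO_ACK.yes(), consumer);
      } catch (IOException e) {
        e.printStackTrace();
      }
      this.latch = latch;
      this.message = message;
    }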

    @Override
    public void run() {
      Channel senderChannel = consumer.getChannel();

      try {
        String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties messageProps = new AMQP.BasicProperties
          .Builder()
          .correlationId(corrId)
          .replyTo(SENDER_QUEUE)
          .build();
        System.out.println("["+ SENDER_QUEUE +"] I'm sending new message "+message + 
          " with correlation id "+corrId);
        senderChannel.basicPublish("", RECEIVER_QUEUE, 
          messageProps, message.getBytes("UTF-8"));

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String receivedMessage = new String(delivery.getBody(),"UTF-8");
        System.out.println("["+ SENDER_QUEUE +"] Got response from ["+RECEIVER_QUEUE+"] : "+
          receivedMessage);
        STATS.put(receivedMessage, 1);
      } catch (IOException|InterruptedException e) {
        e.printStackTrace();
      }

      latch.countDown();
    }
  }

  private static class ReceiverListener implements Runnable {

    private final QueueingConsumer consumer;
    private final CountDownLatch latch;

    private ReceiverListener(Channel channel, CountDownLatch latch)  {
      this.consumer = new QueueingConsumer(channel);
      try {
        channel.basicConsume(RECEIVER_QUEUE, AUTO_ACK.not(), consumer);
      } catch (IOException e) {
        e.printStackTrace();
      }
      this.latch = latch;
    }

    @Override
    public void run() {
      Channel receiverChannel = consumer.getChannel();
      try {
        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          AMQP.BasicProperties messageProps = delivery.getProperties();
          String message = new String(delivery.getBody(),"UTF-8");
          if (message.equals(CLOSING_MESSAGE)) {
            break;
          }
          STATS.put(message, 1);
          String response = constructResponse(message);
          System.out.println("[" + RECEIVER_QUEUE + "] Receive message " + message + 
            " with correlationId " + messageProps.getCorrelationId());
          System.out.println("[" + RECEIVER_QUEUE + "] Publishing new message " + 
            response);
          receiverChannel.basicPublish("", messageProps.getReplyTo(), messageProps, 
            response.getBytes("UTF-8"));
          receiverChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

      } catch (IOException|InterruptedException e) {
        e.printStackTrace();
      }
      latch.countDown();
    }
  }

  private static String constructResponse(String message) {
      return "Re: "+message;
  }

}
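
One detail of this test is worth spelling out. The latch is created with a count of 2, but ReceiverListener only calls countDown() after it reads the closing message, and that message is sent in clearQueues(), after the test method has finished. The await call therefore appears to act as a 2-second pause rather than a true rendezvous. The sketch below isolates that behaviour with the JDK alone; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Shows what await(timeout) returns when one of two expected count-downs never happens.
class LatchTimeoutSketch {

  static boolean awaitWithOneMissingCountDown(long timeoutMillis) {
    CountDownLatch latch = new CountDownLatch(2);
    latch.countDown(); // only one of the two parties reports completion

    try {
      // Returns true if the count reached zero, false if the timeout elapsed first.
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Because the count never reaches zero here, await gives up once the timeout elapses and returns false. In the test above the assertions still run afterwards, since the return value of latch.await is not checked.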

The RPC pattern introduces a kind of interactive communication between consumer and publisher. This exchange is possible thanks to the reply_to and correlation_id properties. The first one determines the queue to which the response should be sent, while the second identifies the conversation (the request and its response) that a message belongs to.
