Rendezvous channel and message channel interceptors

Until now we have worked with non-blocking message channels. However, Spring Integration also provides an implementation of blocking message channels. In addition, it defines hooks for message handling.

In this article we'll see what hides behind these two concepts. The first part covers the idea of a blocking message channel, illustrated with RendezvousChannel. After that we'll move on to message interceptors. We'll finish by writing some test cases that show the described concepts.

Rendezvous message channel

Before we talk about RendezvousChannel, we need to explain what a blocking message channel is. In a blocking message channel, each sent message must be received before the next one can be sent. In other words, when a sender pushes a message into a blocking channel, it stays blocked until a receiver takes that message, so the channel can't accept another message until the first one is consumed. Spring Integration treats this feature as a property of the message channel, but elsewhere you can also meet the term Rendezvous transport layer to describe this kind of channel.

Spring Integration implements the blocking message channel as RendezvousChannel. Like PriorityChannel, RendezvousChannel directly extends the QueueChannel class. The only difference from its parent is the type of queue storing messages: RendezvousChannel uses java.util.concurrent.SynchronousQueue. This blocking queue has no internal capacity at all: each insert operation must wait for a corresponding remove operation by another thread, and vice versa. So a new element can be handed over only when a consumer is ready to take it. In other words, SynchronousQueue does all the work needed to make RendezvousChannel behave as a blocking channel.
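To make the hand-off behavior more concrete, here is a minimal sketch that uses only java.util.concurrent.SynchronousQueue, without any Spring class. The class name SynchronousQueueDemo and its two helper methods are made up for this illustration. The first method offers an element while nobody is consuming, the second one does the same while a consumer thread is polling:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {

  // Offers an element while no consumer is waiting. The hand-off can't happen,
  // so the timed offer gives up after 200 ms and returns false.
  static boolean offerWithoutConsumer() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    try {
      return queue.offer("milk", 200, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Offers an element while another thread is polling. The element is handed
  // over directly to the consumer; returns what the consumer received.
  static String offerWithConsumer() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    String[] received = new String[1];
    Thread consumer = new Thread(() -> {
      try {
        received[0] = queue.poll(2, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    try {
      boolean sent = queue.offer("milk", 2, TimeUnit.SECONDS);
      consumer.join(); // join() makes the consumer's write to received[0] visible here
      return sent ? received[0] : null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println("Sent without consumer: " + offerWithoutConsumer());
    System.out.println("Received with consumer: " + offerWithConsumer());
  }
}
```

The timed offer and poll calls are the closest plain-Java equivalents of sending and receiving with a timeout, which is why a send without a waiting receiver is expected to fail.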

Message channel interceptors

In a message-oriented architecture we can also define interceptors for sent messages. In Spring Integration, these interceptors can be invoked at different moments of the message lifecycle: before or after a message is sent to a channel, and before or after a message is received from a channel. Message interceptors must be defined as implementations of org.springframework.messaging.support.ChannelInterceptor. This interface defines 4 methods, one for each situation described previously: preSend, postSend, preReceive and postReceive.

An implemented interceptor can be added to a message channel by invoking the void addInterceptor(ChannelInterceptor interceptor) method of the concerned channel or by specifying interceptors directly in the configuration file with the <interceptors /> element. Interceptors are executed, for example, in the org.springframework.integration.channel.AbstractMessageChannel class, extended by both pollable (AbstractPollableChannel) and subscribable (AbstractSubscribableChannel) channels. As we can deduce, interceptors are called before and after the method that really sends or receives a given message:

// snippet for sending from AbstractMessageChannel
message = this.interceptors.preSend(message, this);
if (message == null) {
  return false;
}
boolean sent = this.doSend(message, timeout);
this.interceptors.postSend(message, this, sent);

// and for receiving from AbstractPollableChannel
if (!this.getInterceptors().preReceive(this)) {
  return null;
}
Message<?> message = this.doReceive(timeout);
message = this.getInterceptors().postReceive(message, this);

As you can see, interceptors decide whether a message can be sent (if the preSend interceptor returns null, the message is not sent and send returns false) or received (if preReceive returns false, nothing is received and null is returned).
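The veto mechanism on the sending side can be reproduced without Spring. The sketch below is only a simplified stand-in: SendInterceptor, SketchChannel and newFilteringChannel are hypothetical names invented for this example, and messages are plain strings instead of Message objects. It shows that a null returned by preSend stops the send, while postSend is notified only when the send was attempted:

```java
import java.util.ArrayList;
import java.util.List;

class InterceptorChainSketch {

  // Hypothetical, simplified stand-in for the send side of ChannelInterceptor.
  interface SendInterceptor {
    String preSend(String message);               // returning null vetoes the send
    void postSend(String message, boolean sent);  // called once the send was attempted
  }

  // Hypothetical in-memory channel; the "delivered" list plays the role of doSend's target.
  static class SketchChannel {
    final List<SendInterceptor> interceptors = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    boolean send(String message) {
      for (SendInterceptor interceptor : interceptors) {
        message = interceptor.preSend(message);
        if (message == null) {
          return false; // a veto stops the chain, nothing is delivered
        }
      }
      boolean sent = delivered.add(message);
      for (SendInterceptor interceptor : interceptors) {
        interceptor.postSend(message, sent);
      }
      return sent;
    }
  }

  // Builds a channel whose only interceptor trims messages and vetoes blank ones.
  static SketchChannel newFilteringChannel() {
    SketchChannel channel = new SketchChannel();
    channel.interceptors.add(new SendInterceptor() {
      @Override
      public String preSend(String message) {
        String trimmed = message.trim();
        return trimmed.isEmpty() ? null : trimmed;
      }

      @Override
      public void postSend(String message, boolean sent) {
        System.out.println("postSend: '" + message + "', sent=" + sent);
      }
    });
    return channel;
  }

  public static void main(String[] args) {
    SketchChannel channel = newFilteringChannel();
    System.out.println(channel.send("  milk ")); // accepted after trimming
    System.out.println(channel.send("   "));     // vetoed by preSend
    System.out.println(channel.delivered);
  }
}
```

Note that an interceptor may also replace the message, as the trimming does here. This is the same possibility the real preSend offers by returning a Message.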

Spring Integration also implements the enterprise integration pattern called Wire Tap. This is an interceptor that grabs each message sent to a given channel and passes it to another channel. The intercepted message is not modified; it's simply copied. This kind of interceptor can be used to monitor messages passing through channels and, for example, save some information into log files.
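The idea of the pattern can be sketched in a few lines of plain Java. This is not Spring Integration's implementation: TappedChannel is a hypothetical class written for this illustration, and the secondary channel is reduced to a simple Consumer. Every sent message is shown to the tap and then delivered unchanged to the main flow:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class WireTapSketch {

  // Hypothetical minimal channel: the tap observes every message,
  // the main flow receives exactly the same, unmodified message.
  static class TappedChannel {
    private final List<String> mainFlow = new ArrayList<>();
    private final Consumer<String> tap;

    TappedChannel(Consumer<String> tap) {
      this.tap = tap;
    }

    boolean send(String message) {
      tap.accept(message);          // copy for monitoring purposes
      return mainFlow.add(message); // normal delivery
    }

    List<String> mainFlow() {
      return mainFlow;
    }
  }

  public static void main(String[] args) {
    List<String> monitoringLog = new ArrayList<>();
    TappedChannel channel = new TappedChannel(monitoringLog::add);
    channel.send("milk");
    channel.send("coffee");
    System.out.println("Main flow:  " + channel.mainFlow());
    System.out.println("Monitoring: " + monitoringLog);
  }
}
```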

Example of Rendezvous channel and message interceptors

In this part we'll discover RendezvousChannel and interceptors in practice. We'll begin by configuring both elements:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:channel id="rvChannel">
  <int:rendezvous-queue />
  <int:interceptors>
    <int:ref bean="rendezvousInterceptor" />
  </int:interceptors>
</int:channel>

It's pretty simple. Contrary to the previously seen configurations, we define only one channel here. Messages will be sent to this channel in one thread and consumed in another one. We can see it in the test case below, preceded by the interceptor's definition:

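import java.util.Random;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

/**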
 * Sample channel interceptor which changes priority level when payload's level is equal to 0.
 *
 * @author Bartosz Konieczny
 */
@Component
public class RendezvousInterceptor implements ChannelInterceptor {
  private static final Random RANDOM_MAKER = new Random();

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
    if (canApply(message)) {
      Product product = (Product) message.getPayload();
      if (product.canApplyPriorityLevel()) {
        // nextInt(100) returns a value from 0 to 99; +1 shifts it to 1..100 so the level is never 0
        product.setPriorityLevel(RANDOM_MAKER.nextInt(100) + 1);
      }
    }
    return message;
  }

  @Override
  public void postSend(Message<?> message, MessageChannel messageChannel, boolean sent) {
    if (!sent) {
      // NOTE: for monitoring purposes we should prefer something more persistent, such as a LOGGER, but for
      // simple test cases it's simpler (less verbose, less time-consuming) to work with System.out prints
      System.out.println("Message ("+message+") was not sent correctly to message channel "+messageChannel);
    }
  }

  @Override
  public boolean preReceive(MessageChannel messageChannel) {
    // do nothing, consider receiving as valid
    return true;
  }

  @Override
  public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
    // do nothing too
    return message;
  }

  private boolean canApply(Message<?> message) {
    return message.getPayload() instanceof Product;
  }

}

/**
 * Test cases to illustrate {@link RendezvousChannel} specificity.
 *
 * @author Bartosz Konieczny
 */
@ContextConfiguration(locations = "classpath:META-INF/rendezvous-channel.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class RendezvousChannelTest {

  @Autowired
  @Qualifier("rvChannel")
  private RendezvousChannel rvChannel;

  @Test
  public void testSimpleSend() throws InterruptedException {
    rvChannel.clear();
    final boolean[] messagesSendState = new boolean[]{false, false}; // neither message has been sent yet
    final CountDownLatch latch = new CountDownLatch(1);
    final Message<Product> milkMsg = constructMessage("milk");
    final Message<Product> coffeeMsg = constructMessage("coffee");
    new Thread(new Runnable() {
      @Override
      public void run() {
        messagesSendState[0] = rvChannel.send(milkMsg);
        messagesSendState[1] = rvChannel.send(coffeeMsg, 500);
        latch.countDown();
      }
    }).start();
    Message<?> receivedMsg = rvChannel.receive(1000);

    // Wait until the producer thread has finished both send() calls before reading the shared state.
    // Without this wait, the assertions could run before the results are written.
    assertTrue("Producer thread should finish within 4 seconds", latch.await(4, TimeUnit.SECONDS));

    assertTrue("The first message (milk message) should be sent because of the immediate receive() call",
      messagesSendState[0]);
    assertFalse("The second message (coffee message) shouldn't be sent because nobody consumed it during " +
      "the 0.5 second sending timeout", messagesSendState[1]);
    assertEquals("Received message should be the same as sent", milkMsg, receivedMsg);

    // We also check if the channel interceptor behaves correctly (enriches the message's payload with a random
    // priority level when the passed level is equal to 0)
    assertNotEquals("milkMsg should have changed priority level", 0, milkMsg.getPayload().getPriorityLevel());
    assertNotEquals("coffeeMsg should have changed priority level", 0, coffeeMsg.getPayload().getPriorityLevel());
  }

  @Test
  public void testSendWithBlock() {
    rvChannel.clear();
    Message<Product> milkMsg = constructMessage("milk");
    long timeout = System.currentTimeMillis()+5000;
    boolean wasSent = false;
    // Try for 5 seconds to send the message. It can't succeed because java.util.concurrent.SynchronousQueue
    // has no capacity: an element is accepted only when a consumer is waiting to take it. Here we only try
    // to push a new element and never consume, so every timed send() gives up and returns false.
    while (timeout > System.currentTimeMillis() && !wasSent) {
      wasSent = rvChannel.send(milkMsg, 500);
    }
    assertFalse("Message shouldn't be sent", wasSent);
  }

  @Test
  public void testProduceConsumerScenario() throws InterruptedException {
    // Test RendezvousChannel in correct, producer-consumer scenario, where every sent message is received directly after
    // by consumer
    rvChannel.clear();
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final String[] products = new String[] {"milk", "tea", "coffee", "wine", "banana", "bread", "salt", "pepper"};
    final boolean[] messagesState = new boolean[] {false, false, false, false, false, false, false, false};
    final Message<?>[] receivedMessage = new Message<?>[products.length];
    // producer
    new Thread(
      new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < products.length; i++) {
            Message<Product> msg = constructMessage(products[i]);
            messagesState[i] = rvChannel.send(msg, 2000);
          }
          countDownLatch.countDown();
        }
      }
    ).start();

    // consumer
    new Thread(
      new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < products.length; i++) {
            receivedMessage[i] = rvChannel.receive(2000);
          }
          countDownLatch.countDown();
        }
      }
    ).start();
    countDownLatch.await();
    for (int i = 0; i < products.length; i++) {
      assertTrue("Message should be correctly sent by producer", messagesState[i]);
      assertEquals("Messages should be received in sending order", products[i], ((Product) receivedMessage[i].getPayload()).getName());
    }
  }

  private Message<Product> constructMessage(String name) {
    Product product = new Product();
    product.setName(name);
    return MessageBuilder.withPayload(product).build();
  }
}

This article showed how to use a blocking message channel, which can send a message only when a consumer is ready to receive it. In Spring Integration it's RendezvousChannel. In the other part we discovered message interceptors. These hooks allow us to modify messages in the flow and, as we could see in one of the test cases, change the payload's properties.

