Queue channel

Until now we're working with channels able only to send the messages. This time we'll discover message channel which can both send and receive messages.

At the begin we'll discover this channel from technical point of view. At the second part we'll explore its features as usually, through several test cases.

What is queue channel ?

QueueChannel is an example of pollable channel. This kind of channel stores messages internally. It can simply return first handled message thanks to one of receiving methods: Message receive() or Message receive(long timeout). It's represented by PollableChannel interface. It's implemented by the basic class for most of Spring Integration's pollable channels: QueueChannel and PriorityChannel.

In this article we focus on QueueChannel. By the way, PriorityChannel extends directly QueueChannel and only adds some supplementary features. As its name indicates, QueueChannel stores messages in a queue, protected Message doReceive(long timeout) method. Every message's read causes the removing of element from the queue because of used Queue's take() method.

QueueChannel contains some additional and interesting features. It allows to clean all elements from the queue or only some elements. The removing of some elements is made with public List<Message<?>> purge(MessageSelector selector) method. It takes in parameter an implementation of MessageSelector which acts as message filter and decides if given message is acceptable or not. This detection is made inside accept(Message<?> message) method.

Spring Integration queue channel sample

As usually, we'll start by configuring tested elements. Note that we still use service activator to pass the message to output channel, but this time we add new configuration attribute: send-timeout:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:channel id="directChannel" />
<!-- to configure one channel as QueueChannel, we need only to define <queue /> element as child node -->
<int:channel id="queueChannel">
  <int:queue capacity="5" />
</int:channel>


<!-- send-timeout parameter is mandatory here. Otherwise, service activator will block whole application when QueueChannel's queue
      is full -->
<int:service-activator input-channel="directChannel" ref="productActivator" requires-reply="true"
  output-channel="queueChannel" method="handleBookingProduct" send-timeout="1000" />

Let's now take a look on our selector and test cases:

@Component
public class CoffeeSelector implements MessageSelector {
  @Override
  public boolean accept(Message<?> message) {
    if (message.getPayload().getClass() != Product.class) {
      return false;
    }
    Product product = (Product) message.getPayload();
    return "coffee".equalsIgnoreCase(product.getName());
  }
}

@ContextConfiguration(locations = "classpath:META-INF/queue-channel.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class QueueChannelTest {

  @Autowired
  @Qualifier("queueChannel")
  private QueueChannel queueChannel;

  @Autowired
  @Qualifier("directChannel")
  private DirectChannel directChannel;

  @Autowired
  private CoffeeSelector coffeeSelector;

  @Test
  public void testReceiving() {
    // clear QueueChannel queue. Otherwise the sending will fail with MessageDeliveryException
    queueChannel.clear();
    Message<Product> coffeeMsg = constructMessage("coffee");
    directChannel.send(coffeeMsg);

    Message<?> received = queueChannel.receive(2000);
    assertEquals("Payload of received message should be the same as for sent message", received.getPayload(),
      coffeeMsg.getPayload());
  }

  @Test
  public void testQueueCapacity() {
    Message<Product> coffeeMsg = constructMessage("coffee");
    int remaining = queueChannel.getRemainingCapacity();
    boolean wasMde = false;
    for (int i = 0; i <= queueChannel.getQueueSize(); i++) {
      try {
        directChannel.send(coffeeMsg, 1000);
        assertEquals("Queue doesn't decrease correctly", --remaining, queueChannel.getRemainingCapacity());
      } catch (MessageDeliveryException mde) {
        wasMde = true;
      }
    }
    assertTrue("MessageDeliveryException should be thrown when we try to send message to full queue", wasMde);
  }

  @Test
  public void testPurge() {
    queueChannel.clear();
    Message<Product> coffeeMsg = constructMessage("coffee");
    directChannel.send(coffeeMsg, 1000);
    directChannel.send(coffeeMsg, 1000);
    directChannel.send(coffeeMsg, 1000);
    Message<Product> waterMsg = constructMessage("water");
    directChannel.send(waterMsg, 1000);
    Message<Product> cokeMsg = constructMessage("coke");
    directChannel.send(cokeMsg, 1000);

    // Messages weren't consumed by QueueChannel. It's why we'll test purge() method and remove messages not containg
    // coffee Product.
    assertEquals("No place should be available after adding items to the queue", queueChannel.getRemainingCapacity(), 0);
    queueChannel.purge(coffeeSelector);
    assertEquals("After purging 'no-coffee' products, 2 places should be available in the queue",
      2, queueChannel.getRemainingCapacity());
  }

  private Message<Product> constructMessage(String name) {
    Product product = new Product();
    product.setName(name);
    return MessageBuilder.withPayload(product).build();
  }

}

In this article we discovered how to create pollable channel, ie. a channel able to receive messages. We saw that basic pollable channel is QueueChannel. It stores messages in internal BlockingQueue and returns the first element of it on receive method invocation. We also discovered the concept of MessageSelector, a filter-like object which allows to accept or deny a message. It can be use, for example, in QueueChannel's purge method to remove only messages not corresponding to expected criteria.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!