Priority channel and message store

on waitingforcode.com

Priority channel and message store

You're doing Java/C#/JavaScript and doing it great? But you're tired because of always facing the same problems. I was like that 4 years ago. I changed then to the data engineering field and it solved my existential problems :) If you want to follow my path, I prepared a course that will help you with that! Join the class!
From previous articles we could deduce that Spring Integration was a pretty basic enterprise integration framework. However, in this article we'll discover some of its features proving that it hides a lot of advanced features: messages prioritization and messages persistence.

This article will begin by describe message handler chain concept. After that we'll focus on the first from two elements, filters. At the third part we'll talk about header enrichers. All will end by habitual test cases illustrating described concepts.

Message handler chain in Spring Integration

The main purpose of message handler chain consists on simplify configuration. It is able to group together elements used in one message's transaction. So, we can retreive there as well as activators or gateways, as not discovered yet filters or header enrichers. To see this simplification of configuration, take a look on below sample configuration:

<int:chain id="senderReceiverSample" input-channel="sender" output-channel="receiver">
  <int:header-enricher>
      <int:header name="enriched" value="xxx"/>
  </int:header-enricher>
  <int:filter ref="coffeeFilter" throw-exception-on-rejection="true" discard-channel="trash" />
  <int:service-activator ref="productActivator" requires-reply="true"
      method="handleBookingProduct"/>
</int:chain>

As you can see, the reading of that is more simpler than the reading of separetely defined elements.

Technically, message handler chain is an instance of org.springframework.integration.handler.MessageHandlerChain which itself implements MessageHandler interface by extending AbstractMessageHandler class. All elements defined inside <chain /> element are in fact the implementations of MessageHandler too. They're stored in MessageHandlerChain's private List<MessageHandler> field. Expect the last element, the n element is associated with n+1 element from the list by anonymous message channel. In practice it means that message treated by channel n will be send further to n+1 channel. This is MessageHandlerChain's code snippet charging for doing this message transfer:

final MessageHandler nextHandler = handlers.get(i + 1);
final MessageChannel nextChannel = new MessageChannel() {
  @Override
  public boolean send(Message message, long timeout) {
    return this.send(message);
  }
  @Override
  public boolean send(Message message) {
    nextHandler.handleMessage(message);
    return true;
  }
};
((MessageProducer) handler).setOutputChannel(nextChannel);

The last line of this snippet is very important. It's linked with the constraint for the elements defined inside <chain />. In fact, they must implement MessageProducer interface. In additionnaly, one check on that is made inside the code generating anonymous message channels:

Assert.isTrue(handler instanceof MessageProducer, "All handlers except for " +
  "the last one in the chain must implement the MessageProducer interface.");

However, if given chain defines an output-channel, last element also must implement MessageProducer interface.

Unlike configuration simplification, another interesting feature of message chain is the possibility to joining chains. For example, we can imagine one chain making some filtering and sending only valid messages to another chain. The another chain, after making some changes on received message, can return modified message to original sender. It's achieved with org.springframework.integration.gateway.RequestReplyMessageHandlerAdapter. This object contains an implementation of RequestReplyExchanger allowing messages exchanging.

Filters in Spring Integration

Filters in Spring Integration don't lost theirs meaning from the real world. They also check if one element corresponds to expected criteria. Filters in Spring Integration help to create an entreprise application integration pattern called selective consumer. As we can read in EAIP's page, this pattern represents an application consuming not necessarily all channel's messages, but only messages corresponding to the expectations.

In Spring Integration message filter is represented by org.springframework.integration.filter.MessageFilter class. To decide if message is accepted by the channel, the filter is helped by an implementation of org.springframework.integration.core.MessageSelector. A MessageSelector is an interface containing only one method: public boolean accept(Message<?> message). This method analyzes Message instance passed in parameter and returns true if message should be accepted. The decision operation is made in this MessageFilter's method:

@Override
protected Object doHandleRequestMessage(Message<?> message) {
  if (this.selector.accept(message)) {
    return message;
  }
  else {
    return null;
  }
}

Refused messages can be simply rejected, provoking delivery exceptions, or sent directly to a kind of "trash" message channel. Internally this channel is called discard message channel and is represented by private volatile MessageChannel discardChannel field.

Header enrichers in Spring Integration

The last element discovered in this article which can be defined inside <chain /> is header enricher. As its name indicates, is a kind of object which modify header values. Technically it can be a bean adding configured fixed value to message's header or dynamically generated value by enriching method. Static values are added by HeaderEnricher instance.

Because message's headers are immutable, header enricher had to get already existent headers as a mutable HashMap instance and after that add new entries defined in <header-enricher /> element. At the end of this process a new message is returned, containing payload of received message and headers enriched by previously described operation. From the code side the operation looks like:

Map<String, Object> headerMap = new HashMap<String, Object>(message.getHeaders());
this.addHeadersFromMessageProcessor(message, headerMap);
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
  String key = entry.getKey();
  HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();

  Boolean shouldOverwrite = valueProcessor.isOverwrite();
  if (shouldOverwrite == null) {
    shouldOverwrite = this.defaultOverwrite;
  }

  boolean headerDoesNotExist = headerMap.get(key) == null;

  /**
   * Only evaluate value expression if necessary
   */
  if (headerDoesNotExist || shouldOverwrite) {
    Object value = valueProcessor.processMessage(message);
    if (value != null || !this.shouldSkipNulls) {
      headerMap.put(key, value);
    }
  }
}
return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(headerMap).build();

Example of message handler chain in Spring Integration

We'll see how does message handler chain work in the example of message channel accepting only coffee messages. As you can deduce, this sample'll also contain MessageSelector and HeaderEnricher to prove clearly call order of nested channels. Let's begin by used configuration:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:channel id="sender" />
<int:channel id="receiver">
  <int:queue capacity="10" />
</int:channel>
<int:channel id="trash">
  <int:queue capacity="10" />
</int:channel>
<int:channel id="senderForNested" />
<int:channel id="receiverForNested">
  <int:queue capacity="10" />
</int:channel>
<int:channel id="senderForNested1" />
<int:channel id="senderForNested2" />


<int:chain id="senderReceiverSample" input-channel="sender" output-channel="receiver">
  <int:header-enricher>
    <int:header name="enriched" value="xxx"/>
  </int:header-enricher>
  <int:filter ref="coffeeFilter" throw-exception-on-rejection="true" discard-channel="trash" />
  <int:service-activator ref="productActivator" requires-reply="true"
    method="handleBookingProduct"/>
</int:chain>

<!-- Sample configuration for nested chain call with gateway element. -->
<int:chain id="mainSenderReceiver" input-channel="senderForNested" output-channel="receiverForNested">
  <int:header-enricher>
    <int:header name="main-channel" ref="timeEnricher" method="appendAccessTime" />
  </int:header-enricher>
  <int:filter ref="coffeeFilter" />
  <int:gateway request-channel="senderForNested1" />
</int:chain>
<int:chain id="nested1SenderReceiver" input-channel="senderForNested1">
  <int:header-enricher>
    <int:header name="nested-1" ref="timeEnricher" method="appendAccessTime" />
  </int:header-enricher>
  <int:gateway request-channel="senderForNested2"  />
</int:chain>
<int:chain id="nested2SenderReceiver" input-channel="senderForNested2">
  <int:header-enricher>
    <int:header name="nested-2" ref="timeEnricher" method="appendAccessTime"/>
  </int:header-enricher>
</int:chain>

<bean id="coffeeFilter" class="org.springframework.integration.filter.MessageFilter" scope="prototype">
    <constructor-arg ref="coffeeSelector" />
</bean>

Header enricher and coffee selector used in test case are:

/**
 * Header enricher appending current miliseconds time to message header.
 *
 * @author Bartosz Konieczny
 */
@Component
public class TimeEnricher {

  public String appendAccessTime() throws InterruptedException {
    Thread.sleep(500);
    return ""+System.currentTimeMillis();
  }

}

/**
 * Sample selector used to accept only messages containing "coffee" product.
 *
 * @author Bartosz Konieczny
 */
@Component
public class CoffeeSelector implements MessageSelector {
  @Override
  public boolean accept(Message<?> message) {
    if (message.getPayload().getClass() != Product.class) {
      return false;
    }
    Product product = (Product) message.getPayload();
    return "coffee".equalsIgnoreCase(product.getName());
  }
}

Everything assembled in JUnit test case looks like:

/**
 * Some test cases to illustrate message handler chain and some of its features as filters and
 * header enrichers.
 *
 * @author Bartosz Konieczny
 */
@ContextConfiguration(locations = "classpath:META-INF/message-chain.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageChainTest {

  @Autowired
  @Qualifier("sender")
  private DirectChannel sender;

  @Autowired
  @Qualifier("receiver")
  private QueueChannel receiver;

  @Autowired
  @Qualifier("trash")
  private QueueChannel trash;

  @Autowired
  @Qualifier("senderForNested")
  private DirectChannel senderForNested;

  @Autowired
  @Qualifier("receiverForNested")
  private QueueChannel receiverForNested;

  @Test
  public void testSimpleSend() {
    Product coffee = constructProduct("coffee");
    Message<Product> coffeeMsg = MessageBuilder.withPayload(coffee).build();
    sender.send(coffeeMsg);

    Message<?> receivedMsg = receiver.receive(3000);
    assertEquals("Received message should be equal to coffee message", receivedMsg.getPayload(), coffeeMsg.getPayload());

    String enrichedValue = (String) receivedMsg.getHeaders().get("enriched");
    assertEquals("Received message should contain 'xxx' enriched header", "xxx", enrichedValue);
  }

  @Test
  public void testCoffeeFilter() throws InterruptedException {
    Product milk = constructProduct("milk");
    Message<Product> milkMsg = MessageBuilder.withPayload(milk).build();
    boolean wasMre = false;
    try {
      sender.send(milkMsg);

    } catch (MessageRejectedException mre) {
      wasMre = true;
    }
    Message<?> receivedMsg = receiver.receive(3000);
    assertNull("Received message should be null because of CoffeeFilter", receivedMsg);
    assertTrue("MessageRejectedException should be thrown on message rejecting", wasMre);

    receivedMsg = trash.receive(4000);
    assertEquals("Discard channel should correctly handle rejected 'milk' message", milk, receivedMsg.getPayload());
  }

  @Test
  public void testNested() {
    Product coffee = constructProduct("coffee");
    Message<Product> milkMsg = MessageBuilder.withPayload(coffee).build();
    senderForNested.send(milkMsg, 3000);
    Message<?> receivedMsg = receiverForNested.receive(3000);
    // Expected order of gateways are: main channel, gateway defined in main channel, gateway defined in previous gateway
    // So values of corresponding headers (respectively main-channel, nested-1 and nested-2) should have values in ascending
    // order
    long mainChannel = Long.valueOf((String)receivedMsg.getHeaders().get("main-channel")).longValue();
    long nested1 = Long.valueOf((String)receivedMsg.getHeaders().get("nested-1")).longValue();
    long nested2 = Long.valueOf((String)receivedMsg.getHeaders().get("nested-2")).longValue();
    assertTrue("Main channel should be called before the 1st gateway",nested1 > mainChannel);
    assertTrue("The 1st gateway should be called before the 2nd gateway", nested2 > nested1);
    assertEquals("Bad payload object was send through gateways", coffee, receivedMsg.getPayload());
  }

  private Product constructProduct(String name) {
    Product product = new Product();
    product.setName(name);
    return product;
  }
}

In this article we discovered the way to clarify XML configuration thanks to message handler chaining. We also learned how to jump from one chain to another with gateways. After we also saw that messages can be freely manipulated before arriving to output channel. In this text we saw header modyfication through header enricher and messages filtering with MessageSelector instance.

If you liked it, you should read: Splitters and aggregators Routing Transformers

Share on: